package io.debezium.connector.oracle.logminer.buffered;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.util.Testing;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/buffered/AbstractBufferedLogMinerStreamingChangeEventSourceIT.class */
public abstract class AbstractBufferedLogMinerStreamingChangeEventSourceIT extends AbstractAsyncEngineConnectorTest {
    private OracleConnection connection;

    @Rule
    public TestRule skipRule = new SkipTestDependingOnAdapterNameRule();

    @Before
    public void before() throws Exception {
        this.connection = TestHelper.testConnection();
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
        TestHelper.dropTable(this.connection, "dbz3752");
        this.connection.execute(new String[]{"CREATE TABLE dbz3752(id number(9,0) primary key, name varchar2(50))"});
        TestHelper.streamTable(this.connection, "dbz3752");
    }

    @After
    public void after() throws Exception {
        stopConnector();
        if (this.connection != null) {
            TestHelper.dropTable(this.connection, "dbz3752");
            this.connection.close();
        }
    }

    protected abstract Configuration.Builder getBufferImplementationConfig();

    protected boolean hasPersistedState() {
        return false;
    }

    @Test
    @FixFor({"DBZ-3752"})
    public void shouldResumeFromPersistedState() throws Exception {
        if (hasPersistedState()) {
            Configuration build = getBufferImplementationConfig().with(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, false).with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3752").build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.execute(new String[]{"INSERT INTO dbz3752 (id,name) values (1, 'Mickey Mouse')"});
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
            Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder()).hasSize(1);
            List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.DEBEZIUM.DBZ3752");
            Assertions.assertThat(recordsForTopic).hasSize(1);
            Struct struct = ((Struct) ((SourceRecord) recordsForTopic.get(0)).value()).getStruct("after");
            Assertions.assertThat(struct.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct.get("NAME")).isEqualTo("Mickey Mouse");
            stopConnector();
            this.connection.execute(new String[]{"INSERT INTO dbz3752 (id,name) values (2, 'Donald Duck')"});
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.execute(new String[]{"INSERT INTO dbz3752 (id,name) values (3, 'Roger Rabbit')"});
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
            Assertions.assertThat(consumeRecordsByTopic2.allRecordsInOrder()).hasSize(2);
            List recordsForTopic2 = consumeRecordsByTopic2.recordsForTopic("server1.DEBEZIUM.DBZ3752");
            Assertions.assertThat(recordsForTopic2).hasSize(2);
            Struct struct2 = ((Struct) ((SourceRecord) recordsForTopic2.get(0)).value()).getStruct("after");
            Assertions.assertThat(struct2.get("ID")).isEqualTo(2);
            Assertions.assertThat(struct2.get("NAME")).isEqualTo("Donald Duck");
            Struct struct3 = ((Struct) ((SourceRecord) recordsForTopic2.get(1)).value()).getStruct("after");
            Assertions.assertThat(struct3.get("ID")).isEqualTo(3);
            Assertions.assertThat(struct3.get("NAME")).isEqualTo("Roger Rabbit");
        }
    }

    @Test
    @FixFor({"DBZ-3752"})
    public void shouldResumeLongRunningTransactionFromPersistedState() throws Exception {
        if (hasPersistedState()) {
            Configuration build = getBufferImplementationConfig().with(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, false).with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3752").build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            OracleConnection testConnection = TestHelper.testConnection();
            try {
                this.connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3752 (id,name) values (1, 'Mickey Mouse')"});
                testConnection.execute(new String[]{"INSERT INTO dbz3752 (id,name) values (2, 'Donald Duck')"});
                if (testConnection != null) {
                    testConnection.close();
                }
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
                Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder()).hasSize(1);
                List recordsForTopic = consumeRecordsByTopic.recordsForTopic("server1.DEBEZIUM.DBZ3752");
                Assertions.assertThat(recordsForTopic).hasSize(1);
                Struct struct = ((Struct) ((SourceRecord) recordsForTopic.get(0)).value()).getStruct("after");
                Assertions.assertThat(struct.get("ID")).isEqualTo(2);
                Assertions.assertThat(struct.get("NAME")).isEqualTo("Donald Duck");
                assertNoRecordsToConsume();
                stopConnector();
                this.connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3752 (id,name) values (3, 'Minnie Mouse')"});
                start(OracleConnector.class, build);
                assertConnectorIsRunning();
                waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
                this.connection.execute(new String[]{"INSERT INTO dbz3752 (id,name) values (4, 'Roger Rabbit')"});
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(3);
                Assertions.assertThat(consumeRecordsByTopic2.allRecordsInOrder()).hasSize(3);
                List recordsForTopic2 = consumeRecordsByTopic2.recordsForTopic("server1.DEBEZIUM.DBZ3752");
                Assertions.assertThat(recordsForTopic2).hasSize(3);
                Struct struct2 = ((Struct) ((SourceRecord) recordsForTopic2.get(0)).value()).getStruct("after");
                Assertions.assertThat(struct2.get("ID")).isEqualTo(1);
                Assertions.assertThat(struct2.get("NAME")).isEqualTo("Mickey Mouse");
                Struct struct3 = ((Struct) ((SourceRecord) recordsForTopic2.get(1)).value()).getStruct("after");
                Assertions.assertThat(struct3.get("ID")).isEqualTo(3);
                Assertions.assertThat(struct3.get("NAME")).isEqualTo("Minnie Mouse");
                Struct struct4 = ((Struct) ((SourceRecord) recordsForTopic2.get(2)).value()).getStruct("after");
                Assertions.assertThat(struct4.get("ID")).isEqualTo(4);
                Assertions.assertThat(struct4.get("NAME")).isEqualTo("Roger Rabbit");
            } catch (Throwable th) {
                if (testConnection != null) {
                    try {
                        testConnection.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Test
    @FixFor({"DBZ-8044"})
    public void shouldLogAdditionalDetailsForAbandonedTransaction() throws Exception {
        TestHelper.dropTable(this.connection, "dbz8044");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz8044 (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(this.connection, "dbz8044");
            Configuration build = TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8044").with(OracleConnectorConfig.LOG_MINING_TRANSACTION_RETENTION_MS, "20000").build();
            LogInterceptor logInterceptor = new LogInterceptor(BufferedLogMinerStreamingChangeEventSource.class);
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz8044 (id,data) values (1, 'test')"});
            Awaitility.await().atMost(5L, TimeUnit.MINUTES).until(() -> {
                return Boolean.valueOf(logInterceptor.containsMessage(" is being abandoned"));
            });
            this.connection.commit();
            Assertions.assertThat(logInterceptor.containsMessage(String.format(", 1 tables [%s.DEBEZIUM.DBZ8044]", TestHelper.getDatabaseName()))).isTrue();
        } finally {
            TestHelper.dropTable(this.connection, "dbz8044");
        }
    }
}
