package io.debezium.connector.oracle.logminer.unbuffered;

import ch.qos.logback.classic.Level;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/**
 * Integration tests for the unbuffered LogMiner adapter of the Oracle connector.
 *
 * <p>The class is annotated so that it is skipped unless the adapter under test is
 * {@code LOGMINER_UNBUFFERED}; the skip itself is applied by {@link #skipAdapterRule}.
 */
@SkipWhenAdapterNameIsNot(SkipWhenAdapterNameIsNot.AdapterName.LOGMINER_UNBUFFERED)
public class UnbufferedLogMinerAdapterIT extends AbstractAsyncEngineConnectorTest {

    /** Name of the table created, streamed and dropped by the DBZ-9013 test. */
    private static final String TABLE_NAME = "dbz9013";

    /** Upper bound for each wait on an expected log message. */
    private static final Duration LOG_MESSAGE_TIMEOUT = Duration.ofSeconds(30);

    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();

    private OracleConnection connection;

    /**
     * Resets the test framework state, removes the schema history file and all tables,
     * and opens the database connection used by the tests.
     *
     * @throws SQLException if preparing the database or opening the connection fails
     */
    @Before
    public void beforeEach() throws SQLException {
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
        TestHelper.dropAllTables();
        this.connection = TestHelper.testConnection();
    }

    /**
     * Stops the connector and closes the test connection.
     *
     * @throws SQLException if closing the connection fails
     */
    @After
    public void afterEach() throws SQLException {
        try {
            stopConnector();
        }
        finally {
            // Close the connection even when stopping the connector fails, so it is never leaked.
            if (this.connection != null) {
                this.connection.close();
            }
        }
    }

    /**
     * Starts the connector against a freshly created table, inserts one row, and then verifies
     * that the committed offset's {@code scn} value is greater than the snapshot SCN.
     *
     * <p>The test synchronizes on DEBUG log output: it first waits for a "Resume/Commit SCN "
     * message, and after the insert it waits for an "Advancing offset low-watermark scn" message
     * before consuming the single expected record.
     *
     * @throws Exception if any database, connector or assertion step fails
     */
    @Test
    @FixFor({"DBZ-9013"})
    public void shouldAdvanceOffsetLowWatermarkWhenWhenNoInProgressTransactionsExist() throws Exception {
        TestHelper.dropTable(this.connection, TABLE_NAME);
        try {
            this.connection.execute("CREATE TABLE dbz9013 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(this.connection, TABLE_NAME);

            // Both loggers are raised to DEBUG because the messages awaited below are only
            // observable through the interceptors at that level.
            final LogInterceptor resumePositionLogs = new LogInterceptor(ResumePositionProvider.class);
            resumePositionLogs.setLoggerLevel(ResumePositionProvider.class, Level.DEBUG);

            final LogInterceptor streamingLogs = new LogInterceptor(UnbufferedLogMinerStreamingChangeEventSource.class);
            streamingLogs.setLoggerLevel(UnbufferedLogMinerStreamingChangeEventSource.class, Level.DEBUG);

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ9013")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

            // Wait for the resume/commit position to be logged before generating any change.
            Awaitility.await()
                    .atMost(LOG_MESSAGE_TIMEOUT)
                    .until(() -> resumePositionLogs.containsMessage("Resume/Commit SCN "));

            this.connection.execute("INSERT INTO dbz9013 (id,data) values (1,'test')");

            // Wait for the streaming source to report that the low-watermark was advanced.
            Awaitility.await()
                    .atMost(LOG_MESSAGE_TIMEOUT)
                    .until(() -> streamingLogs.containsMessage("Advancing offset low-watermark scn"));

            Assertions.assertThat(consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ9013")).hasSize(1);

            // The connector is stopped before the committed offsets are read back.
            stopConnector();

            final OraclePartition partition = new OraclePartition(TestHelper.SERVER_NAME, TestHelper.DATABASE);
            final Map<String, ?> offsets = readLastCommittedOffset(config, partition.getSourcePartition());

            final Scn snapshotScn = OracleOffsetContext.loadSnapshotScn(offsets);
            Assertions.assertThat(snapshotScn).isNotNull();

            final Scn offsetScn = OracleOffsetContext.getScnFromOffsetMapByKey(offsets, "scn");
            Assertions.assertThat(offsetScn).isNotNull();

            // The committed offset SCN must have moved past the snapshot SCN.
            Assertions.assertThat(offsetScn.asBigInteger()).isGreaterThan(snapshotScn.asBigInteger());
        }
        finally {
            // Single cleanup path for both success and failure.
            TestHelper.dropTable(this.connection, TABLE_NAME);
        }
    }
}
