package io.debezium.connector.oracle;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.junit.SkipTestDependingOnStrategyRule;
import io.debezium.connector.oracle.junit.SkipWhenLogMiningStrategyIs;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.processors.AbstractReselectProcessorTest;
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
import io.debezium.util.Testing;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.Clob;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.testcontainers.shaded.org.awaitility.Awaitility;

/* loaded from: input_file:io/debezium/connector/oracle/OracleReselectColumnsProcessorIT.class */
public class OracleReselectColumnsProcessorIT extends AbstractReselectProcessorTest<OracleConnector> {

    @Rule
    public final TestRule skipStrategyRule = new SkipTestDependingOnStrategyRule();
    private OracleConnection connection;

    @Before
    public void beforeEach() throws Exception {
        this.connection = TestHelper.testConnection();
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
        super.beforeEach();
    }

    @After
    public void afterEach() throws Exception {
        super.afterEach();
        if (this.connection != null) {
            this.connection.close();
        }
    }

    protected Class<OracleConnector> getConnectorClass() {
        return OracleConnector.class;
    }

    protected JdbcConnection databaseConnection() {
        return this.connection;
    }

    protected Configuration.Builder getConfigurationBuilder() {
        return TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4321").with(OracleConnectorConfig.CUSTOM_POST_PROCESSORS, "reselector").with("reselector.type", ReselectColumnsPostProcessor.class.getName());
    }

    protected String topicName() {
        return "server1.DEBEZIUM.DBZ4321";
    }

    protected String tableName() {
        return "DEBEZIUM.DBZ4321";
    }

    protected String reselectColumnsList() {
        return "DEBEZIUM.DBZ4321:DATA";
    }

    protected void createTable() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4321");
        this.connection.execute(new String[]{"CREATE TABLE dbz4321 (id numeric(9,0) primary key, data varchar2(50), data2 numeric(9,0))"});
        TestHelper.streamTable(this.connection, "dbz4321");
    }

    protected void dropTable() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4321");
    }

    protected String getInsertWithValue() {
        return "INSERT INTO dbz4321 (id,data,data2) values (1,'one',1)";
    }

    protected String getInsertWithNullValue() {
        return "INSERT INTO dbz4321 (id,data,data2) values (1,null,1)";
    }

    protected void waitForStreamingStarted() throws InterruptedException {
        waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
    }

    protected String fieldName(String str) {
        return str.toUpperCase();
    }

    @Test
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Cannot use lob.enabled with Hybrid")
    @FixFor({"DBZ-7729"})
    public void testColumnReselectionUsesPrimaryKeyColumnAndValuesDespiteMessageKeyColumnConfigs() throws Exception {
        TestHelper.dropTable(this.connection, "dbz7729");
        try {
            LogInterceptor reselectLogInterceptor = getReselectLogInterceptor();
            this.connection.execute(new String[]{"CREATE TABLE dbz7729 (id numeric(9,0) primary key, data clob, data2 numeric(9,0), data3 varchar2(25))"});
            TestHelper.streamTable(this.connection, "dbz7729");
            start(getConnectorClass(), getConfigurationBuilder().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ7729").with(OracleConnectorConfig.MSG_KEY_COLUMNS, "(.*).DEBEZIUM.DBZ7729:DATA3").with(OracleConnectorConfig.LOB_ENABLED, "true").with("reselector.reselect.columns.include.list", "DEBEZIUM.DBZ7729:DATA").build());
            assertConnectorIsRunning();
            waitForStreamingStarted();
            String randomAlphabetic = RandomStringUtils.randomAlphabetic(10000);
            Clob createClob = this.connection.connection().createClob();
            createClob.setString(1L, randomAlphabetic);
            this.connection.prepareQuery("INSERT INTO dbz7729 (id,data,data2,data3) values (1,?,1,'A')", preparedStatement -> {
                preparedStatement.setClob(1, createClob);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            this.connection.execute(new String[]{"UPDATE dbz7729 set data2=10 where id = 1"});
            List recordsForTopic = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.DBZ7729");
            Assertions.assertThat(recordsForTopic).hasSize(2);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(1);
            VerifyRecord.isValidUpdate(sourceRecord, true);
            Struct struct = (Struct) sourceRecord.key();
            Assertions.assertThat(struct.schema().fields()).hasSize(1);
            Assertions.assertThat(struct.get("DATA3")).isEqualTo("A");
            Struct struct2 = ((Struct) sourceRecord.value()).getStruct("after");
            Assertions.assertThat(struct2.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct2.get("DATA")).isEqualTo(randomAlphabetic);
            Assertions.assertThat(struct2.get("DATA2")).isEqualTo(10);
            Assertions.assertThat(struct2.get("DATA3")).isEqualTo("A");
            assertColumnReselectedForUnavailableValue(reselectLogInterceptor, TestHelper.getDatabaseName() + ".DEBEZIUM.DBZ7729", "DATA");
            TestHelper.dropTable(this.connection, "dbz7729");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz7729");
            throw th;
        }
    }

    @Test
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Cannot use lob.enabled with Hybrid")
    @FixFor({"DBZ-4321"})
    public void testClobReselectedWhenValueIsUnavailable() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4321");
        try {
            LogInterceptor reselectLogInterceptor = getReselectLogInterceptor();
            this.connection.execute(new String[]{"CREATE TABLE dbz4321 (id numeric(9,0) primary key, data clob, data2 numeric(9,0))"});
            TestHelper.streamTable(this.connection, "dbz4321");
            start(OracleConnector.class, getConfigurationBuilder().with(OracleConnectorConfig.LOB_ENABLED, "true").with("reselector.reselect.columns.include.list", reselectColumnsList()).build());
            assertConnectorIsRunning();
            waitForStreamingStarted();
            String randomAlphabetic = RandomStringUtils.randomAlphabetic(10000);
            Clob createClob = this.connection.connection().createClob();
            createClob.setString(1L, randomAlphabetic);
            this.connection.prepareQuery("INSERT INTO dbz4321 (id,data,data2) values (1,?,1)", preparedStatement -> {
                preparedStatement.setClob(1, createClob);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            this.connection.execute(new String[]{"UPDATE dbz4321 set data2=10 where id = 1"});
            List recordsForTopic = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.DBZ4321");
            Assertions.assertThat(recordsForTopic).hasSize(2);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(1);
            VerifyRecord.isValidUpdate(sourceRecord, "ID", 1);
            Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
            Assertions.assertThat(struct.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct.get("DATA")).isEqualTo(randomAlphabetic);
            Assertions.assertThat(struct.get("DATA2")).isEqualTo(10);
            assertColumnReselectedForUnavailableValue(reselectLogInterceptor, TestHelper.getDatabaseName() + ".DEBEZIUM.DBZ4321", "DATA");
            TestHelper.dropTable(this.connection, "dbz4321");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz4321");
            throw th;
        }
    }

    @Test
    @SkipWhenLogMiningStrategyIs(value = SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason = "Cannot use lob.enabled with Hybrid")
    @FixFor({"DBZ-4321"})
    public void testBlobReselectedWhenValueIsUnavailable() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4321");
        try {
            LogInterceptor reselectLogInterceptor = getReselectLogInterceptor();
            this.connection.execute(new String[]{"CREATE TABLE dbz4321 (id numeric(9,0) primary key, data blob, data2 numeric(9,0))"});
            TestHelper.streamTable(this.connection, "dbz4321");
            start(OracleConnector.class, getConfigurationBuilder().with(OracleConnectorConfig.LOB_ENABLED, "true").with("reselector.reselect.columns.include.list", reselectColumnsList()).build());
            assertConnectorIsRunning();
            waitForStreamingStarted();
            byte[] bytes = RandomStringUtils.random(10000).getBytes(StandardCharsets.UTF_8);
            Blob createBlob = this.connection.connection().createBlob();
            createBlob.setBytes(1L, bytes);
            this.connection.prepareQuery("INSERT INTO dbz4321 (id,data,data2) values (1,?,1)", preparedStatement -> {
                preparedStatement.setBlob(1, createBlob);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            this.connection.execute(new String[]{"UPDATE dbz4321 set data2=10 where id = 1"});
            List recordsForTopic = consumeRecordsByTopic(2).recordsForTopic("server1.DEBEZIUM.DBZ4321");
            Assertions.assertThat(recordsForTopic).hasSize(2);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(1);
            VerifyRecord.isValidUpdate(sourceRecord, "ID", 1);
            Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
            Assertions.assertThat(struct.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct.get("DATA")).isEqualTo(ByteBuffer.wrap(bytes));
            Assertions.assertThat(struct.get("DATA2")).isEqualTo(10);
            assertColumnReselectedForUnavailableValue(reselectLogInterceptor, TestHelper.getDatabaseName() + ".DEBEZIUM.DBZ4321", "DATA");
            TestHelper.dropTable(this.connection, "dbz4321");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz4321");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-8493"})
    @Ignore("This requires running ALTER SYSTEM SET UNDO_RETENTION=60 within the PDB, which we do not want to automate")
    public void testShouldNotThrowErrorUsingFallbackQuery() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4321");
        try {
            getReselectLogInterceptor();
            this.connection.execute(new String[]{"CREATE TABLE dbz4321 (id numeric(9,0) primary key, data clob, data2 numeric(9,0))"});
            TestHelper.streamTable(this.connection, "dbz4321");
            Configuration build = getConfigurationBuilder().with(OracleConnectorConfig.LOB_ENABLED, "true").with("reselector.reselect.columns.include.list", reselectColumnsList()).build();
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingStarted();
            String randomAlphanumeric = RandomStringUtils.randomAlphanumeric(10000);
            Clob createClob = this.connection.connection().createClob();
            createClob.setString(1L, randomAlphanumeric);
            this.connection.prepareQuery("INSERT INTO dbz4321 (id,data,data2) values (1,?,1)", preparedStatement -> {
                preparedStatement.setClob(1, createClob);
            }, (JdbcConnection.ResultSetConsumer) null);
            this.connection.commit();
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4321");
            Assertions.assertThat(recordsForTopic).hasSize(1);
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
            VerifyRecord.isValidInsert(sourceRecord, "ID", 1);
            Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
            Assertions.assertThat(struct.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct.get("DATA")).isEqualTo(randomAlphanumeric);
            Assertions.assertThat(struct.get("DATA2")).isEqualTo(1);
            stopConnector();
            this.connection.execute(new String[]{"UPDATE dbz4321 set data2 = 10 where id = 1"});
            long undoRetentionSeconds = TestHelper.getUndoRetentionSeconds();
            System.out.println("UNDO_RETENTION is set to " + undoRetentionSeconds + " seconds, waiting until it expires.");
            AtomicLong atomicLong = new AtomicLong();
            Awaitility.await().atMost(undoRetentionSeconds * 3, TimeUnit.SECONDS).pollInterval(Duration.ofSeconds(1L)).until(() -> {
                System.out.println("Total time waited: " + atomicLong.get() + " seconds");
                return Boolean.valueOf(atomicLong.addAndGet(1L) >= undoRetentionSeconds * 2);
            });
            start(OracleConnector.class, build);
            assertConnectorIsRunning();
            waitForStreamingStarted();
            List recordsForTopic2 = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4321");
            Assertions.assertThat(recordsForTopic2).hasSize(1);
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
            VerifyRecord.isValidUpdate(sourceRecord2, "ID", 1);
            Struct struct2 = ((Struct) sourceRecord2.value()).getStruct("after");
            Assertions.assertThat(struct2.get("ID")).isEqualTo(1);
            Assertions.assertThat(struct2.get("DATA")).isEqualTo(randomAlphanumeric);
            Assertions.assertThat(struct2.get("DATA2")).isEqualTo(10);
            TestHelper.dropTable(this.connection, "dbz4321");
        } catch (Throwable th) {
            TestHelper.dropTable(this.connection, "dbz4321");
            throw th;
        }
    }
}
