package io.debezium.connector.oracle.logminer;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@SkipWhenAdapterNameIsNot(SkipWhenAdapterNameIsNot.AdapterName.ANY_LOGMINER)
/* loaded from: input_file:io/debezium/connector/oracle/logminer/ClientIdFilterIT.class */
public class ClientIdFilterIT extends AbstractAsyncEngineConnectorTest {

    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();
    private static OracleConnection connection;
    private final String lobEnabled;

    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();
    }

    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }

    @Parameterized.Parameters(name = "{index}: lobEnabled={0}")
    public static Collection<Object[]> lobEnabled() {
        return Arrays.asList(new Object[]{"false"}, new Object[]{"true"});
    }

    public ClientIdFilterIT(String str) {
        this.lobEnabled = str;
    }

    @Test
    @FixFor({"DBZ-8904"})
    public void shouldExcludeTransactionWithAnExcludedClientId() throws Exception {
        TestHelper.dropTable(connection, "dbz8904");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz8904 (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(connection, "dbz8904");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8904").with(OracleConnectorConfig.LOG_MINING_CLIENTID_EXCLUDE_LIST, "abc,xyz").with(OracleConnectorConfig.LOB_ENABLED, this.lobEnabled).build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            LogInterceptor abstractEventProcessorLogInterceptor = TestHelper.getAbstractEventProcessorLogInterceptor();
            OracleConnection testConnection = TestHelper.testConnection();
            try {
                testConnection.connection().setClientInfo("OCSID.CLIENTID", "abc");
                testConnection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (1,'abc')"});
                if (testConnection != null) {
                    testConnection.close();
                }
                testConnection = TestHelper.testConnection();
                try {
                    testConnection.connection().setClientInfo("OCSID.CLIENTID", "xyz");
                    testConnection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (2,'xyz')"});
                    if (testConnection != null) {
                        testConnection.close();
                    }
                    Awaitility.await().atMost(Duration.ofSeconds(TestHelper.defaultMessageConsumerPollTimeout())).until(() -> {
                        return Boolean.valueOf(abstractEventProcessorLogInterceptor.containsMessage("Skipped transaction with excluded client id abc") && abstractEventProcessorLogInterceptor.containsMessage("Skipped transaction with excluded client id xyz"));
                    });
                    connection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (3,'none')"});
                    List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ8904");
                    Assertions.assertThat(recordsForTopic).hasSize(1);
                    Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("ID")).isEqualTo(3);
                    Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("DATA")).isEqualTo("none");
                    assertNoRecordsToConsume();
                    TestHelper.dropTable(connection, "dbz8904");
                } finally {
                }
            } finally {
            }
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz8904");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-8904"})
    public void shouldExcludeTransactionWithoutLoggingWithAnExcludedClientId() throws Exception {
        TestHelper.dropTable(connection, "dbz8904");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz8904 (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(connection, "dbz8904");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8904").with(OracleConnectorConfig.LOG_MINING_CLIENTID_EXCLUDE_LIST, "abc,xyz").with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "in").with(OracleConnectorConfig.LOB_ENABLED, this.lobEnabled).build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            LogInterceptor abstractEventProcessorLogInterceptor = TestHelper.getAbstractEventProcessorLogInterceptor();
            OracleConnection testConnection = TestHelper.testConnection();
            try {
                testConnection.connection().setClientInfo("OCSID.CLIENTID", "abc");
                testConnection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (1,'abc')"});
                if (testConnection != null) {
                    testConnection.close();
                }
                testConnection = TestHelper.testConnection();
                try {
                    testConnection.connection().setClientInfo("OCSID.CLIENTID", "xyz");
                    testConnection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (2,'xyz')"});
                    if (testConnection != null) {
                        testConnection.close();
                    }
                    connection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (3,'none')"});
                    List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ8904");
                    Assertions.assertThat(recordsForTopic).hasSize(1);
                    Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("ID")).isEqualTo(3);
                    Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("DATA")).isEqualTo("none");
                    Assertions.assertThat(abstractEventProcessorLogInterceptor.containsMessage("Skipped transaction with excluded client id abc")).isFalse();
                    Assertions.assertThat(abstractEventProcessorLogInterceptor.containsMessage("Skipped transaction with excluded client id xyz")).isFalse();
                    assertNoRecordsToConsume();
                    TestHelper.dropTable(connection, "dbz8904");
                } finally {
                }
            } finally {
            }
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz8904");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-8904"})
    public void shouldExcludeTransactionWithAnIncludedClientId() throws Exception {
        TestHelper.dropTable(connection, "dbz8904");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz8904 (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(connection, "dbz8904");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8904").with(OracleConnectorConfig.LOG_MINING_CLIENTID_INCLUDE_LIST, "abc").with(OracleConnectorConfig.LOB_ENABLED, this.lobEnabled).build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            LogInterceptor abstractEventProcessorLogInterceptor = TestHelper.getAbstractEventProcessorLogInterceptor();
            OracleConnection testConnection = TestHelper.testConnection();
            try {
                testConnection.connection().setClientInfo("OCSID.CLIENTID", "abc");
                testConnection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (1,'abc')"});
                if (testConnection != null) {
                    testConnection.close();
                }
                testConnection = TestHelper.testConnection();
                try {
                    testConnection.connection().setClientInfo("OCSID.CLIENTID", "xyz");
                    testConnection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (2,'xyz')"});
                    if (testConnection != null) {
                        testConnection.close();
                    }
                    Awaitility.await().atMost(Duration.ofSeconds(TestHelper.defaultMessageConsumerPollTimeout())).until(() -> {
                        return Boolean.valueOf(abstractEventProcessorLogInterceptor.containsMessage("Skipped transaction with client id xyz"));
                    });
                    connection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (3,'none')"});
                    List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ8904");
                    Assertions.assertThat(recordsForTopic).hasSize(1);
                    Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("ID")).isEqualTo(1);
                    Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("DATA")).isEqualTo("abc");
                    assertNoRecordsToConsume();
                    TestHelper.dropTable(connection, "dbz8904");
                } finally {
                }
            } finally {
            }
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz8904");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-8904"})
    public void shouldExcludeTransactionWithoutLoggingWithAnIncludedClientId() throws Exception {
        TestHelper.dropTable(connection, "dbz8904");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz8904 (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(connection, "dbz8904");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8904").with(OracleConnectorConfig.LOG_MINING_CLIENTID_INCLUDE_LIST, "abc").with(OracleConnectorConfig.LOB_ENABLED, this.lobEnabled).build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            LogInterceptor abstractEventProcessorLogInterceptor = TestHelper.getAbstractEventProcessorLogInterceptor();
            OracleConnection testConnection = TestHelper.testConnection();
            try {
                testConnection.connection().setClientInfo("OCSID.CLIENTID", "abc");
                testConnection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (1,'abc')"});
                if (testConnection != null) {
                    testConnection.close();
                }
                testConnection = TestHelper.testConnection();
                try {
                    testConnection.connection().setClientInfo("OCSID.CLIENTID", "xyz");
                    testConnection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (2,'xyz')"});
                    if (testConnection != null) {
                        testConnection.close();
                    }
                    connection.execute(new String[]{"INSERT INTO dbz8904 (id,data) values (3,'none')"});
                    List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ8904");
                    Assertions.assertThat(recordsForTopic).hasSize(1);
                    Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("ID")).isEqualTo(1);
                    Assertions.assertThat(getAfter((SourceRecord) recordsForTopic.get(0)).get("DATA")).isEqualTo("abc");
                    Assertions.assertThat(abstractEventProcessorLogInterceptor.containsMessage("Skipped transaction with excluded client id xyz")).isFalse();
                    assertNoRecordsToConsume();
                    TestHelper.dropTable(connection, "dbz8904");
                } finally {
                }
            } finally {
            }
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz8904");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-8904"})
    public void shouldThrowConfigurationExceptionWhenClientIdIncludeExcludeBothSpecified() throws Exception {
        TestHelper.dropTable(connection, "dbz8904");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz8904 (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(connection, "dbz8904");
            LogInterceptor logInterceptor = new LogInterceptor(ClientIdFilterIT.class);
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8904").with(OracleConnectorConfig.LOG_MINING_CLIENTID_INCLUDE_LIST, "abc").with(OracleConnectorConfig.LOG_MINING_CLIENTID_EXCLUDE_LIST, "xyz").with(OracleConnectorConfig.LOB_ENABLED, this.lobEnabled).build());
            Awaitility.await().atMost(Duration.ofSeconds(TestHelper.defaultMessageConsumerPollTimeout())).until(() -> {
                return Boolean.valueOf(logInterceptor.containsErrorMessage("Connector configuration is not valid. The 'log.mining.clientid.exclude.list' value is invalid: \"log.mining.clientid.include.list\": is already specified"));
            });
            TestHelper.dropTable(connection, "dbz8904");
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz8904");
            throw th;
        }
    }

    private static Struct getAfter(SourceRecord sourceRecord) {
        return ((Struct) sourceRecord.value()).getStruct("after");
    }
}
