package io.debezium.connector.oracle.logminer;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import java.lang.management.ManagementFactory;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import javax.management.JMException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.ANY_LOGMINER, reason = "LogMiner specific")
/* loaded from: input_file:io/debezium/connector/oracle/logminer/UsernameFilterIT.class */
public class UsernameFilterIT extends AbstractAsyncEngineConnectorTest {

    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();
    private static OracleConnection connection;
    private final String lobEnabled;

    @BeforeClass
    public static void beforeSuperClass() throws SQLException {
        connection = TestHelper.testConnection();
    }

    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection == null || !connection.isConnected()) {
            return;
        }
        connection.close();
    }

    @Parameterized.Parameters(name = "{index}: lobEnabled={0}")
    public static Collection<Object[]> lobEnabled() {
        return Arrays.asList(new Object[]{"false"}, new Object[]{"true"});
    }

    public UsernameFilterIT(String str) {
        this.lobEnabled = str;
    }

    @Test
    @FixFor({"DBZ-3978"})
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER_BUFFERED, reason = "Buffered filters at commit time while unbuffered filters at transaction start")
    public void shouldExcludeEventsByUsernameFilter() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz3978");
            connection.execute(new String[]{"CREATE TABLE dbz3978 (id number(9,0), data varchar2(50), primary key (id))"});
            TestHelper.streamTable(connection, "dbz3978");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3978").with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.NO_DATA).with(OracleConnectorConfig.LOG_MINING_USERNAME_EXCLUDE_LIST, "DEBEZIUM").with(OracleConnectorConfig.LOB_ENABLED, this.lobEnabled).with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "none").build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            connection.executeWithoutCommitting(new String[]{"INSERT INTO debezium.dbz3978 VALUES (1, 'Test1')"});
            connection.executeWithoutCommitting(new String[]{"INSERT INTO debezium.dbz3978 VALUES (2, 'Test2')"});
            connection.execute(new String[]{"COMMIT"});
            Assertions.assertThat(waitForAvailableRecords(10L, TimeUnit.SECONDS)).isFalse();
            Assertions.assertThat((Long) getStreamingMetric("TotalCapturedDmlCount")).isGreaterThanOrEqualTo(2L);
            TestHelper.dropTable(connection, "dbz3978");
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz3978");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-8884"})
    public void shouldIncludeEventsByUsernameFilterTransactionSplitOverMultipleMiningSessionsNoQueryFilter() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz8884");
            TestHelper.dropTable(connection, "dbz8884b");
            connection.execute(new String[]{"CREATE TABLE dbz8884 (id number(9,0), data varchar2(50), primary key(id))"});
            connection.execute(new String[]{"CREATE TABLE dbz8884b (id number(9,0), data varchar2(50), primary key(id))"});
            TestHelper.streamTable(connection, "dbz8884");
            TestHelper.streamTable(connection, "dbz8884b");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8884").with(OracleConnectorConfig.LOG_MINING_USERNAME_INCLUDE_LIST, "DEBEZIUM").with(OracleConnectorConfig.LOB_ENABLED, this.lobEnabled).with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "none").build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            for (int i = 1; i <= 10; i++) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO dbz8884b (id,data) values (%d,'Test%d')", Integer.valueOf(i), Integer.valueOf(i))});
                Thread.sleep(1000L);
            }
            for (int i2 = 1; i2 <= 10; i2++) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO dbz8884 (id,data) values (%d,'Test%d')", Integer.valueOf(i2), Integer.valueOf(i2))});
                Thread.sleep(1000L);
            }
            connection.execute(new String[]{"COMMIT"});
            List recordsForTopic = consumeRecordsByTopic(10).recordsForTopic(topicName("DEBEZIUM", "DBZ8884"));
            Assertions.assertThat(recordsForTopic).hasSize(10);
            for (int i3 = 1; i3 <= 10; i3++) {
                SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i3 - 1);
                Assertions.assertThat(getAfter(sourceRecord).get("ID")).isEqualTo(Integer.valueOf(i3));
                Assertions.assertThat(getAfter(sourceRecord).get("DATA")).isEqualTo(String.format("Test%d", Integer.valueOf(i3)));
                Assertions.assertThat(getSource(sourceRecord).get("user_name")).isEqualTo("DEBEZIUM");
            }
            assertNoRecordsToConsume();
            TestHelper.dropTable(connection, "dbz8884");
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz8884");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-8884"})
    public void shouldIncludeEventsByUsernameFilterTransactionSplitOverMultipleMiningSessionsWithQueryFilter() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz8884");
            TestHelper.dropTable(connection, "dbz8884b");
            connection.execute(new String[]{"CREATE TABLE dbz8884 (id number(9,0), data varchar2(50), primary key(id))"});
            connection.execute(new String[]{"CREATE TABLE dbz8884b (id number(9,0), data varchar2(50), primary key(id))"});
            TestHelper.streamTable(connection, "dbz8884");
            TestHelper.streamTable(connection, "dbz8884b");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8884").with(OracleConnectorConfig.LOG_MINING_USERNAME_INCLUDE_LIST, "DEBEZIUM").with(OracleConnectorConfig.LOB_ENABLED, this.lobEnabled).with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "in").build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            for (int i = 1; i <= 10; i++) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO dbz8884b (id,data) values (%d,'Test%d')", Integer.valueOf(i), Integer.valueOf(i))});
                Thread.sleep(1000L);
            }
            for (int i2 = 1; i2 <= 10; i2++) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO dbz8884 (id,data) values (%d,'Test%d')", Integer.valueOf(i2), Integer.valueOf(i2))});
                Thread.sleep(1000L);
            }
            connection.execute(new String[]{"COMMIT"});
            List recordsForTopic = consumeRecordsByTopic(10).recordsForTopic(topicName("DEBEZIUM", "DBZ8884"));
            Assertions.assertThat(recordsForTopic).hasSize(10);
            for (int i3 = 1; i3 <= 10; i3++) {
                SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(i3 - 1);
                Assertions.assertThat(getAfter(sourceRecord).get("ID")).isEqualTo(Integer.valueOf(i3));
                Assertions.assertThat(getAfter(sourceRecord).get("DATA")).isEqualTo(String.format("Test%d", Integer.valueOf(i3)));
                Assertions.assertThat(getSource(sourceRecord).get("user_name")).isEqualTo("DEBEZIUM");
            }
            assertNoRecordsToConsume();
            TestHelper.dropTable(connection, "dbz8884");
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz8884");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-8884"})
    public void shouldOnlyCaptureEventsForIncludedUsernames() throws Exception {
        TestHelper.dropTable(connection, "dbz8884");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz8884 (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(connection, "dbz8884");
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8884").with(OracleConnectorConfig.LOG_MINING_USERNAME_INCLUDE_LIST, "abc").with(OracleConnectorConfig.LOB_ENABLED, this.lobEnabled).build());
            assertConnectorIsRunning();
            waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
            LogInterceptor abstractEventProcessorLogInterceptor = TestHelper.getAbstractEventProcessorLogInterceptor();
            connection.execute(new String[]{"INSERT INTO dbz8884 (id,data) values (1,'abc')"});
            Awaitility.await().atMost(Duration.ofSeconds(TestHelper.defaultMessageConsumerPollTimeout())).until(() -> {
                return Boolean.valueOf(abstractEventProcessorLogInterceptor.containsMessage("Skipped transaction with username DEBEZIUM"));
            });
            assertNoRecordsToConsume();
            TestHelper.dropTable(connection, "dbz8884");
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz8884");
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-8884"})
    public void shouldThrowConfigurationExceptionWhenUsernameIncludeExcludeBothSpecified() throws Exception {
        TestHelper.dropTable(connection, "dbz8884");
        try {
            connection.execute(new String[]{"CREATE TABLE dbz8884 (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(connection, "dbz8884");
            LogInterceptor logInterceptor = new LogInterceptor(UsernameFilterIT.class);
            start(OracleConnector.class, TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ8884").with(OracleConnectorConfig.LOG_MINING_USERNAME_INCLUDE_LIST, "DEBEZIUM").with(OracleConnectorConfig.LOG_MINING_USERNAME_EXCLUDE_LIST, "DEBEZIUM").with(OracleConnectorConfig.LOB_ENABLED, this.lobEnabled).build());
            Awaitility.await().atMost(Duration.ofSeconds(TestHelper.defaultMessageConsumerPollTimeout())).until(() -> {
                return Boolean.valueOf(logInterceptor.containsErrorMessage("Connector configuration is not valid. The 'log.mining.username.exclude.list' value is invalid: \"log.mining.username.include.list\" is already specified"));
            });
            TestHelper.dropTable(connection, "dbz8884");
        } catch (Throwable th) {
            TestHelper.dropTable(connection, "dbz8884");
            throw th;
        }
    }

    public static <T> T getStreamingMetric(String str) throws JMException {
        return (T) ManagementFactory.getPlatformMBeanServer().getAttribute(getStreamingMetricsObjectName(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME), str);
    }

    private static String topicName(String str, String str2) {
        return String.format("%s.%s.%s", TestHelper.SERVER_NAME, str, str2);
    }

    private static Struct getAfter(SourceRecord sourceRecord) {
        return ((Struct) sourceRecord.value()).getStruct("after");
    }

    private static Struct getSource(SourceRecord sourceRecord) {
        return ((Struct) sourceRecord.value()).getStruct("source");
    }
}
