package io.debezium.connector.oracle;

import io.debezium.config.Configuration;
import io.debezium.connector.SnapshotType;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/oracle/SchemaHistoryTopicIT.class */
public class SchemaHistoryTopicIT extends AbstractAsyncEngineConnectorTest {
    private static OracleConnection connection;

    @Before
    public void before() throws SQLException {
        connection = TestHelper.testConnection();
        TestHelper.dropTable(connection, "debezium.tablea");
        TestHelper.dropTable(connection, "debezium.tableb");
        TestHelper.dropTable(connection, "debezium.tablec");
        connection.execute(new String[]{"CREATE TABLE debezium.tablea (id numeric(9,0) not null, cola varchar2(30), primary key(id))", "CREATE TABLE debezium.tableb (id numeric(9,0) not null, colb varchar2(30), primary key(id))", "CREATE TABLE debezium.tablec (id numeric(9,0) not null, colc varchar2(30), primary key(id))"});
        connection.execute(new String[]{"GRANT SELECT ON debezium.tablea to  " + TestHelper.getConnectorUserName()});
        connection.execute(new String[]{"ALTER TABLE debezium.tablea ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});
        connection.execute(new String[]{"GRANT SELECT ON debezium.tableb to  " + TestHelper.getConnectorUserName()});
        connection.execute(new String[]{"ALTER TABLE debezium.tableb ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});
        connection.execute(new String[]{"GRANT SELECT ON debezium.tablec to  " + TestHelper.getConnectorUserName()});
        connection.execute(new String[]{"ALTER TABLE debezium.tablec ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() throws SQLException {
        if (connection != null) {
            TestHelper.dropTable(connection, "debezium.tablea");
            TestHelper.dropTable(connection, "debezium.tableb");
            TestHelper.dropTable(connection, "debezium.tablec");
            connection.close();
        }
    }

    @Test
    @FixFor({"DBZ-1904"})
    public void snapshotSchemaChanges() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL).with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TABLE[ABC]").with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            connection.execute(new String[]{"INSERT INTO debezium.tablea VALUES(" + i2 + ", 'a')"});
            connection.execute(new String[]{"INSERT INTO debezium.tableb VALUES(" + i2 + ", 'b')"});
        }
        start(OracleConnector.class, build);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);
        Testing.Print.enable();
        List allRecordsInOrder = consumeRecordsByTopic(3).allRecordsInOrder();
        Assertions.assertThat(allRecordsInOrder).hasSize(3);
        allRecordsInOrder.forEach(sourceRecord -> {
            Assertions.assertThat(sourceRecord.topic()).isEqualTo(TestHelper.SERVER_NAME);
            Assertions.assertThat(((Struct) sourceRecord.key()).getString("databaseName")).isEqualTo(TestHelper.getDatabaseName());
            Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString());
        });
        Assertions.assertThat(((Struct) ((SourceRecord) allRecordsInOrder.get(0)).value()).getStruct("source").getString("snapshot")).isEqualTo("true");
        Assertions.assertThat(((Struct) ((SourceRecord) allRecordsInOrder.get(1)).value()).getStruct("source").getString("snapshot")).isEqualTo("true");
        Assertions.assertThat(((Struct) ((SourceRecord) allRecordsInOrder.get(2)).value()).getStruct("source").getString("snapshot")).isEqualTo("true");
        Assertions.assertThat(((Struct) ((SourceRecord) allRecordsInOrder.get(0)).value()).getStruct("source").getString("schema")).isEqualTo("DEBEZIUM");
        Assertions.assertThat(((Struct) ((SourceRecord) allRecordsInOrder.get(0)).value()).getString("ddl")).contains(new CharSequence[]{"CREATE TABLE"});
        Assertions.assertThat(((Struct) ((SourceRecord) allRecordsInOrder.get(0)).value()).getString("schemaName")).isEqualTo("DEBEZIUM");
        List array = ((Struct) ((SourceRecord) allRecordsInOrder.get(0)).value()).getArray("tableChanges");
        Assertions.assertThat(array).hasSize(1);
        Assertions.assertThat(((Struct) array.get(0)).get("type")).isEqualTo("CREATE");
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("server1.DEBEZIUM.TABLEA")).hasSize(5);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("server1.DEBEZIUM.TABLEB")).hasSize(5);
        consumeRecordsByTopic.recordsForTopic("server1.DEBEZIUM.TABLEB").forEach(sourceRecord2 -> {
            assertSchemaMatchesStruct((Struct) ((Struct) sourceRecord2.value()).get("after"), SchemaBuilder.struct().optional().name("server1.DEBEZIUM.TABLEB.Value").field("ID", Schema.INT32_SCHEMA).field("COLB", Schema.OPTIONAL_STRING_SCHEMA).build());
        });
    }
}
