package io.debezium.connector.sqlserver;

import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.relational.TableId;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/sqlserver/SqlServerConnectorMultiPartitionModeIT.class */
public class SqlServerConnectorMultiPartitionModeIT extends AbstractAsyncEngineConnectorTest {
    private SqlServerConnection connection;

    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabases(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2);
        this.connection = TestHelper.multiPartitionTestConnection();
        TableId tableId = new TableId(TestHelper.TEST_DATABASE_1, "dbo", "tableA");
        TableId tableId2 = new TableId(TestHelper.TEST_DATABASE_1, "dbo", "tableB");
        this.connection.execute(new String[]{"CREATE TABLE %s (id int primary key, colA varchar(32))".formatted(this.connection.quotedTableIdString(tableId)), "CREATE TABLE %s (id int primary key, colB varchar(32))".formatted(this.connection.quotedTableIdString(tableId2)), "INSERT INTO %s VALUES(1, 'a1')".formatted(this.connection.quotedTableIdString(tableId)), "INSERT INTO %s VALUES(2, 'b')".formatted(this.connection.quotedTableIdString(tableId2))});
        TestHelper.enableTableCdc(this.connection, tableId);
        TestHelper.enableTableCdc(this.connection, tableId2);
        TableId tableId3 = new TableId(TestHelper.TEST_DATABASE_2, "dbo", "tableA");
        TableId tableId4 = new TableId(TestHelper.TEST_DATABASE_2, "dbo", "tableC");
        this.connection.execute(new String[]{"CREATE TABLE %s (id int primary key, colA varchar(32))".formatted(this.connection.quotedTableIdString(tableId3)), "CREATE TABLE %s (id int primary key, colC varchar(32))".formatted(this.connection.quotedTableIdString(tableId4)), "INSERT INTO %s VALUES(3, 'a2')".formatted(this.connection.quotedTableIdString(tableId3)), "INSERT INTO %s VALUES(4, 'c')".formatted(this.connection.quotedTableIdString(tableId4))});
        TestHelper.enableTableCdc(this.connection, tableId3);
        TestHelper.enableTableCdc(this.connection, tableId4);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() throws SQLException {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    @Test
    public void snapshotAndStreaming() throws Exception {
        start(SqlServerConnector.class, TestHelper.defaultConfig(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2).with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).build());
        assertConnectorIsRunning();
        TestHelper.waitForDatabaseSnapshotsToBeCompleted(TestHelper.TEST_DATABASE_1, TestHelper.TEST_DATABASE_2);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(4);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName(TestHelper.TEST_DATABASE_1, "tableA"));
        Assertions.assertThat(recordsForTopic).hasSize(1);
        assertValue((SourceRecord) recordsForTopic.get(0), "colA", "a1");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName(TestHelper.TEST_DATABASE_1, "tableB"));
        Assertions.assertThat(recordsForTopic2).hasSize(1);
        assertValue((SourceRecord) recordsForTopic2.get(0), "colB", "b");
        List recordsForTopic3 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName(TestHelper.TEST_DATABASE_2, "tableA"));
        Assertions.assertThat(recordsForTopic3).hasSize(1);
        assertValue((SourceRecord) recordsForTopic3.get(0), "colA", "a2");
        List recordsForTopic4 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName(TestHelper.TEST_DATABASE_2, "tableC"));
        Assertions.assertThat(recordsForTopic4).hasSize(1);
        assertValue((SourceRecord) recordsForTopic4.get(0), "colC", "c");
        this.connection.execute(new String[]{"USE testDB1", "INSERT INTO tableA VALUES(5, 'a1s')"});
        this.connection.execute(new String[]{"USE testDB2", "INSERT INTO tableA VALUES(6, 'a2s')"});
        TestHelper.waitForStreamingStarted();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
        List recordsForTopic5 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName(TestHelper.TEST_DATABASE_1, "tableA"));
        Assertions.assertThat(recordsForTopic5).hasSize(1);
        assertValue((SourceRecord) recordsForTopic5.get(0), "colA", "a1s");
        List recordsForTopic6 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName(TestHelper.TEST_DATABASE_2, "tableA"));
        Assertions.assertThat(recordsForTopic5).hasSize(1);
        assertValue((SourceRecord) recordsForTopic6.get(0), "colA", "a2s");
    }

    private void assertValue(SourceRecord sourceRecord, String str, Object obj) {
        Assertions.assertThat(((Struct) ((Struct) sourceRecord.value()).get("after")).get(str)).isEqualTo(obj);
    }
}
