package io.debezium.connector.db2;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.SnapshotType;
import io.debezium.connector.db2.Db2ConnectorConfig;
import io.debezium.connector.db2.util.TestHelper;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.ConditionalFail;
import io.debezium.junit.Flaky;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import junit.framework.TestCase;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/db2/Db2ConnectorIT.class */
public class Db2ConnectorIT extends AbstractAsyncEngineConnectorTest {
    private Db2Connection connection;

    @Rule
    public ConditionalFail conditionalFail = new ConditionalFail();

    @Before
    public void before() throws SQLException {
        TestHelper.dropAllTables();
        this.connection = TestHelper.testConnection();
        this.connection.execute(new String[]{"DELETE FROM ASNCDC.IBMSNAP_REGISTER"});
        this.connection.execute(new String[]{"CREATE TABLE tablea (id int not null, cola varchar(30), primary key (id))", "CREATE TABLE tableb (id int not null, colb varchar(30), primary key (id))", "CREATE TABLE masked_hashed_column_table (id int not null, name varchar(255), name2 varchar(255), name3 varchar(20), primary key (id))", "CREATE TABLE truncated_column_table (id int not null, name varchar(20), primary key (id))", "CREATE TABLE dt_table (id int not null, c1 int, c2 int, c3a numeric(5,2), c3b varchar(128), f1 float(10), f2 decimal(8,4), primary key(id))", "INSERT INTO tablea VALUES(1, 'a')"});
        TestHelper.enableTableCdc(this.connection, "TABLEA");
        TestHelper.enableTableCdc(this.connection, "TABLEB");
        TestHelper.enableTableCdc(this.connection, "MASKED_HASHED_COLUMN_TABLE");
        TestHelper.enableTableCdc(this.connection, "TRUNCATED_COLUMN_TABLE");
        TestHelper.enableTableCdc(this.connection, "DT_TABLE");
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
        Testing.Print.enable();
    }

    @After
    public void after() throws SQLException {
        if (this.connection != null) {
            TestHelper.disableDbCdc(this.connection);
            TestHelper.disableTableCdc(this.connection, "DT_TABLE");
            TestHelper.disableTableCdc(this.connection, "TRUNCATED_COLUMN_TABLE");
            TestHelper.disableTableCdc(this.connection, "MASKED_HASHED_COLUMN_TABLE");
            TestHelper.disableTableCdc(this.connection, "TABLEB");
            TestHelper.disableTableCdc(this.connection, "TABLEA");
            this.connection.execute(new String[]{"DROP TABLE tablea", "DROP TABLE tableb", "DROP TABLE masked_hashed_column_table", "DROP TABLE truncated_column_table", "DROP TABLE dt_table"});
            this.connection.execute(new String[]{"DELETE FROM ASNCDC.IBMSNAP_REGISTER"});
            this.connection.execute(new String[]{"DELETE FROM ASNCDC.IBMQREP_COLVERSION"});
            this.connection.execute(new String[]{"DELETE FROM ASNCDC.IBMQREP_TABVERSION"});
            this.connection.close();
        }
    }

    @Test
    public void deleteWithoutTombstone() throws Exception {
        start(Db2Connector.class, TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL).with(Db2ConnectorConfig.TOMBSTONES_ON_DELETE, false).build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        TestHelper.refreshAndWait(this.connection);
        consumeRecordsByTopic(10);
        this.connection.execute(new String[]{"DELETE FROM tableB"});
        TestHelper.refreshAndWait(this.connection);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(5);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEA");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(recordsForTopic).isNullOrEmpty();
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        for (int i3 = 0; i3 < 5; i3++) {
            SourceRecord sourceRecord = (SourceRecord) recordsForTopic2.get(i3);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, Integer.valueOf(i3 + 10)), new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct = (Struct) sourceRecord.value();
            assertRecord((Struct) struct.get("before"), asList);
            Assert.assertNull(struct.get("after"));
        }
        stopConnector();
    }

    @Test
    public void updatePrimaryKey() throws Exception {
        start(Db2Connector.class, TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL).build());
        assertConnectorIsRunning();
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1, 'b')"});
        consumeRecordsByTopic(2);
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        this.connection.setAutoCommit(false);
        this.connection.execute(new String[]{"UPDATE tablea SET id=100 WHERE id=1", "UPDATE tableb SET id=100 WHERE id=1"});
        TestHelper.refreshAndWait(this.connection);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(6);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEA");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(recordsForTopic).hasSize(3);
        Assertions.assertThat(recordsForTopic2).hasSize(3);
        List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1), new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1));
        List<SchemaAndValueField> asList3 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100), new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        List<SchemaAndValueField> asList4 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100));
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(1);
        SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic.get(2);
        Struct struct = (Struct) sourceRecord.key();
        Struct struct2 = (Struct) sourceRecord.value();
        assertRecord(struct2.getStruct("before"), asList);
        assertRecord(struct, asList2);
        Assert.assertNull(struct2.get("after"));
        Struct struct3 = (Struct) sourceRecord2.key();
        Struct struct4 = (Struct) sourceRecord2.value();
        assertRecord(struct3, asList2);
        Assert.assertNull(struct4);
        Struct struct5 = (Struct) sourceRecord3.key();
        Struct struct6 = (Struct) sourceRecord3.value();
        assertRecord(struct6.getStruct("after"), asList3);
        assertRecord(struct5, asList4);
        Assert.assertNull(struct6.get("before"));
        List<SchemaAndValueField> asList5 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1), new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        List<SchemaAndValueField> asList6 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1));
        List<SchemaAndValueField> asList7 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100), new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        List<SchemaAndValueField> asList8 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100));
        SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic2.get(0);
        SourceRecord sourceRecord5 = (SourceRecord) recordsForTopic2.get(1);
        SourceRecord sourceRecord6 = (SourceRecord) recordsForTopic2.get(2);
        Struct struct7 = (Struct) sourceRecord4.key();
        Struct struct8 = (Struct) sourceRecord4.value();
        assertRecord(struct8.getStruct("before"), asList5);
        assertRecord(struct7, asList6);
        Assert.assertNull(struct8.get("after"));
        Struct struct9 = (Struct) sourceRecord5.key();
        Struct struct10 = (Struct) sourceRecord5.value();
        assertRecord(struct9, asList6);
        Assert.assertNull(struct10);
        Struct struct11 = (Struct) sourceRecord6.key();
        Struct struct12 = (Struct) sourceRecord6.value();
        assertRecord(struct12.getStruct("after"), asList7);
        assertRecord(struct11, asList8);
        Assert.assertNull(struct12.get("before"));
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1152"})
    public void updatePrimaryKeyWithRestartInMiddle() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL).build();
        start(Db2Connector.class, build);
        assertConnectorIsRunning();
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1, 'b')"});
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        consumeRecordsByTopic(2);
        this.connection.setAutoCommit(false);
        this.connection.execute(new String[]{"UPDATE tablea SET id=100 WHERE id=1", "UPDATE tableb SET id=100 WHERE id=1"});
        TestHelper.refreshAndWait(this.connection);
        AbstractConnectorTest.SourceRecords consumeRecordsButSkipUntil = consumeRecordsButSkipUntil(2, (struct, struct2) -> {
            return (struct2 != null && struct2.getString("op").equals("c") && struct.getInt32("ID").intValue() == 1) ? false : true;
        });
        stopConnector();
        start(Db2Connector.class, build);
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(4);
        List recordsForTopic = consumeRecordsButSkipUntil.recordsForTopic("testdb.DB2INST1.TABLEA");
        recordsForTopic.addAll(consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEA"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(recordsForTopic).hasSize(3);
        Assertions.assertThat(recordsForTopic2).hasSize(3);
        List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1), new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1));
        List<SchemaAndValueField> asList3 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100), new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        List<SchemaAndValueField> asList4 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100));
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(1);
        SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic.get(2);
        Struct struct3 = (Struct) sourceRecord.key();
        Struct struct4 = (Struct) sourceRecord.value();
        assertRecord(struct4.getStruct("before"), asList);
        assertRecord(struct3, asList2);
        Assert.assertNull(struct4.get("after"));
        Struct struct5 = (Struct) sourceRecord2.key();
        Struct struct6 = (Struct) sourceRecord2.value();
        assertRecord(struct5, asList2);
        Assert.assertNull(struct6);
        Struct struct7 = (Struct) sourceRecord3.key();
        Struct struct8 = (Struct) sourceRecord3.value();
        assertRecord(struct8.getStruct("after"), asList3);
        assertRecord(struct7, asList4);
        Assert.assertNull(struct8.get("before"));
        List<SchemaAndValueField> asList5 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1), new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        List<SchemaAndValueField> asList6 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1));
        List<SchemaAndValueField> asList7 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100), new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        List<SchemaAndValueField> asList8 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100));
        SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic2.get(0);
        SourceRecord sourceRecord5 = (SourceRecord) recordsForTopic2.get(1);
        SourceRecord sourceRecord6 = (SourceRecord) recordsForTopic2.get(2);
        Struct struct9 = (Struct) sourceRecord4.key();
        Struct struct10 = (Struct) sourceRecord4.value();
        assertRecord(struct10.getStruct("before"), asList5);
        assertRecord(struct9, asList6);
        Assert.assertNull(struct10.get("after"));
        Struct struct11 = (Struct) sourceRecord5.key();
        Struct struct12 = (Struct) sourceRecord5.value();
        assertRecord(struct11, asList6);
        Assert.assertNull(struct12);
        Struct struct13 = (Struct) sourceRecord6.key();
        Struct struct14 = (Struct) sourceRecord6.value();
        assertRecord(struct14.getStruct("after"), asList7);
        assertRecord(struct13, asList8);
        Assert.assertNull(struct14.get("before"));
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1069"})
    public void verifyOffsets() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL).build();
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        TestHelper.refreshAndWait(this.connection);
        int i3 = 0;
        while (!this.connection.getMaxLsn().isAvailable()) {
            if (i3 == 30) {
                Assert.fail("Initial changes not written to CDC structures");
            }
            Testing.debug("Waiting for initial changes to be propagated to CDC structures");
            Thread.sleep(1000L);
            i3++;
        }
        start(Db2Connector.class, build);
        assertConnectorIsRunning();
        TestHelper.refreshAndWait(this.connection);
        List allRecordsInOrder = consumeRecordsByTopic(11).allRecordsInOrder();
        Iterator it = allRecordsInOrder.subList(1, allRecordsInOrder.size()).iterator();
        while (it.hasNext()) {
            SourceRecord sourceRecord = (SourceRecord) it.next();
            Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot")).as("Snapshot phase", new Object[0]).isEqualTo(SnapshotType.INITIAL.toString());
            if (it.hasNext()) {
                Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot_completed")).as("Snapshot in progress", new Object[0]).isEqualTo(false);
            } else {
                Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot_completed")).as("Snapshot completed", new Object[0]).isEqualTo(true);
            }
        }
        stopConnector();
        for (int i4 = 0; i4 < 5; i4++) {
            int i5 = 100 + i4;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i5 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i5 + ", 'b')"});
        }
        start(Db2Connector.class, build);
        assertConnectorIsRunning();
        TestHelper.refreshAndWait(this.connection);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(10);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEA");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(recordsForTopic).hasSize(5);
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        for (int i6 = 0; i6 < 5; i6++) {
            int i7 = i6 + 100;
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(i6);
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic2.get(i6);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, Integer.valueOf(i7)), new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, Integer.valueOf(i7)), new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
            Struct struct2 = (Struct) sourceRecord3.value();
            assertRecord((Struct) struct2.get("after"), asList2);
            Assert.assertNull(struct2.get("before"));
            Assertions.assertThat(sourceRecord2.sourceOffset().get("snapshot")).as("Streaming phase", new Object[0]).isNull();
            Assertions.assertThat(sourceRecord2.sourceOffset().get("snapshot_completed")).as("Streaming phase", new Object[0]).isNull();
            Assertions.assertThat(sourceRecord2.sourceOffset().get("change_lsn")).as("LSN present", new Object[0]).isNotNull();
            Assertions.assertThat(sourceRecord3.sourceOffset().get("snapshot")).as("Streaming phase", new Object[0]).isNull();
            Assertions.assertThat(sourceRecord3.sourceOffset().get("snapshot_completed")).as("Streaming phase", new Object[0]).isNull();
            Assertions.assertThat(sourceRecord3.sourceOffset().get("change_lsn")).as("LSN present", new Object[0]).isNotNull();
        }
    }

    @Test
    public void testTableIncludeList() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.NO_DATA).with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.tableb").build();
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1, 'b')"});
        start(Db2Connector.class, build);
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        TestHelper.refreshAndWait(this.connection);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(5);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEA");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(recordsForTopic == null || recordsForTopic.isEmpty()).isTrue();
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        stopConnector();
    }

    @Test
    public void testTableExcludeList() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL).with(Db2ConnectorConfig.TABLE_EXCLUDE_LIST, "db2inst1.tablea").build();
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1, 'b')"});
        start(Db2Connector.class, build);
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        for (int i = 0; i < 5; i++) {
            int i2 = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        TestHelper.refreshAndWait(this.connection);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(5);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEA");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(recordsForTopic == null || recordsForTopic.isEmpty()).isTrue();
        Assertions.assertThat(recordsForTopic2).hasSize(5);
        stopConnector();
    }

    private void restartInTheMiddleOfTx(boolean z, boolean z2) throws Exception {
        Configuration build = TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL).build();
        if (z) {
            start(Db2Connector.class, build);
            assertConnectorIsRunning();
            consumeRecordsByTopic(1);
            stopConnector();
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(-1, '-a')"});
            TestHelper.refreshAndWait(this.connection);
        }
        start(Db2Connector.class, build, sourceRecord -> {
            if (!"testdb.DB2INST1.TABLEA.Envelope".equals(sourceRecord.valueSchema().name())) {
                return false;
            }
            Struct struct = ((Struct) sourceRecord.value()).getStruct("after");
            Integer int32 = struct.getInt32("ID");
            return int32 != null && int32.intValue() == 25 && "a".equals(struct.getString("COLA"));
        });
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        if (z2) {
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(-2, '-a')"});
            TestHelper.refreshAndWait(this.connection);
            assertRecord(((Struct) ((SourceRecord) consumeRecordsByTopic(1).allRecordsInOrder().get(0)).value()).getStruct("after"), Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, -2), new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "-a")));
        }
        this.connection.setAutoCommit(false);
        for (int i = 0; i < 30; i++) {
            int i2 = 10 + i;
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tablea VALUES(" + i2 + ", 'a')"});
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tableb VALUES(" + i2 + ", 'b')"});
        }
        this.connection.connection().commit();
        TestHelper.refreshAndWait(this.connection);
        List allRecordsInOrder = consumeRecordsByTopic(30).allRecordsInOrder();
        Assertions.assertThat(allRecordsInOrder).hasSize(30);
        assertRecord((Struct) ((Struct) ((SourceRecord) allRecordsInOrder.get(29)).value()).get("after"), Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 24), new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b")));
        stopConnector();
        start(Db2Connector.class, build);
        assertConnectorIsRunning();
        TestHelper.refreshAndWait(this.connection);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(30);
        Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder()).hasSize(30);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEA");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEB");
        for (int i3 = 0; i3 < 15; i3++) {
            int i4 = 25 + i3;
            SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(i3);
            SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic2.get(i3);
            List<SchemaAndValueField> asList = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, Integer.valueOf(i4)), new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> asList2 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, Integer.valueOf(i4)), new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct = (Struct) sourceRecord2.value();
            assertRecord((Struct) struct.get("after"), asList);
            Assert.assertNull(struct.get("before"));
            Struct struct2 = (Struct) sourceRecord3.value();
            assertRecord((Struct) struct2.get("after"), asList2);
            Assert.assertNull(struct2.get("before"));
        }
        for (int i5 = 0; i5 < 30; i5++) {
            int i6 = 1000 + i5;
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tablea VALUES(" + i6 + ", 'a')"});
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tableb VALUES(" + i6 + ", 'b')"});
            this.connection.connection().commit();
        }
        TestHelper.refreshAndWait(this.connection);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(60);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic("testdb.DB2INST1.TABLEA");
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(recordsForTopic3).hasSize(30);
        Assertions.assertThat(recordsForTopic4).hasSize(30);
        for (int i7 = 0; i7 < 30; i7++) {
            int i8 = i7 + 1000;
            SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic3.get(i7);
            SourceRecord sourceRecord5 = (SourceRecord) recordsForTopic4.get(i7);
            List<SchemaAndValueField> asList3 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, Integer.valueOf(i8)), new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> asList4 = Arrays.asList(new SchemaAndValueField("ID", Schema.INT32_SCHEMA, Integer.valueOf(i8)), new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct struct3 = (Struct) sourceRecord4.value();
            assertRecord((Struct) struct3.get("after"), asList3);
            Assert.assertNull(struct3.get("before"));
            Struct struct4 = (Struct) sourceRecord5.value();
            assertRecord((Struct) struct4.get("after"), asList4);
            Assert.assertNull(struct4.get("before"));
        }
    }

    @Test
    @Flaky("DBZ-6849")
    @FixFor({"DBZ-1128"})
    public void restartInTheMiddleOfTxAfterSnapshot() throws Exception {
        restartInTheMiddleOfTx(true, false);
    }

    @Test
    @Flaky("DBZ-6849")
    @FixFor({"DBZ-1128"})
    public void restartInTheMiddleOfTxAfterCompletedTx() throws Exception {
        restartInTheMiddleOfTx(false, true);
    }

    @Test
    @Flaky("DBZ-6849")
    public void restartInTheMiddleOfTx() throws Exception {
        restartInTheMiddleOfTx(false, false);
    }

    @Test
    @FixFor({"DBZ-1242"})
    public void testEmptySchemaWarningAfterApplyingFilters() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(RelationalDatabaseSchema.class);
        start(Db2Connector.class, TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL).with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "my_products").build());
        assertConnectorIsRunning();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsWarnMessage("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!")).isTrue();
        });
    }

    @Test
    @FixFor({"DBZ-775"})
    public void shouldConsumeEventsWithMaskedAndTruncatedColumns() throws Exception {
        start(Db2Connector.class, TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.NO_DATA).with("column.mask.with.12.chars", "DB2INST1.MASKED_HASHED_COLUMN_TABLE.NAME").with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K", "DB2INST1.MASKED_HASHED_COLUMN_TABLE.NAME2,DB2INST1.MASKED_HASHED_COLUMN_TABLE.NAME3").with("column.truncate.to.4.chars", "DB2INST1.TRUNCATED_COLUMN_TABLE.NAME").build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        this.connection.setAutoCommit(false);
        this.connection.execute(new String[]{"INSERT INTO masked_hashed_column_table (id, name, name2, name3) VALUES (10, 'some_name', 'test', 'test')"});
        this.connection.execute(new String[]{"INSERT INTO truncated_column_table VALUES(11, 'some_name')"});
        TestHelper.refreshAndWait(this.connection);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.MASKED_HASHED_COLUMN_TABLE");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TRUNCATED_COLUMN_TABLE");
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        VerifyRecord.isValidInsert(sourceRecord, "ID", 10);
        Struct struct = (Struct) sourceRecord.value();
        if (struct.getStruct("after") != null) {
            Struct struct2 = struct.getStruct("after");
            Assertions.assertThat(struct2.getString("NAME")).isEqualTo("************");
            Assertions.assertThat(struct2.getString("NAME2")).isEqualTo("8e68c68edbbac316dfe2f6ada6b0d2d3e2002b487a985d4b7c7c82dd83b0f4d7");
            Assertions.assertThat(struct2.getString("NAME3")).isEqualTo("8e68c68edbbac316dfe2");
        }
        Assertions.assertThat(recordsForTopic2).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.get(0);
        VerifyRecord.isValidInsert(sourceRecord2, "ID", 11);
        Struct struct3 = (Struct) sourceRecord2.value();
        if (struct3.getStruct("after") != null) {
            Assertions.assertThat(struct3.getStruct("after").getString("NAME")).isEqualTo("some");
        }
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-775"})
    public void shouldRewriteIdentityKey() throws Exception {
        start(Db2Connector.class, TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.NO_DATA).with(Db2ConnectorConfig.MSG_KEY_COLUMNS, "(.*).tablea:id,cola").build());
        consumeRecordsByTopic(1);
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        this.connection.setAutoCommit(false);
        this.connection.execute(new String[]{"INSERT INTO tablea (id, cola) values (100, 'hundred')"});
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("testdb.DB2INST1.TABLEA");
        Assertions.assertThat(((SourceRecord) recordsForTopic.get(0)).key()).isNotNull();
        Struct struct = (Struct) ((SourceRecord) recordsForTopic.get(0)).key();
        Assertions.assertThat(struct.get("ID")).isNotNull();
        Assertions.assertThat(struct.get("COLA")).isNotNull();
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1916", "DBZ-1830"})
    public void shouldPropagateSourceTypeByDatatype() throws Exception {
        start(Db2Connector.class, TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.NO_DATA).with("datatype.propagate.source.type", ".+\\.NUMERIC,.+\\.VARCHAR,.+\\.DECIMAL,.+\\.REAL").build());
        consumeRecordsByTopic(1);
        consumeRecordsByTopic(1);
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        this.connection.setAutoCommit(false);
        this.connection.execute(new String[]{"INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)"});
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("testdb.DB2INST1.DT_TABLE");
        Assertions.assertThat(recordsForTopic).hasSize(1);
        Field field = ((SourceRecord) recordsForTopic.get(0)).valueSchema().field("before");
        Assertions.assertThat(field.schema().field("ID").schema().parameters()).isNull();
        Assertions.assertThat(field.schema().field("C1").schema().parameters()).isNull();
        Assertions.assertThat(field.schema().field("C2").schema().parameters()).isNull();
        Assertions.assertThat(field.schema().field("C3A").schema().parameters()).contains(new Map.Entry[]{Assertions.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "DECIMAL"), Assertions.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "5"), Assertions.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "2")});
        Assertions.assertThat(field.schema().field("C3B").schema().parameters()).contains(new Map.Entry[]{Assertions.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "VARCHAR"), Assertions.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "128")});
        Assertions.assertThat(field.schema().field("F2").schema().parameters()).contains(new Map.Entry[]{Assertions.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "DECIMAL"), Assertions.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "8"), Assertions.entry(TestHelper.TYPE_SCALE_PARAMETER_KEY, "4")});
        Assertions.assertThat(field.schema().field("F1").schema().parameters()).contains(new Map.Entry[]{Assertions.entry(TestHelper.TYPE_NAME_PARAMETER_KEY, "REAL"), Assertions.entry(TestHelper.TYPE_LENGTH_PARAMETER_KEY, "24")});
    }

    @Test
    @FixFor({"DBZ-3668"})
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        Configuration build = TestHelper.defaultConfig().with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.tablea").build();
        this.connection.execute(new String[]{"INSERT INTO tablea (id,cola) values (1001, 'DBZ3668')"});
        start(Db2Connector.class, build);
        assertConnectorIsRunning();
        List<SourceRecord> recordsForTopic = consumeRecordsByTopic(1).recordsForTopic("testdb.DB2INST1.TABLEA");
        Assertions.assertThat(recordsForTopic).hasSize(1);
        for (SourceRecord sourceRecord : recordsForTopic) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "db2", TestHelper.TEST_DATABASE, false);
        }
        this.connection.execute(new String[]{"INSERT INTO tablea (id,cola) values (1002, 'DBZ3668')"});
        List<SourceRecord> recordsForTopic2 = consumeRecordsByTopic(1).recordsForTopic("testdb.DB2INST1.TABLEA");
        Assertions.assertThat(recordsForTopic2).hasSize(1);
        for (SourceRecord sourceRecord2 : recordsForTopic2) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord2, false, jsonNode -> {
                Assertions.assertThat(jsonNode.get("id").asText()).contains(new CharSequence[]{"commit_lsn:"});
            });
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord2, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord2, "db2", TestHelper.TEST_DATABASE, false);
        }
    }

    @Test
    public void shouldNotUseOffsetWhenSnapshotIsAlways() throws Exception {
        try {
            try {
                Configuration build = TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.ALWAYS).with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "DB2INST1.ALWAYS_SNAPSHOT").with(Db2ConnectorConfig.SNAPSHOT_MODE_TABLES, "DB2INST1.ALWAYS_SNAPSHOT").with(Db2ConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, true).with(Db2ConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).build();
                this.connection.execute(new String[]{"CREATE TABLE always_snapshot ( id INT PRIMARY KEY NOT NULL, data VARCHAR(50) NOT NULL);"});
                this.connection.execute(new String[]{"INSERT INTO always_snapshot VALUES (1,'Test1');"});
                this.connection.execute(new String[]{"INSERT INTO always_snapshot VALUES (2,'Test2');"});
                TestHelper.enableTableCdc(this.connection, "ALWAYS_SNAPSHOT");
                start(Db2Connector.class, build);
                TestHelper.waitForCDC();
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
                Assertions.assertThat(consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.ALWAYS_SNAPSHOT")).hasSize(2);
                Struct struct = (Struct) ((Struct) ((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0)).value()).get("after");
                TestCase.assertEquals(1, struct.get("id"));
                TestCase.assertEquals("Test1", struct.get("data"));
                Struct struct2 = (Struct) ((Struct) ((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(1)).value()).get("after");
                TestCase.assertEquals(2, struct2.get("id"));
                TestCase.assertEquals("Test2", struct2.get("data"));
                stopConnector();
                this.connection.execute(new String[]{"DELETE FROM ALWAYS_SNAPSHOT WHERE id=1;"});
                this.connection.execute(new String[]{"INSERT INTO ALWAYS_SNAPSHOT VALUES (3,'Test3');"});
                start(Db2Connector.class, build);
                TestHelper.waitForCDC();
                AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
                Assertions.assertThat(consumeRecordsByTopic2.recordsForTopic("testdb.DB2INST1.ALWAYS_SNAPSHOT")).hasSize(2);
                Struct struct3 = (Struct) ((Struct) ((SourceRecord) consumeRecordsByTopic2.allRecordsInOrder().get(0)).value()).get("after");
                TestCase.assertEquals(2, struct3.get("id"));
                TestCase.assertEquals("Test2", struct3.get("data"));
                Struct struct4 = (Struct) ((Struct) ((SourceRecord) consumeRecordsByTopic2.allRecordsInOrder().get(1)).value()).get("after");
                TestCase.assertEquals(3, struct4.get("id"));
                TestCase.assertEquals("Test3", struct4.get("data"));
                TestHelper.disableTableCdc(this.connection, "ALWAYS_SNAPSHOT");
                this.connection.execute(new String[]{"DROP TABLE ALWAYS_SNAPSHOT"});
            } catch (Exception e) {
                e.printStackTrace();
                TestHelper.disableTableCdc(this.connection, "ALWAYS_SNAPSHOT");
                this.connection.execute(new String[]{"DROP TABLE ALWAYS_SNAPSHOT"});
            }
        } catch (Throwable th) {
            TestHelper.disableTableCdc(this.connection, "ALWAYS_SNAPSHOT");
            this.connection.execute(new String[]{"DROP TABLE ALWAYS_SNAPSHOT"});
            throw th;
        }
    }

    @Test
    public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
        Configuration.Builder with = TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL).with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "DB2INST1.TABLEA").with(Db2ConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName());
        start(Db2Connector.class, with.build());
        TestHelper.waitForSnapshotToBeCompleted();
        Assertions.assertThat(consumeRecordsByTopic(1).allRecordsInOrder()).hasSize(1);
        stopConnector();
        with.with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.RECOVERY);
        start(Db2Connector.class, with.build());
        TestHelper.waitForSnapshotToBeCompleted();
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        this.connection.execute(new String[]{"INSERT INTO tablea VALUES (100,'100')"});
        this.connection.execute(new String[]{"INSERT INTO tablea VALUES (200,'200')"});
        TestHelper.refreshAndWait(this.connection);
        Assertions.assertThat(consumeRecordsByTopic(2).allRecordsInOrder()).hasSize(2);
    }

    @Test
    public void shouldAllowForCustomSnapshot() throws InterruptedException, SQLException {
        Configuration build = TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(Db2ConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, "DB2INST1.TABLEA,DB2INST1.TABLEB").with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE, CommonConnectorConfig.SnapshotQueryMode.CUSTOM).with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).build();
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES (1, '1');"});
        start(Db2Connector.class, build);
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEA");
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic("testdb.DB2INST1.TABLEB");
        if (recordsForTopic2 != null) {
            recordsForTopic2 = (List) recordsForTopic2.stream().filter(sourceRecord -> {
                return "r".equals(((Struct) sourceRecord.value()).get("op"));
            }).collect(Collectors.toList());
        }
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2).isNull();
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.get(0), "ID", 1);
        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);
        this.connection.execute(new String[]{"INSERT INTO tablea VALUES (2, '1');"});
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES (2, '1');"});
        TestHelper.refreshAndWait(this.connection);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic("testdb.DB2INST1.TABLEA");
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(0), "ID", 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), "ID", 2);
        stopConnector();
        start(Db2Connector.class, TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(Db2ConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE, CommonConnectorConfig.SnapshotQueryMode.CUSTOM).with(CommonConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(4);
        List recordsForTopic5 = consumeRecordsByTopic3.recordsForTopic("testdb.DB2INST1.TABLEA");
        List recordsForTopic6 = consumeRecordsByTopic3.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(recordsForTopic5.size()).isEqualTo(2);
        Assertions.assertThat(recordsForTopic6.size()).isEqualTo(2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic5.get(0), "ID", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic5.get(1), "ID", 2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic6.get(0), "ID", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic6.get(1), "ID", 2);
    }

    private void assertRecord(Struct struct, List<SchemaAndValueField> list) {
        list.forEach(schemaAndValueField -> {
            schemaAndValueField.assertFor(struct);
        });
    }
}
