package io.debezium.connector.binlog;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.SnapshotType;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.junit.SkipTestDependingOnDatabaseRule;
import io.debezium.connector.binlog.junit.SkipWhenDatabaseIs;
import io.debezium.connector.binlog.util.BinlogTestConnection;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;
import io.debezium.data.KeyValueStore;
import io.debezium.data.SchemaChangeHistory;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
/* loaded from: input_file:io/debezium/connector/binlog/BinlogSnapshotSourceIT.class */
public abstract class BinlogSnapshotSourceIT<C extends SourceConnector> extends AbstractBinlogConnectorIT<C> {
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-snapshot.txt").toAbsolutePath();
    protected Configuration config;
    protected final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase("logical_server_name", "connector_test_ro").withDbHistoryPath(SCHEMA_HISTORY_PATH);
    protected final UniqueDatabase OTHER_DATABASE = TestHelper.getUniqueDatabase("logical_server_name", "connector_test", this.DATABASE);
    protected final UniqueDatabase BINARY_FIELD_DATABASE = TestHelper.getUniqueDatabase("logical_server_name", "connector_read_binary_field_test");
    protected final UniqueDatabase CONFLICT_NAMES_DATABASE = TestHelper.getUniqueDatabase("logical_server_name", "mysql_dbz_6533");

    @Rule
    public TestRule skipDatabaseTypeRule = new SkipTestDependingOnDatabaseRule();

    @Rule
    public SkipTestRule skipRule = new SkipTestRule();
    private final Function<SourceRecord, String> getTableNameFromSourceRecord = sourceRecord -> {
        return ((Struct) sourceRecord.value()).getStruct("source").getString("table");
    };

    @Before
    public void beforeEach() {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        this.DATABASE.createAndInitialize();
        this.OTHER_DATABASE.createAndInitialize();
        this.BINARY_FIELD_DATABASE.createAndInitialize();
        this.CONFLICT_NAMES_DATABASE.createAndInitialize();
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Configuration.Builder simpleConfig() {
        return this.DATABASE.defaultConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(getSnapshotLockingModeField(), getSnapshotLockingModeMinimal()).with(BinlogConnectorConfig.INCLUDE_SQL_QUERY, true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract Field getSnapshotLockingModeField();

    protected abstract String getSnapshotLockingModeMinimal();

    /* JADX INFO: Access modifiers changed from: protected */
    public abstract String getSnapshotLockingModeNone();

    @Test
    public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
        snapshotOfSingleDatabase(true, false, true);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLock() throws Exception {
        snapshotOfSingleDatabase(false, false, true);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyCapturedTables() throws Exception {
        snapshotOfSingleDatabase(false, true, true);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseNoData() throws Exception {
        snapshotOfSingleDatabase(true, false, false);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockNoData() throws Exception {
        snapshotOfSingleDatabase(false, false, false);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyCapturedTablesNoData() throws Exception {
        snapshotOfSingleDatabase(false, true, false);
    }

    private void snapshotOfSingleDatabase(boolean z, boolean z2, boolean z3) throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(BinlogSnapshotChangeEventSource.class);
        Configuration.Builder with = simpleConfig().with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("customers") + "," + this.DATABASE.qualifiedTableName("products")).with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true);
        if (!z) {
            with.with(BinlogConnectorConfig.USER, "cloud").with(BinlogConnectorConfig.PASSWORD, "cloudpass").with("test.disable.global.locking", "true").with(BinlogConnectorConfig.STORE_ONLY_CAPTURED_TABLES_DDL, z2);
        }
        if (!z3) {
            with.with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NO_DATA);
        }
        this.config = with.build();
        start(getConnectorClass(), this.config);
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        KeyValueStore createForTopicsBeginningWith = KeyValueStore.createForTopicsBeginningWith(this.DATABASE.getServerName() + ".");
        SchemaChangeHistory schemaChangeHistory = new SchemaChangeHistory(this.DATABASE.getServerName());
        int i = z2 ? 8 : 14;
        String str = null;
        String str2 = null;
        Iterator it = consumeRecordsByTopicUntil((num, sourceRecord) -> {
            return !sourceRecord.sourceOffset().containsKey("snapshot");
        }).allRecordsInOrder().iterator();
        while (it.hasNext()) {
            SourceRecord sourceRecord2 = (SourceRecord) it.next();
            VerifyRecord.isValid(sourceRecord2);
            VerifyRecord.hasNoSourceQuery(sourceRecord2);
            createForTopicsBeginningWith.add(sourceRecord2);
            schemaChangeHistory.add(sourceRecord2);
            String string = ((Struct) sourceRecord2.value()).getStruct("source").getString("snapshot");
            String string2 = ((Struct) sourceRecord2.value()).getStruct("source").getString("table");
            if (it.hasNext()) {
                Assertions.assertThat(sourceRecord2.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString());
                if (Objects.equals(string, "first")) {
                    Assertions.assertThat(str).isNull();
                } else if (Objects.equals(string, "first_in_data_collection")) {
                    Assertions.assertThat(str).isNotEqualTo(string2);
                } else if (Objects.equals(str2, "last_in_data_collection")) {
                    Assertions.assertThat(str).isNotEqualTo(string2);
                }
            } else {
                Assertions.assertThat(sourceRecord2.sourceOffset().get("snapshot")).isNull();
                Assertions.assertThat(string).isEqualTo("last");
            }
            if (!sourceRecord2.topic().equals(this.DATABASE.getServerName())) {
                str = string2;
                str2 = string;
            }
        }
        if (z2) {
            Assertions.assertThat(schemaChangeHistory.ddlRecordsForDatabaseOrEmpty("").size() + schemaChangeHistory.ddlRecordsForDatabaseOrEmpty(this.DATABASE.getDatabaseName()).size()).isEqualTo(i);
            Assertions.assertThat(schemaChangeHistory.ddlRecordsForDatabaseOrEmpty("").size() + schemaChangeHistory.ddlRecordsForDatabaseOrEmpty(this.OTHER_DATABASE.getDatabaseName()).size()).isEqualTo(1);
        } else {
            Assertions.assertThat(schemaChangeHistory.ddlRecordsForDatabaseOrEmpty("").size() + schemaChangeHistory.ddlRecordsForDatabaseOrEmpty(this.DATABASE.getDatabaseName()).size()).isEqualTo(i);
            Assertions.assertThat(schemaChangeHistory.ddlRecordsForDatabaseOrEmpty("").size() + schemaChangeHistory.ddlRecordsForDatabaseOrEmpty(this.OTHER_DATABASE.getDatabaseName()).size()).isEqualTo(1);
        }
        if (z) {
            logInterceptor.containsMessage("Releasing global read lock to enable MySQL writes");
        } else {
            logInterceptor.containsMessage("Table level locking is in place, the schema will be capture in two phases, now capturing:");
        }
        if (z3) {
            Assertions.assertThat(createForTopicsBeginningWith.collectionCount()).isEqualTo(2);
            KeyValueStore.Collection collection = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), productsTableName());
            Assertions.assertThat(collection.numberOfCreates()).isEqualTo(0L);
            Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
            Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
            Assertions.assertThat(collection.numberOfReads()).isEqualTo(9L);
            Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
            Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
            Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
            KeyValueStore.Collection collection2 = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "customers");
            Assertions.assertThat(collection2.numberOfCreates()).isEqualTo(0L);
            Assertions.assertThat(collection2.numberOfUpdates()).isEqualTo(0L);
            Assertions.assertThat(collection2.numberOfDeletes()).isEqualTo(0L);
            Assertions.assertThat(collection2.numberOfReads()).isEqualTo(4L);
            Assertions.assertThat(collection2.numberOfTombstones()).isEqualTo(0L);
            Assertions.assertThat(collection2.numberOfKeySchemaChanges()).isEqualTo(1L);
            Assertions.assertThat(collection2.numberOfValueSchemaChanges()).isEqualTo(1L);
            ArrayList arrayList = new ArrayList();
            collection2.forEach(sourceRecord3 -> {
                arrayList.add(((Struct) sourceRecord3.value()).getStruct("after"));
            });
            Struct struct = (Struct) arrayList.stream().sorted((struct2, struct3) -> {
                return struct2.getInt32("id").compareTo(struct3.getInt32("id"));
            }).findFirst().get();
            Assertions.assertThat(struct.get("first_name")).isInstanceOf(String.class);
            Assertions.assertThat(struct.get("last_name")).isInstanceOf(String.class);
            Assertions.assertThat(struct.get("email")).isInstanceOf(String.class);
            Assertions.assertThat(struct.get("first_name")).isEqualTo("Sally");
            Assertions.assertThat(struct.get("last_name")).isEqualTo("Thomas");
            Assertions.assertThat(struct.get("email")).isEqualTo("sally.thomas@acme.com");
        }
    }

    @Test
    @FixFor({"DBZ-2456"})
    public void shouldCreateSnapshotSelectively() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.DATABASE_INCLUDE_LIST, "connector_(.*)_" + this.DATABASE.getIdentifier()).with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, "connector_(.*).CUSTOMERS").build();
        start(getConnectorClass(), this.config);
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        KeyValueStore createForTopicsBeginningWith = KeyValueStore.createForTopicsBeginningWith(this.DATABASE.getServerName() + ".");
        SchemaChangeHistory schemaChangeHistory = new SchemaChangeHistory(this.DATABASE.getServerName());
        consumeRecordsByTopic(8).allRecordsInOrder().forEach(sourceRecord -> {
            VerifyRecord.isValid(sourceRecord);
            VerifyRecord.hasNoSourceQuery(sourceRecord);
            createForTopicsBeginningWith.add(sourceRecord);
            schemaChangeHistory.add(sourceRecord);
        });
        Assertions.assertThat(schemaChangeHistory.recordCount()).isEqualTo(0);
        Assertions.assertThat(createForTopicsBeginningWith.databases()).containsOnly(new String[]{this.DATABASE.getDatabaseName(), this.OTHER_DATABASE.getDatabaseName()});
        Assertions.assertThat(createForTopicsBeginningWith.collectionCount()).isEqualTo(2);
        KeyValueStore.Collection collection = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(4L);
        Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "orders")).isNull();
    }

    @Test
    @FixFor({"DBZ-3952"})
    public void shouldNotFailStreamingOnNonSnapshottedTable() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.DATABASE_INCLUDE_LIST, this.DATABASE.getDatabaseName()).with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("ORDERS") + "," + this.DATABASE.qualifiedTableName("CUSTOMERS")).with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, this.DATABASE.qualifiedTableName("ORDERS")).build();
        start(getConnectorClass(), this.config);
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        Testing.Print.enable();
        KeyValueStore createForTopicsBeginningWith = KeyValueStore.createForTopicsBeginningWith(this.DATABASE.getServerName() + ".");
        SchemaChangeHistory schemaChangeHistory = new SchemaChangeHistory(this.DATABASE.getServerName());
        consumeRecordsByTopic(5).allRecordsInOrder().forEach(sourceRecord -> {
            VerifyRecord.isValid(sourceRecord);
            VerifyRecord.hasNoSourceQuery(sourceRecord);
            createForTopicsBeginningWith.add(sourceRecord);
            schemaChangeHistory.add(sourceRecord);
        });
        Assertions.assertThat(schemaChangeHistory.recordCount()).isEqualTo(0);
        Assertions.assertThat(createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "customers")).isNull();
        Assertions.assertThat(createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "orders").numberOfReads()).isEqualTo(5L);
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                Connection connection = connect.connection();
                try {
                    Statement createStatement = connection.createStatement();
                    try {
                        createStatement.executeUpdate("INSERT INTO customers VALUES (default,'John','Lazy','john.lazy@acme.com')");
                        if (createStatement != null) {
                            createStatement.close();
                        }
                        if (connection != null) {
                            connection.close();
                        }
                        if (connect != null) {
                            connect.close();
                        }
                        if (testDatabaseConnection != null) {
                            testDatabaseConnection.close();
                        }
                        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
                        Assertions.assertThat(consumeRecordsByTopic.topics()).hasSize(1);
                        Assertions.assertThat(((Struct) ((SourceRecord) consumeRecordsByTopic.recordsForTopic(this.DATABASE.topicForTable("customers")).get(0)).value()).getStruct("after").getString("email")).isEqualTo("john.lazy@acme.com");
                    } catch (Throwable th) {
                        if (createStatement != null) {
                            try {
                                createStatement.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th5) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th6) {
                    th5.addSuppressed(th6);
                }
            }
            throw th5;
        }
    }

    @Test
    @FixFor({"DBZ-3238"})
    public void shouldSnapshotCorrectlyReadFields() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.DATABASE_INCLUDE_LIST, "connector_read_binary_field_test_" + this.BINARY_FIELD_DATABASE.getIdentifier()).with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, this.BINARY_FIELD_DATABASE.qualifiedTableName("binary_field")).with(BinlogConnectorConfig.ROW_COUNT_FOR_STREAMING_RESULT_SETS, "0").with(BinlogConnectorConfig.SNAPSHOT_FETCH_SIZE, "101").build();
        start(getConnectorClass(), this.config);
        waitForSnapshotToBeCompleted(getConnectorName(), this.BINARY_FIELD_DATABASE.getServerName());
        KeyValueStore createForTopicsBeginningWith = KeyValueStore.createForTopicsBeginningWith(this.BINARY_FIELD_DATABASE.getServerName() + ".");
        SchemaChangeHistory schemaChangeHistory = new SchemaChangeHistory(this.BINARY_FIELD_DATABASE.getServerName());
        consumeRecordsByTopic(1).allRecordsInOrder().forEach(sourceRecord -> {
            VerifyRecord.isValid(sourceRecord);
            VerifyRecord.hasNoSourceQuery(sourceRecord);
            createForTopicsBeginningWith.add(sourceRecord);
            schemaChangeHistory.add(sourceRecord);
        });
        Assertions.assertThat(schemaChangeHistory.recordCount()).isEqualTo(0);
        Assertions.assertThat(createForTopicsBeginningWith.databases()).contains(new String[]{this.BINARY_FIELD_DATABASE.getDatabaseName()});
        Assertions.assertThat(createForTopicsBeginningWith.collectionCount()).isEqualTo(1);
        KeyValueStore.Collection collection = createForTopicsBeginningWith.collection(this.BINARY_FIELD_DATABASE.getDatabaseName(), "binary_field");
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(1L);
    }

    @Test
    @SkipWhenDatabaseIs(value = SkipWhenDatabaseIs.Type.MARIADB, reason = "Does not use this transform")
    public void shouldCreateSnapshotOfSingleDatabaseUsingInsertEvents() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.DATABASE_INCLUDE_LIST, "connector_(.*)_" + this.DATABASE.getIdentifier()).with("transforms", "snapshotasinsert").with("transforms.snapshotasinsert.type", "io.debezium.connector.mysql.transforms.ReadToInsertEvent").build();
        start(getConnectorClass(), this.config);
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        KeyValueStore createForTopicsBeginningWith = KeyValueStore.createForTopicsBeginningWith(this.DATABASE.getServerName() + ".");
        SchemaChangeHistory schemaChangeHistory = new SchemaChangeHistory(this.DATABASE.getServerName());
        consumeRecordsByTopic(55).allRecordsInOrder().forEach(sourceRecord -> {
            VerifyRecord.isValid(sourceRecord);
            VerifyRecord.hasNoSourceQuery(sourceRecord);
            createForTopicsBeginningWith.add(sourceRecord);
            schemaChangeHistory.add(sourceRecord);
        });
        Assertions.assertThat(schemaChangeHistory.recordCount()).isEqualTo(0);
        Assertions.assertThat(createForTopicsBeginningWith.databases()).containsOnly(new String[]{this.DATABASE.getDatabaseName(), this.OTHER_DATABASE.getDatabaseName()});
        Assertions.assertThat(createForTopicsBeginningWith.collectionCount()).isEqualTo(9);
        KeyValueStore.Collection collection = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), productsTableName());
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection2 = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "products_on_hand");
        Assertions.assertThat(collection2.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat(collection2.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection2.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection3 = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat(collection3.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat(collection3.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection3.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection4 = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "orders");
        Assertions.assertThat(collection4.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat(collection4.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection4.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection5 = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "dbz_342_timetest");
        Assertions.assertThat(collection5.numberOfCreates()).isEqualTo(1L);
        Assertions.assertThat(collection5.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection5.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection5.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection5.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection5.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection5.numberOfValueSchemaChanges()).isEqualTo(1L);
        ArrayList arrayList = new ArrayList();
        collection5.forEach(sourceRecord2 -> {
            arrayList.add(((Struct) sourceRecord2.value()).getStruct("after"));
        });
        Struct struct = (Struct) arrayList.get(0);
        Assertions.assertThat(struct.get("c1")).isEqualTo(Long.valueOf(toMicroSeconds(isMariaDb() ? "PT517H51M04.77S" : "PT517H51M04.78S")));
        Assertions.assertThat(struct.get("c2")).isEqualTo(Long.valueOf(toMicroSeconds("-PT13H14M50S")));
        Assertions.assertThat(struct.get("c3")).isEqualTo(Long.valueOf(toMicroSeconds("-PT733H0M0.001S")));
        Assertions.assertThat(struct.get("c4")).isEqualTo(Long.valueOf(toMicroSeconds("-PT1H59M59.001S")));
        Assertions.assertThat(struct.get("c5")).isEqualTo(Long.valueOf(toMicroSeconds("-PT838H59M58.999999S")));
        Assertions.assertThat(struct.get("c6")).isEqualTo(Long.valueOf(toMicroSeconds("-PT00H20M38.000000S")));
        Assertions.assertThat(struct.get("c7")).isEqualTo(Long.valueOf(toMicroSeconds("-PT01H01M01.000001S")));
        Assertions.assertThat(struct.get("c8")).isEqualTo(Long.valueOf(toMicroSeconds("-PT01H01M01.000000S")));
        Assertions.assertThat(struct.get("c9")).isEqualTo(Long.valueOf(toMicroSeconds("-PT01H01M00.000000S")));
        Assertions.assertThat(struct.get("c10")).isEqualTo(Long.valueOf(toMicroSeconds("-PT01H00M00.000000S")));
        Assertions.assertThat(struct.get("c11")).isEqualTo(Long.valueOf(toMicroSeconds("-PT00H00M00.000000S")));
    }

    protected String productsTableName() throws SQLException {
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            String str = testDatabaseConnection.isTableIdCaseSensitive() ? "products" : "Products";
            if (testDatabaseConnection != null) {
                testDatabaseConnection.close();
            }
            return str;
        } catch (Throwable th) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
        start(getConnectorClass(), this.config);
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        KeyValueStore createForTopicsBeginningWith = KeyValueStore.createForTopicsBeginningWith(this.DATABASE.getServerName() + ".");
        SchemaChangeHistory schemaChangeHistory = new SchemaChangeHistory(this.DATABASE.getServerName());
        consumeRecordsByTopic(42).allRecordsInOrder().forEach(sourceRecord -> {
            VerifyRecord.isValid(sourceRecord);
            VerifyRecord.hasNoSourceQuery(sourceRecord);
            createForTopicsBeginningWith.add(sourceRecord);
            schemaChangeHistory.add(sourceRecord);
        });
        Assertions.assertThat(schemaChangeHistory.recordCount()).isEqualTo(14);
        Assertions.assertThat(schemaChangeHistory.databaseCount()).isEqualTo(2);
        Assertions.assertThat(schemaChangeHistory.databases()).containsOnly(new String[]{this.DATABASE.getDatabaseName(), ""});
        Assertions.assertThat(createForTopicsBeginningWith.collectionCount()).isEqualTo(5);
        KeyValueStore.Collection collection = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), productsTableName());
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(9L);
        Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection2 = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "products_on_hand");
        Assertions.assertThat(collection2.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfReads()).isEqualTo(9L);
        Assertions.assertThat(collection2.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection2.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection3 = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat(collection3.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfReads()).isEqualTo(4L);
        Assertions.assertThat(collection3.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection3.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection4 = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "orders");
        Assertions.assertThat(collection4.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfReads()).isEqualTo(5L);
        Assertions.assertThat(collection4.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection4.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection5 = createForTopicsBeginningWith.collection(this.DATABASE.getDatabaseName(), "dbz_342_timetest");
        Assertions.assertThat(collection5.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection5.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection5.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection5.numberOfReads()).isEqualTo(1L);
        Assertions.assertThat(collection5.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection5.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection5.numberOfValueSchemaChanges()).isEqualTo(1L);
        ArrayList arrayList = new ArrayList();
        collection5.forEach(sourceRecord2 -> {
            arrayList.add(((Struct) sourceRecord2.value()).getStruct("after"));
        });
        Struct struct = (Struct) arrayList.get(0);
        Assertions.assertThat(struct.get("c1")).isEqualTo(Long.valueOf(toMicroSeconds(isMariaDb() ? "PT517H51M04.77S" : "PT517H51M04.78S")));
        Assertions.assertThat(struct.get("c2")).isEqualTo(Long.valueOf(toMicroSeconds("-PT13H14M50S")));
        Assertions.assertThat(struct.get("c3")).isEqualTo(Long.valueOf(toMicroSeconds("-PT733H0M0.001S")));
        Assertions.assertThat(struct.get("c4")).isEqualTo(Long.valueOf(toMicroSeconds("-PT1H59M59.001S")));
        Assertions.assertThat(struct.get("c5")).isEqualTo(Long.valueOf(toMicroSeconds("-PT838H59M58.999999S")));
        Assertions.assertThat(struct.get("c6")).isEqualTo(Long.valueOf(toMicroSeconds("-PT00H20M38.000000S")));
        Assertions.assertThat(struct.get("c7")).isEqualTo(Long.valueOf(toMicroSeconds("-PT01H01M01.000001S")));
        Assertions.assertThat(struct.get("c8")).isEqualTo(Long.valueOf(toMicroSeconds("-PT01H01M01.000000S")));
        Assertions.assertThat(struct.get("c9")).isEqualTo(Long.valueOf(toMicroSeconds("-PT01H01M00.000000S")));
        Assertions.assertThat(struct.get("c10")).isEqualTo(Long.valueOf(toMicroSeconds("-PT01H00M00.000000S")));
        Assertions.assertThat(struct.get("c11")).isEqualTo(Long.valueOf(toMicroSeconds("-PT00H00M00.000000S")));
    }

    @Test(expected = DebeziumException.class)
    public void shouldCreateSnapshotSchemaOnlyRecovery_exception() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.RECOVERY).build();
        AtomicReference atomicReference = new AtomicReference();
        start(getConnectorClass(), this.config, (z, str, th) -> {
            atomicReference.set(th);
        });
        waitForConnectorShutdown(getConnectorName(), this.DATABASE.getServerName());
        throw ((RuntimeException) atomicReference.get());
    }

    @Test
    public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
        Configuration.Builder with = simpleConfig().with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.INITIAL).with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("customers")).with(BinlogConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName());
        this.config = with.build();
        start(getConnectorClass(), this.config);
        Assertions.assertThat(consumeRecordsByTopic(4).allRecordsInOrder()).hasSize(4);
        stopConnector();
        with.with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.RECOVERY);
        this.config = with.build();
        start(getConnectorClass(), this.config);
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                Connection connection = connect.connection();
                try {
                    Statement createStatement = connection.createStatement();
                    try {
                        createStatement.executeUpdate("INSERT INTO customers VALUES (default,'John','Lazy','john.lazy@acme.com')");
                        if (createStatement != null) {
                            createStatement.close();
                        }
                        if (connection != null) {
                            connection.close();
                        }
                        if (connect != null) {
                            connect.close();
                        }
                        if (testDatabaseConnection != null) {
                            testDatabaseConnection.close();
                        }
                        Assertions.assertThat(consumeRecordsByTopic(1).allRecordsInOrder()).hasSize(1);
                    } catch (Throwable th) {
                        if (createStatement != null) {
                            try {
                                createStatement.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th5) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th6) {
                    th5.addSuppressed(th6);
                }
            }
            throw th5;
        }
    }

    @Test
    public void shouldSnapshotTablesInOrderSpecifiedInTableIncludeList() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, "connector_test_ro_(.*).orders,connector_test_ro_(.*).Products,connector_test_ro_(.*).products_on_hand,connector_test_ro_(.*).dbz_342_timetest").build();
        start(getConnectorClass(), this.config);
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        LinkedHashSet<String> tableNamesInSpecifiedOrder = getTableNamesInSpecifiedOrder("orders", "Products", "products_on_hand", "dbz_342_timetest");
        consumeRecordsByTopic(24).allRecordsInOrder().forEach(sourceRecord -> {
            VerifyRecord.isValid(sourceRecord);
            VerifyRecord.hasNoSourceQuery(sourceRecord);
            if (sourceRecord.value() != null) {
                linkedHashSet.add(this.getTableNameFromSourceRecord.apply(sourceRecord));
            }
        });
        Assert.assertArrayEquals(linkedHashSet.toArray(), tableNamesInSpecifiedOrder.toArray());
    }

    @Test
    @FixFor({"DBZ-6533"})
    public void shouldSnapshotTablesInOrderSpecifiedInTableIncludeListWithConflictingNames() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.DATABASE_INCLUDE_LIST, this.CONFLICT_NAMES_DATABASE.getDatabaseName()).with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, this.CONFLICT_NAMES_DATABASE.qualifiedTableName("tablename") + "," + this.CONFLICT_NAMES_DATABASE.qualifiedTableName("another") + "," + this.CONFLICT_NAMES_DATABASE.qualifiedTableName("tablename_suffix")).build();
        start(getConnectorClass(), this.config);
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        LinkedHashSet<String> tableNamesInSpecifiedOrder = getTableNamesInSpecifiedOrder("tablename", "another", "tablename_suffix");
        consumeRecordsByTopic(3).allRecordsInOrder().forEach(sourceRecord -> {
            VerifyRecord.isValid(sourceRecord);
            VerifyRecord.hasNoSourceQuery(sourceRecord);
            if (sourceRecord.value() != null) {
                linkedHashSet.add(this.getTableNameFromSourceRecord.apply(sourceRecord));
            }
        });
        Assert.assertArrayEquals(tableNamesInSpecifiedOrder.toArray(), linkedHashSet.toArray());
    }

    @Test
    public void shouldSnapshotTablesInRowCountOrderAsc() throws Exception {
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                Connection connection = connect.connection();
                try {
                    Statement createStatement = connection.createStatement();
                    try {
                        createStatement.execute("ANALYZE TABLE Products");
                        createStatement.execute("ANALYZE TABLE dbz_342_timetest");
                        if (createStatement != null) {
                            createStatement.close();
                        }
                        if (connection != null) {
                            connection.close();
                        }
                        if (connect != null) {
                            connect.close();
                        }
                        if (testDatabaseConnection != null) {
                            testDatabaseConnection.close();
                        }
                        this.config = simpleConfig().with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, "connector_test_ro_(.*).Products,connector_test_ro_(.*).dbz_342_timetest").with(BinlogConnectorConfig.SNAPSHOT_TABLES_ORDER_BY_ROW_COUNT, RelationalDatabaseConnectorConfig.SnapshotTablesRowCountOrder.ASCENDING).build();
                        start(getConnectorClass(), this.config);
                        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
                        LinkedHashSet linkedHashSet = new LinkedHashSet();
                        LinkedHashSet<String> tableNamesInSpecifiedOrder = getTableNamesInSpecifiedOrder("dbz_342_timetest", "Products");
                        consumeRecordsByTopic(10).allRecordsInOrder().forEach(sourceRecord -> {
                            VerifyRecord.isValid(sourceRecord);
                            VerifyRecord.hasNoSourceQuery(sourceRecord);
                            if (sourceRecord.value() != null) {
                                linkedHashSet.add(this.getTableNameFromSourceRecord.apply(sourceRecord));
                            }
                        });
                        Assert.assertArrayEquals(tableNamesInSpecifiedOrder.toArray(), linkedHashSet.toArray());
                    } catch (Throwable th) {
                        if (createStatement != null) {
                            try {
                                createStatement.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th5) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th6) {
                    th5.addSuppressed(th6);
                }
            }
            throw th5;
        }
    }

    @Test
    public void shouldSnapshotTablesInRowCountOrderDesc() throws Exception {
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                Connection connection = connect.connection();
                try {
                    Statement createStatement = connection.createStatement();
                    try {
                        createStatement.execute("ANALYZE TABLE Products");
                        createStatement.execute("ANALYZE TABLE dbz_342_timetest");
                        if (createStatement != null) {
                            createStatement.close();
                        }
                        if (connection != null) {
                            connection.close();
                        }
                        if (connect != null) {
                            connect.close();
                        }
                        if (testDatabaseConnection != null) {
                            testDatabaseConnection.close();
                        }
                        this.config = simpleConfig().with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, "connector_test_ro_(.*).dbz_342_timetest,connector_test_ro_(.*).Products").with(BinlogConnectorConfig.SNAPSHOT_TABLES_ORDER_BY_ROW_COUNT, RelationalDatabaseConnectorConfig.SnapshotTablesRowCountOrder.DESCENDING).build();
                        start(getConnectorClass(), this.config);
                        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
                        LinkedHashSet linkedHashSet = new LinkedHashSet();
                        LinkedHashSet<String> tableNamesInSpecifiedOrder = getTableNamesInSpecifiedOrder("Products", "dbz_342_timetest");
                        consumeRecordsByTopic(10).allRecordsInOrder().forEach(sourceRecord -> {
                            VerifyRecord.isValid(sourceRecord);
                            VerifyRecord.hasNoSourceQuery(sourceRecord);
                            if (sourceRecord.value() != null) {
                                linkedHashSet.add(this.getTableNameFromSourceRecord.apply(sourceRecord));
                            }
                        });
                        Assert.assertArrayEquals(tableNamesInSpecifiedOrder.toArray(), linkedHashSet.toArray());
                    } catch (Throwable th) {
                        if (createStatement != null) {
                            try {
                                createStatement.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        } catch (Throwable th5) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th6) {
                    th5.addSuppressed(th6);
                }
            }
            throw th5;
        }
    }

    @Test
    public void shouldSnapshotTablesInLexicographicalOrder() throws Exception {
        this.config = simpleConfig().build();
        start(getConnectorClass(), this.config);
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        LinkedHashSet<String> tableNamesInSpecifiedOrder = getTableNamesInSpecifiedOrder("Products", "customers", "dbz_342_timetest", "orders", "products_on_hand");
        consumeRecordsByTopic(28).allRecordsInOrder().forEach(sourceRecord -> {
            VerifyRecord.isValid(sourceRecord);
            VerifyRecord.hasNoSourceQuery(sourceRecord);
            if (sourceRecord.value() != null) {
                linkedHashSet.add(this.getTableNameFromSourceRecord.apply(sourceRecord));
            }
        });
        Assert.assertArrayEquals(linkedHashSet.toArray(), tableNamesInSpecifiedOrder.toArray());
    }

    private LinkedHashSet<String> getTableNamesInSpecifiedOrder(String... strArr) {
        return new LinkedHashSet<>(Arrays.asList(strArr));
    }

    @Test
    public void shouldCreateSnapshotSchemaOnly() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NO_DATA).with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(Heartbeat.HEARTBEAT_INTERVAL, 300000).build();
        start(getConnectorClass(), this.config);
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        KeyValueStore createForTopicsBeginningWith = KeyValueStore.createForTopicsBeginningWith(this.DATABASE.getServerName() + ".");
        SchemaChangeHistory schemaChangeHistory = new SchemaChangeHistory(this.DATABASE.getServerName());
        List allRecordsInOrder = consumeRecordsByTopic(15).allRecordsInOrder();
        Iterator it = allRecordsInOrder.subList(0, allRecordsInOrder.size() - 1).iterator();
        while (it.hasNext()) {
            SourceRecord sourceRecord = (SourceRecord) it.next();
            VerifyRecord.isValid(sourceRecord);
            VerifyRecord.hasNoSourceQuery(sourceRecord);
            createForTopicsBeginningWith.add(sourceRecord);
            schemaChangeHistory.add(sourceRecord);
            if (!sourceRecord.topic().startsWith("__debezium-heartbeat")) {
                String string = ((Struct) sourceRecord.value()).getStruct("source").getString("snapshot");
                if (it.hasNext()) {
                    Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot")).isEqualTo(SnapshotType.INITIAL.toString());
                    Assertions.assertThat(string).isEqualTo("true");
                } else {
                    Assertions.assertThat(sourceRecord.sourceOffset().get("snapshot")).isNull();
                    Assertions.assertThat(string).isEqualTo("last");
                }
            }
        }
        SourceRecord sourceRecord2 = (SourceRecord) allRecordsInOrder.get(allRecordsInOrder.size() - 1);
        Assertions.assertThat(schemaChangeHistory.recordCount()).isEqualTo(14);
        Assertions.assertThat(createForTopicsBeginningWith.collectionCount()).isEqualTo(0);
        Assertions.assertThat(sourceRecord2.topic()).startsWith("__debezium-heartbeat");
        Assertions.assertThat(sourceRecord2).isNotNull();
        Assertions.assertThat(sourceRecord2.sourceOffset().get("snapshot")).isNull();
    }

    protected long toMicroSeconds(String str) {
        return Duration.parse(str).toNanos() / 1000;
    }
}
