package io.debezium.connector.binlog;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.junit.SkipTestDependingOnDatabaseRule;
import io.debezium.connector.binlog.junit.SkipWhenDatabaseIs;
import io.debezium.connector.binlog.util.BinlogTestConnection;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;
import io.debezium.data.KeyValueStore;
import io.debezium.data.SchemaChangeHistory;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
import java.io.PrintStream;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

@SkipWhenDatabaseIs(value = SkipWhenDatabaseIs.Type.MYSQL, versions = {@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 5, minor = 6, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")})
/* loaded from: input_file:io/debezium/connector/binlog/BinlogStreamingSourceIT.class */
public abstract class BinlogStreamingSourceIT<C extends SourceConnector> extends AbstractBinlogConnectorIT<C> {
    protected static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-binlog.txt").toAbsolutePath();
    protected Configuration config;
    private KeyValueStore store;
    private SchemaChangeHistory schemaChanges;
    protected final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase("logical_server_name", "connector_test_ro").withDbHistoryPath(SCHEMA_HISTORY_PATH);

    @Rule
    public TestRule skipRule = new SkipTestDependingOnDatabaseRule();

    @Before
    public void beforeEach() {
        stopConnector();
        this.DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        this.store = KeyValueStore.createForTopicsBeginningWith(this.DATABASE.getServerName() + ".");
        this.schemaChanges = new SchemaChangeHistory(this.DATABASE.getServerName());
    }

    @After
    public void afterEach() {
        try {
            stopConnector();
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
            throw th;
        }
    }

    protected int consumeAtLeast(int i) throws InterruptedException {
        return consumeAtLeast(i, 20L, TimeUnit.SECONDS);
    }

    protected int consumeAtLeast(int i, long j, TimeUnit timeUnit) throws InterruptedException {
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(i);
        int size = consumeRecordsByTopic.allRecordsInOrder().size();
        consumeRecordsByTopic.forEach(sourceRecord -> {
            VerifyRecord.isValid(sourceRecord);
            this.store.add(sourceRecord);
            this.schemaChanges.add(sourceRecord);
        });
        Testing.print(size + " records");
        return size;
    }

    protected long filterAtLeast(int i, long j, TimeUnit timeUnit) throws InterruptedException {
        long j2 = i;
        long currentTimeMillis = System.currentTimeMillis();
        while (getNumberOfEventsFiltered() < j2 && System.currentTimeMillis() - currentTimeMillis < timeUnit.toMillis(j)) {
            consumeRecord();
        }
        return getNumberOfEventsFiltered();
    }

    private long getNumberOfEventsFiltered() {
        try {
            return ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(getStreamingMetricsObjectName(getConnectorName(), this.DATABASE.getServerName(), "streaming"), "NumberOfEventsFiltered")).longValue();
        } catch (Exception e) {
            throw new DebeziumException(e);
        }
    }

    private long getNumberOfSkippedEvents() {
        try {
            return ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(getStreamingMetricsObjectName(getConnectorName(), this.DATABASE.getServerName(), "streaming"), "NumberOfSkippedEvents")).longValue();
        } catch (Exception e) {
            throw new DebeziumException(e);
        }
    }

    protected Configuration.Builder simpleConfig() {
        return this.DATABASE.defaultConfig().with(BinlogConnectorConfig.USER, "replicator").with(BinlogConnectorConfig.PASSWORD, "replpass").with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(BinlogConnectorConfig.INCLUDE_SQL_QUERY, false).with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
        this.config = simpleConfig().build();
        start(getConnectorClass(), this.config);
        Assertions.assertThat(consumeAtLeast(28)).isGreaterThanOrEqualTo(28);
        List sourceRecords = this.store.sourceRecords();
        PrintStream printStream = System.out;
        Objects.requireNonNull(printStream);
        sourceRecords.forEach((v1) -> {
            r1.println(v1);
        });
        Assertions.assertThat(this.schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat(this.store.collectionCount()).isEqualTo(5);
        KeyValueStore.Collection collection = this.store.collection(this.DATABASE.getDatabaseName(), productsTableName());
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection2 = this.store.collection(this.DATABASE.getDatabaseName(), "products_on_hand");
        Assertions.assertThat(collection2.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat(collection2.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection2.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection3 = this.store.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat(collection3.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat(collection3.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection3.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection4 = this.store.collection(this.DATABASE.getDatabaseName(), "orders");
        Assertions.assertThat(collection4.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat(collection4.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection4.numberOfValueSchemaChanges()).isEqualTo(1L);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).build();
        start(getConnectorClass(), this.config);
        int i = 28 + 7;
        Assertions.assertThat(consumeAtLeast(i)).isGreaterThanOrEqualTo(i);
        Assertions.assertThat(this.schemaChanges.recordCount()).isEqualTo(7);
        List asList = Arrays.asList(null, "Products", "Products", "products_on_hand", "customers", "orders", "dbz_342_timetest");
        ArrayList arrayList = new ArrayList();
        this.schemaChanges.forEach(sourceRecord -> {
            arrayList.add(((Struct) sourceRecord.value()).getStruct("source").getString("table"));
            Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").get("db")).isEqualTo(this.DATABASE.getDatabaseName());
        });
        Assertions.assertThat(arrayList).isEqualTo(asList);
        Assertions.assertThat(this.store.collectionCount()).isEqualTo(5);
        KeyValueStore.Collection collection = this.store.collection(this.DATABASE.getDatabaseName(), productsTableName());
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection2 = this.store.collection(this.DATABASE.getDatabaseName(), "products_on_hand");
        Assertions.assertThat(collection2.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat(collection2.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection2.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection2.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection3 = this.store.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat(collection3.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat(collection3.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection3.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection3.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection collection4 = this.store.collection(this.DATABASE.getDatabaseName(), "orders");
        Assertions.assertThat(collection4.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat(collection4.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection4.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection4.numberOfValueSchemaChanges()).isEqualTo(1L);
    }

    @Test
    @FixFor({"DBZ-1206"})
    public void shouldFilterAllRecordsBasedOnDatabaseIncludeListFilter() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.DATABASE_INCLUDE_LIST, "db-does-not-exist").build();
        start(getConnectorClass(), this.config);
        waitForStreamingRunning(getConnectorName(), this.DATABASE.getServerName(), "streaming");
        Assertions.assertThat(filterAtLeast(35, 20L, TimeUnit.SECONDS)).isGreaterThanOrEqualTo(35L);
        Assertions.assertThat(this.schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat(this.store.collectionCount()).isEqualTo(0);
        Assertions.assertThat(getNumberOfSkippedEvents()).isEqualTo(0L);
    }

    /* JADX WARN: Type inference failed for: r0v24, types: [java.time.ZonedDateTime] */
    @Test
    @FixFor({"DBZ-183"})
    public void shouldHandleTimestampTimezones() throws Exception {
        UniqueDatabase withDbHistoryPath = TestHelper.getUniqueDatabase("logical_server_name", "regression_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
        withDbHistoryPath.createAndInitialize();
        this.config = simpleConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(BinlogConnectorConfig.DATABASE_INCLUDE_LIST, withDbHistoryPath.getDatabaseName()).with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, withDbHistoryPath.qualifiedTableName("dbz_85_fractest")).build();
        start(getConnectorClass(), this.config);
        consumeAtLeast(1);
        String str = isMariaDb() ? "2014-09-08T17:51:04.77" : "2014-09-08T17:51:04.780";
        List sourceRecords = this.store.sourceRecords();
        Assertions.assertThat(sourceRecords.size()).isEqualTo(1);
        Assertions.assertThat(((Struct) ((SourceRecord) sourceRecords.get(0)).value()).getStruct("after").getString("c4")).isEqualTo(ZonedDateTime.of(LocalDateTime.parse(str), withDbHistoryPath.getTimezone()).withZoneSameInstant((ZoneId) ZoneOffset.UTC).format(ZonedTimestamp.FORMATTER));
    }

    @Test
    @FixFor({"DBZ-342"})
    public void shouldHandleMySQLTimeCorrectly() throws Exception {
        UniqueDatabase withDbHistoryPath = TestHelper.getUniqueDatabase("logical_server_name", "regression_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
        withDbHistoryPath.createAndInitialize();
        this.config = simpleConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(BinlogConnectorConfig.DATABASE_INCLUDE_LIST, withDbHistoryPath.getDatabaseName()).with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, withDbHistoryPath.qualifiedTableName("dbz_342_timetest")).build();
        start(getConnectorClass(), this.config);
        consumeAtLeast(1);
        List sourceRecords = this.store.sourceRecords();
        Assertions.assertThat(sourceRecords.size()).isEqualTo(1);
        Struct struct = ((Struct) ((SourceRecord) sourceRecords.get(0)).value()).getStruct("after");
        String str = "PT517H51M4.78S";
        long j = 1864264780000000L;
        int i = 780;
        if (isMariaDb()) {
            str = "PT517H51M4.77S";
            j = 1864264770000000L;
            i = 770;
        }
        Duration ofNanos = Duration.ofNanos(struct.getInt64("c1").longValue() * 1000);
        Duration duration = toDuration(str);
        Assert.assertEquals(duration, ofNanos);
        Assert.assertEquals(duration.toNanos(), ofNanos.toNanos());
        Assertions.assertThat(ofNanos.toNanos()).isEqualTo(j);
        Assertions.assertThat(ofNanos).isEqualTo(Duration.ofHours(517L).plusMinutes(51L).plusSeconds(4L).plusMillis(i));
        Duration ofNanos2 = Duration.ofNanos(struct.getInt64("c2").longValue() * 1000);
        Duration duration2 = toDuration("-PT13H14M50S");
        Assert.assertEquals(duration2, ofNanos2);
        Assert.assertEquals(duration2.toNanos(), ofNanos2.toNanos());
        Assertions.assertThat(ofNanos2.toNanos()).isEqualTo(-47690000000000L);
        Assert.assertTrue(ofNanos2.isNegative());
        Assertions.assertThat(ofNanos2).isEqualTo(Duration.ofHours(-13L).minusMinutes(14L).minusSeconds(50L));
        Duration ofNanos3 = Duration.ofNanos(struct.getInt64("c3").longValue() * 1000);
        Duration duration3 = toDuration("-PT733H0M0.001S");
        Assert.assertEquals(duration3, ofNanos3);
        Assert.assertEquals(duration3.toNanos(), ofNanos3.toNanos());
        Assertions.assertThat(ofNanos3.toNanos()).isEqualTo(-2638800001000000L);
        Assert.assertTrue(ofNanos3.isNegative());
        Assertions.assertThat(ofNanos3).isEqualTo(Duration.ofHours(-733L).minusMillis(1L));
        Duration ofNanos4 = Duration.ofNanos(struct.getInt64("c4").longValue() * 1000);
        Duration duration4 = toDuration("-PT1H59M59.001S");
        Assert.assertEquals(duration4, ofNanos4);
        Assert.assertEquals(duration4.toNanos(), ofNanos4.toNanos());
        Assertions.assertThat(ofNanos4.toNanos()).isEqualTo(-7199001000000L);
        Assert.assertTrue(ofNanos4.isNegative());
        Assertions.assertThat(ofNanos4).isEqualTo(Duration.ofHours(-1L).minusMinutes(59L).minusSeconds(59L).minusMillis(1L));
        Duration ofNanos5 = Duration.ofNanos(struct.getInt64("c5").longValue() * 1000);
        Duration duration5 = toDuration("-PT838H59M58.999999S");
        Assert.assertEquals(duration5, ofNanos5);
        Assert.assertEquals(duration5.toNanos(), ofNanos5.toNanos());
        Assertions.assertThat(ofNanos5.toNanos()).isEqualTo(-3020398999999000L);
        Assert.assertTrue(ofNanos5.isNegative());
        Assertions.assertThat(ofNanos5).isEqualTo(Duration.ofHours(-838L).minusMinutes(59L).minusSeconds(58L).minusNanos(999999000L));
        Duration ofNanos6 = Duration.ofNanos(struct.getInt64("c6").longValue() * 1000);
        Duration duration6 = toDuration("-PT00H20M38.000000S");
        Assert.assertEquals(duration6, ofNanos6);
        Assert.assertEquals(duration6.toNanos(), ofNanos6.toNanos());
        Assertions.assertThat(ofNanos6.toNanos()).isEqualTo(-1238000000000L);
        Assert.assertTrue(ofNanos6.isNegative());
        Assertions.assertThat(ofNanos6).isEqualTo(Duration.ofHours(0L).negated().minusMinutes(20L).minusSeconds(38L).minusNanos(0L));
        Duration ofNanos7 = Duration.ofNanos(struct.getInt64("c7").longValue() * 1000);
        Duration duration7 = toDuration("-PT01H01M01.000001S");
        Assert.assertEquals(duration7, ofNanos7);
        Assert.assertEquals(duration7.toNanos(), ofNanos7.toNanos());
        Assertions.assertThat(ofNanos7.toNanos()).isEqualTo(-3661000001000L);
        Assert.assertTrue(ofNanos7.isNegative());
        Assertions.assertThat(ofNanos7).isEqualTo(Duration.ofHours(-1L).minusMinutes(1L).minusSeconds(1L).minusNanos(1000L));
        Duration ofNanos8 = Duration.ofNanos(struct.getInt64("c8").longValue() * 1000);
        Duration duration8 = toDuration("-PT01H01M01.000000S");
        Assert.assertEquals(duration8, ofNanos8);
        Assert.assertEquals(duration8.toNanos(), ofNanos8.toNanos());
        Assertions.assertThat(ofNanos8.toNanos()).isEqualTo(-3661000000000L);
        Assert.assertTrue(ofNanos8.isNegative());
        Assertions.assertThat(ofNanos8).isEqualTo(Duration.ofHours(-1L).minusMinutes(1L).minusSeconds(1L).minusNanos(0L));
        Duration ofNanos9 = Duration.ofNanos(struct.getInt64("c9").longValue() * 1000);
        Duration duration9 = toDuration("-PT01H01M00.000000S");
        Assert.assertEquals(duration9, ofNanos9);
        Assert.assertEquals(duration9.toNanos(), ofNanos9.toNanos());
        Assertions.assertThat(ofNanos9.toNanos()).isEqualTo(-3660000000000L);
        Assert.assertTrue(ofNanos9.isNegative());
        Assertions.assertThat(ofNanos9).isEqualTo(Duration.ofHours(-1L).minusMinutes(1L).minusSeconds(0L).minusNanos(0L));
        Duration ofNanos10 = Duration.ofNanos(struct.getInt64("c10").longValue() * 1000);
        Duration duration10 = toDuration("-PT01H00M00.000000S");
        Assert.assertEquals(duration10, ofNanos10);
        Assert.assertEquals(duration10.toNanos(), ofNanos10.toNanos());
        Assertions.assertThat(ofNanos10.toNanos()).isEqualTo(-3600000000000L);
        Assert.assertTrue(ofNanos10.isNegative());
        Assertions.assertThat(ofNanos10).isEqualTo(Duration.ofHours(-1L).minusMinutes(0L).minusSeconds(0L).minusNanos(0L));
        Duration ofNanos11 = Duration.ofNanos(struct.getInt64("c11").longValue() * 1000);
        Duration duration11 = toDuration("PT00H00M00.000000S");
        Assert.assertEquals(duration11, ofNanos11);
        Assert.assertEquals(duration11.toNanos(), ofNanos11.toNanos());
        Assertions.assertThat(ofNanos11.toNanos()).isEqualTo(0L);
        Assert.assertTrue(ofNanos11.isZero());
        Assertions.assertThat(ofNanos11).isEqualTo(Duration.ofHours(0L).minusMinutes(0L).minusSeconds(0L).minusNanos(0L));
    }

    @Test(expected = ConnectException.class)
    public void shouldFailOnSchemaInconsistency() throws Exception {
        inconsistentSchema(null);
    }

    @Test
    public void shouldWarnOnSchemaInconsistency() throws Exception {
        inconsistentSchema(CommonConnectorConfig.EventProcessingFailureHandlingMode.WARN);
    }

    @Test
    public void shouldIgnoreOnSchemaInconsistency() throws Exception {
        inconsistentSchema(CommonConnectorConfig.EventProcessingFailureHandlingMode.SKIP);
    }

    @Test
    @FixFor({"DBZ-4029"})
    public void testHeartbeatActionQueryExecuted() throws Exception {
        this.config = simpleConfig().with(BinlogConnectorConfig.USER, "snapper").with(BinlogConnectorConfig.PASSWORD, "snapperpass").with("__debezium-heartbeat", "myheartbeat").with(Heartbeat.HEARTBEAT_INTERVAL, "100").with("heartbeat.action.query", String.format("INSERT INTO %s.test_heartbeat_table (text) VALUES ('test_heartbeat');", this.DATABASE.getDatabaseName())).build();
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            testDatabaseConnection.execute(new String[]{"CREATE TABLE test_heartbeat_table (text TEXT);"});
            if (testDatabaseConnection != null) {
                testDatabaseConnection.close();
            }
            AtomicReference atomicReference = new AtomicReference();
            start(getConnectorClass(), this.config, (z, str, th) -> {
                atomicReference.set(th);
            });
            waitForStreamingRunning(getConnectorName(), this.DATABASE.getServerName(), "streaming");
            String format = String.format("SELECT COUNT(*) FROM %s.test_heartbeat_table;", this.DATABASE.getDatabaseName());
            JdbcConnection.ResultSetMapper resultSetMapper = resultSet -> {
                resultSet.next();
                return Integer.valueOf(resultSet.getInt(1));
            };
            Awaitility.await().alias("Awaiting heartbeat action query insert").pollInterval(100L, TimeUnit.MILLISECONDS).atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS).until(() -> {
                BinlogTestConnection testDatabaseConnection2 = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
                try {
                    Boolean valueOf = Boolean.valueOf(((Integer) testDatabaseConnection2.queryAndMap(format, resultSetMapper)).intValue() > 0);
                    if (testDatabaseConnection2 != null) {
                        testDatabaseConnection2.close();
                    }
                    return valueOf;
                } catch (Throwable th2) {
                    if (testDatabaseConnection2 != null) {
                        try {
                            testDatabaseConnection2.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    }
                    throw th2;
                }
            });
        } catch (Throwable th2) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    private void inconsistentSchema(CommonConnectorConfig.EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode) throws InterruptedException, SQLException {
        LogInterceptor logInterceptor = new LogInterceptor(BinlogStreamingChangeEventSource.class);
        Configuration.Builder with = simpleConfig().with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true).with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("orders"));
        if (eventProcessingFailureHandlingMode == null) {
            this.config = with.build();
        } else {
            this.config = with.with(BinlogConnectorConfig.INCONSISTENT_SCHEMA_HANDLING_MODE, eventProcessingFailureHandlingMode).build();
        }
        start(getConnectorClass(), this.config);
        Assertions.assertThat(consumeAtLeast(5)).isGreaterThanOrEqualTo(5);
        stopConnector();
        this.config = with.with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("orders") + "," + this.DATABASE.qualifiedTableName("customers")).build();
        AtomicReference atomicReference = new AtomicReference();
        start(getConnectorClass(), this.config, (z, str, th) -> {
            atomicReference.set(th);
        });
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                Connection connection = connect.connection();
                try {
                    Statement createStatement = connection.createStatement();
                    if (eventProcessingFailureHandlingMode == null) {
                        try {
                            waitForStreamingRunning(getConnectorName(), this.DATABASE.getServerName(), "streaming");
                        } catch (Throwable th2) {
                            if (createStatement != null) {
                                try {
                                    createStatement.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            }
                            throw th2;
                        }
                    }
                    createStatement.executeUpdate("INSERT INTO customers VALUES (default,'John','Lazy','john.lazy@acme.com')");
                    if (createStatement != null) {
                        createStatement.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                    if (connect != null) {
                        connect.close();
                    }
                    if (testDatabaseConnection != null) {
                        testDatabaseConnection.close();
                    }
                    if (eventProcessingFailureHandlingMode == null) {
                        Awaitility.await().atMost(Duration.ofSeconds(waitTimeForRecords())).until(() -> {
                            return Boolean.valueOf(logInterceptor.containsMessage("Error during binlog processing."));
                        });
                        waitForEngineShutdown();
                    } else {
                        waitForStreamingRunning(getConnectorName(), this.DATABASE.getServerName(), "streaming");
                    }
                    stopConnector();
                    Throwable th4 = (Throwable) atomicReference.get();
                    if (th4 != null) {
                        throw ((RuntimeException) th4);
                    }
                } catch (Throwable th5) {
                    if (connection != null) {
                        try {
                            connection.close();
                        } catch (Throwable th6) {
                            th5.addSuppressed(th6);
                        }
                    }
                    throw th5;
                }
            } finally {
            }
        } catch (Throwable th7) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th8) {
                    th7.addSuppressed(th8);
                }
            }
            throw th7;
        }
    }

    private Duration toDuration(String str) {
        return Duration.parse(str);
    }

    private String productsTableName() throws SQLException {
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            String str = testDatabaseConnection.isTableIdCaseSensitive() ? "products" : "Products";
            if (testDatabaseConnection != null) {
                testDatabaseConnection.close();
            }
            return str;
        } catch (Throwable th) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
