package io.debezium.connector.binlog;

import io.debezium.config.Configuration;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.util.BinlogTestConnection;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 5, minor = BinlogSchemaValidateIT.INITIAL_EVENT_COUNT, reason = "DDL uses fractional second data types, not supported until MySQL 5.6")
/* loaded from: input_file:io/debezium/connector/binlog/BinlogSchemaValidateIT.class */
public abstract class BinlogSchemaValidateIT<C extends SourceConnector> extends AbstractBinlogConnectorIT<C> {
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath("file-db-history-connect.txt").toAbsolutePath();
    private final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase("sql_bin_log_off", "sql_bin_log_off_test").withDbHistoryPath(DB_HISTORY_PATH);
    private Configuration config;
    private static final int INITIAL_EVENT_COUNT = 6;

    @Before
    public void beforeEach() {
        stopConnector();
        this.DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(DB_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            try {
                stopConnector();
                Testing.Files.delete(DB_HISTORY_PATH);
            } catch (IllegalStateException e) {
                if (!e.getMessage().startsWith("Engine is already being shutting down")) {
                    throw e;
                }
                Testing.Files.delete(DB_HISTORY_PATH);
            }
        } catch (Throwable th) {
            Testing.Files.delete(DB_HISTORY_PATH);
            throw th;
        }
    }

    @Test
    @FixFor({"DBZ-7093"})
    public void shouldRecoverToSyncSchemaWhenAddColumnToEndWithSqlLogBinIsOff() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NO_DATA).build();
        AtomicReference atomicReference = new AtomicReference();
        start(getConnectorClass(), this.config, (z, str, th) -> {
            atomicReference.set(th);
        });
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        boolean equals = System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"));
        if (!equals) {
            Thread.sleep(5000L);
        }
        alterTableWithSqlBinLogOff("ALTER TABLE dbz7093 ADD newcol VARCHAR(20);", equals);
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                connect.execute(new String[]{"INSERT INTO dbz7093(id, age, name, newcol) VALUES (201, 1,'name1','newcol1');"});
                connect.execute(new String[]{"UPDATE dbz7093 SET age=2, name='name2', newcol='newcol2' WHERE id=201"});
                connect.execute(new String[]{"DELETE FROM dbz7093 WHERE id=201"});
                if (connect != null) {
                    connect.close();
                }
                if (testDatabaseConnection != null) {
                    testDatabaseConnection.close();
                }
                waitForEngineShutdown();
                stopConnector();
                if (((Throwable) atomicReference.get()) == null) {
                    Assert.fail();
                }
                Testing.Files.delete(DB_HISTORY_PATH);
                this.config = this.DATABASE.defaultConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.RECOVERY).build();
                start(getConnectorClass(), this.config, (z2, str2, th2) -> {
                    atomicReference.set(th2);
                });
                Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).allRecordsInOrder().size()).isEqualTo(INITIAL_EVENT_COUNT);
                Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).allRecordsInOrder().size()).isEqualTo(INITIAL_EVENT_COUNT);
                List recordsForTopic = consumeRecordsByTopic(4).recordsForTopic(this.DATABASE.topicForTable("dbz7093"));
                Assertions.assertThat(recordsForTopic.size()).isEqualTo(4);
                SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
                assertInsert(sourceRecord, "id", 201);
                assertValueField(sourceRecord, "after/age", 1);
                assertValueField(sourceRecord, "after/name", "name1");
                assertValueField(sourceRecord, "after/newcol", "newcol1");
                SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(1);
                assertUpdate(sourceRecord2, "id", 201);
                assertValueField(sourceRecord2, "before/age", 1);
                assertValueField(sourceRecord2, "before/name", "name1");
                assertValueField(sourceRecord2, "before/newcol", "newcol1");
                assertValueField(sourceRecord2, "after/age", 2);
                assertValueField(sourceRecord2, "after/name", "name2");
                assertValueField(sourceRecord2, "after/newcol", "newcol2");
                SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic.get(2);
                assertDelete(sourceRecord3, "id", 201);
                assertValueField(sourceRecord3, "before/age", 2);
                assertValueField(sourceRecord3, "before/name", "name2");
                assertValueField(sourceRecord3, "before/newcol", "newcol2");
                assertTombstone((SourceRecord) recordsForTopic.get(3));
            } finally {
            }
        } catch (Throwable th3) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    @FixFor({"DBZ-7093"})
    public void shouldRecoverToSyncSchemaWhenAddColumnInMiddleWithSqlLogBinIsOff() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NO_DATA).build();
        AtomicReference atomicReference = new AtomicReference();
        start(getConnectorClass(), this.config, (z, str, th) -> {
            atomicReference.set(th);
        });
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        boolean equals = System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"));
        if (!equals) {
            Thread.sleep(5000L);
        }
        alterTableWithSqlBinLogOff("ALTER TABLE dbz7093 ADD newcol VARCHAR(20) AFTER age;", equals);
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                connect.execute(new String[]{"INSERT INTO dbz7093(id, age, name, newcol) VALUES (201, 1,'name1','newcol1');"});
                connect.execute(new String[]{"UPDATE dbz7093 SET age=2, name='name2', newcol='newcol2' WHERE id=201"});
                connect.execute(new String[]{"DELETE FROM dbz7093 WHERE id=201"});
                if (connect != null) {
                    connect.close();
                }
                if (testDatabaseConnection != null) {
                    testDatabaseConnection.close();
                }
                waitForEngineShutdown();
                stopConnector();
                if (((Throwable) atomicReference.get()) == null) {
                    Assert.fail();
                }
                Testing.Files.delete(DB_HISTORY_PATH);
                this.config = this.DATABASE.defaultConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.RECOVERY).build();
                start(getConnectorClass(), this.config, (z2, str2, th2) -> {
                    atomicReference.set(th2);
                });
                Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).allRecordsInOrder().size()).isEqualTo(INITIAL_EVENT_COUNT);
                Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).allRecordsInOrder().size()).isEqualTo(INITIAL_EVENT_COUNT);
                List recordsForTopic = consumeRecordsByTopic(4).recordsForTopic(this.DATABASE.topicForTable("dbz7093"));
                Assertions.assertThat(recordsForTopic.size()).isEqualTo(4);
                SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
                assertInsert(sourceRecord, "id", 201);
                assertValueField(sourceRecord, "after/age", 1);
                assertValueField(sourceRecord, "after/name", "name1");
                assertValueField(sourceRecord, "after/newcol", "newcol1");
                SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(1);
                assertUpdate(sourceRecord2, "id", 201);
                assertValueField(sourceRecord2, "before/age", 1);
                assertValueField(sourceRecord2, "before/name", "name1");
                assertValueField(sourceRecord2, "before/newcol", "newcol1");
                assertValueField(sourceRecord2, "after/age", 2);
                assertValueField(sourceRecord2, "after/name", "name2");
                assertValueField(sourceRecord2, "after/newcol", "newcol2");
                SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic.get(2);
                assertDelete(sourceRecord3, "id", 201);
                assertValueField(sourceRecord3, "before/age", 2);
                assertValueField(sourceRecord3, "before/name", "name2");
                assertValueField(sourceRecord3, "before/newcol", "newcol2");
                assertTombstone((SourceRecord) recordsForTopic.get(3));
            } finally {
            }
        } catch (Throwable th3) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    @FixFor({"DBZ-7093"})
    public void shouldRecoverToSyncSchemaWhenDropColumnWithSqlLogBinIsOff() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NO_DATA).build();
        AtomicReference atomicReference = new AtomicReference();
        start(getConnectorClass(), this.config, (z, str, th) -> {
            atomicReference.set(th);
        });
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        boolean equals = System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"));
        if (!equals) {
            Thread.sleep(5000L);
        }
        alterTableWithSqlBinLogOff("ALTER TABLE dbz7093 DROP age;", equals);
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                connect.execute(new String[]{"INSERT INTO dbz7093(id, name) VALUES (201, 'name1');"});
                connect.execute(new String[]{"UPDATE dbz7093 SET name='name2' WHERE id=201;"});
                connect.execute(new String[]{"DELETE FROM dbz7093 WHERE id=201;"});
                if (connect != null) {
                    connect.close();
                }
                if (testDatabaseConnection != null) {
                    testDatabaseConnection.close();
                }
                waitForEngineShutdown();
                stopConnector();
                if (((Throwable) atomicReference.get()) == null) {
                    Assert.fail();
                }
                Testing.Files.delete(DB_HISTORY_PATH);
                this.config = this.DATABASE.defaultConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.RECOVERY).build();
                start(getConnectorClass(), this.config, (z2, str2, th2) -> {
                    atomicReference.set(th2);
                });
                Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).allRecordsInOrder().size()).isEqualTo(INITIAL_EVENT_COUNT);
                Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).allRecordsInOrder().size()).isEqualTo(INITIAL_EVENT_COUNT);
                List recordsForTopic = consumeRecordsByTopic(4).recordsForTopic(this.DATABASE.topicForTable("dbz7093"));
                Assertions.assertThat(recordsForTopic.size()).isEqualTo(4);
                SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
                assertInsert(sourceRecord, "id", 201);
                assertValueField(sourceRecord, "after/name", "name1");
                SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(1);
                assertUpdate(sourceRecord2, "id", 201);
                assertValueField(sourceRecord2, "before/name", "name1");
                assertValueField(sourceRecord2, "after/name", "name2");
                SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic.get(2);
                assertDelete(sourceRecord3, "id", 201);
                assertValueField(sourceRecord3, "before/name", "name2");
                assertTombstone((SourceRecord) recordsForTopic.get(3));
            } finally {
            }
        } catch (Throwable th3) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    @FixFor({"DBZ-7093"})
    public void shouldRecoverToSyncSchemaWhenAddColumnToEndWithSqlLogBinIsOffAndColumnInclude() throws Exception {
        this.config = this.DATABASE.defaultConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(BinlogConnectorConfig.COLUMN_INCLUDE_LIST, "dbz7093.id,dbz7093.newcol").with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NO_DATA).build();
        AtomicReference atomicReference = new AtomicReference();
        start(getConnectorClass(), this.config, (z, str, th) -> {
            atomicReference.set(th);
        });
        waitForSnapshotToBeCompleted(getConnectorName(), this.DATABASE.getServerName());
        boolean equals = System.getProperty("database.port", "3306").equals(System.getProperty("database.replica.port", "3306"));
        if (!equals) {
            Thread.sleep(5000L);
        }
        alterTableWithSqlBinLogOff("ALTER TABLE dbz7093 ADD newcol VARCHAR(20);", equals);
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                connect.execute(new String[]{"INSERT INTO dbz7093(id, age, name, newcol) VALUES (201, 1,'name1','newcol1');"});
                connect.execute(new String[]{"UPDATE dbz7093 SET newcol='newcol2' WHERE id=201;"});
                connect.execute(new String[]{"DELETE FROM dbz7093 WHERE id=201;"});
                if (connect != null) {
                    connect.close();
                }
                if (testDatabaseConnection != null) {
                    testDatabaseConnection.close();
                }
                waitForEngineShutdown();
                stopConnector();
                if (((Throwable) atomicReference.get()) == null) {
                    Assert.fail();
                }
                Testing.Files.delete(DB_HISTORY_PATH);
                this.config = this.DATABASE.defaultConfig().with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.RECOVERY).build();
                start(getConnectorClass(), this.config, (z2, str2, th2) -> {
                    atomicReference.set(th2);
                });
                Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).allRecordsInOrder().size()).isEqualTo(INITIAL_EVENT_COUNT);
                Assertions.assertThat(consumeRecordsByTopic(INITIAL_EVENT_COUNT).allRecordsInOrder().size()).isEqualTo(INITIAL_EVENT_COUNT);
                List recordsForTopic = consumeRecordsByTopic(4).recordsForTopic(this.DATABASE.topicForTable("dbz7093"));
                Assertions.assertThat(recordsForTopic.size()).isEqualTo(4);
                SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
                assertInsert(sourceRecord, "id", 201);
                assertValueField(sourceRecord, "after/newcol", "newcol1");
                SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic.get(1);
                assertUpdate(sourceRecord2, "id", 201);
                assertValueField(sourceRecord2, "before/newcol", "newcol1");
                assertValueField(sourceRecord2, "after/newcol", "newcol2");
                SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic.get(2);
                assertDelete(sourceRecord3, "id", 201);
                assertValueField(sourceRecord3, "before/newcol", "newcol2");
                assertTombstone((SourceRecord) recordsForTopic.get(3));
            } finally {
            }
        } catch (Throwable th3) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    private void alterTableWithSqlBinLogOff(String str, boolean z) throws SQLException {
        BinlogTestConnection testDatabaseConnection = getTestDatabaseConnection(this.DATABASE.getDatabaseName());
        try {
            JdbcConnection connect = testDatabaseConnection.connect();
            try {
                connect.execute(new String[]{"SET SQL_LOG_BIN=OFF;"});
                connect.execute(new String[]{str});
                connect.execute(new String[]{"SET SQL_LOG_BIN=ON;"});
                if (connect != null) {
                    connect.close();
                }
                if (testDatabaseConnection != null) {
                    testDatabaseConnection.close();
                }
                if (z) {
                    return;
                }
                BinlogTestConnection testReplicaDatabaseConnection = getTestReplicaDatabaseConnection(this.DATABASE.getDatabaseName());
                try {
                    connect = testReplicaDatabaseConnection.connect();
                    try {
                        connect.execute(new String[]{"SET SQL_LOG_BIN=OFF;"});
                        connect.execute(new String[]{str});
                        connect.execute(new String[]{"SET SQL_LOG_BIN=ON;"});
                        if (connect != null) {
                            connect.close();
                        }
                        if (testReplicaDatabaseConnection != null) {
                            testReplicaDatabaseConnection.close();
                        }
                    } finally {
                    }
                } catch (Throwable th) {
                    if (testReplicaDatabaseConnection != null) {
                        try {
                            testReplicaDatabaseConnection.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
                if (connect != null) {
                    try {
                        connect.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            }
        } catch (Throwable th4) {
            if (testDatabaseConnection != null) {
                try {
                    testDatabaseConnection.close();
                } catch (Throwable th5) {
                    th4.addSuppressed(th5);
                }
            }
            throw th4;
        }
    }
}
