package io.debezium.testing.system.tests.binlog;

import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tests.ConnectorTest;
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
import java.sql.SQLException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/debezium/testing/system/tests/binlog/BinlogDBTests.class */
/**
 * Shared, ordered test scenarios for connectors exercised against the {@code inventory} database.
 *
 * <p>Subclasses supply the database credentials, the database controller and a way to wait for the
 * initial snapshot. The test methods are annotated with {@link Order} and the expected record
 * counts grow from one test to the next (4, 5, 6, 7, 8), so they are written to run in the declared
 * order against the same topics.
 */
public abstract class BinlogDBTests extends ConnectorTest {
    public BinlogDBTests(KafkaController kafkaController, KafkaConnectController kafkaConnectController, ConnectorConfigBuilder connectorConfigBuilder, KafkaAssertions<?, ?> kafkaAssertions) {
        super(kafkaController, kafkaConnectController, connectorConfigBuilder, kafkaAssertions);
    }

    /** @return the user name passed to {@link SqlDatabaseController#getDatabaseClient} */
    public abstract String getDbUserName();

    /** @return the password passed to {@link SqlDatabaseController#getDatabaseClient} */
    public abstract String getDbPassword();

    /** @return the controller used by the single-database overloads of the helper methods */
    public abstract SqlDatabaseController getDbController();

    /** Called by {@link #shouldSnapshotChanges()} before it asserts the snapshot record count. */
    public abstract void waitForSnapshot();

    /**
     * Inserts a customer row through the controller returned by {@link #getDbController()}.
     *
     * @param str  first value after the {@code default} column
     * @param str2 second value after the {@code default} column
     * @param str3 third value after the {@code default} column (the tests pass an e-mail address)
     * @throws SQLException if the statement fails
     */
    public void insertCustomer(String str, String str2, String str3) throws SQLException {
        insertCustomer(getDbController(), str, str2, str3);
    }

    /**
     * Inserts a customer row into {@code inventory.customers} using the given controller.
     *
     * <p>NOTE: the arguments are concatenated directly into the SQL text, so they must be trusted
     * literals that contain no single quotes.
     *
     * @param sqlDatabaseController controller providing the database client
     * @param str  first value after the {@code default} column
     * @param str2 second value after the {@code default} column
     * @param str3 third value after the {@code default} column (the tests pass an e-mail address)
     * @throws SQLException if the statement fails
     */
    public void insertCustomer(SqlDatabaseController sqlDatabaseController, String str, String str2, String str3) throws SQLException {
        String insertSql = "INSERT INTO customers VALUES  (default, '" + str + "', '" + str2 + "', '" + str3 + "')";
        sqlDatabaseController.getDatabaseClient(getDbUserName(), getDbPassword()).execute("inventory", insertSql);
    }

    /**
     * Renames customers through the controller returned by {@link #getDbController()}.
     *
     * @param str  current {@code first_name} to match
     * @param str2 new {@code first_name}
     * @throws SQLException if the statement fails
     */
    public void renameCustomer(String str, String str2) throws SQLException {
        renameCustomer(getDbController(), str, str2);
    }

    /**
     * Sets {@code first_name} to {@code str2} on every {@code inventory.customers} row whose
     * {@code first_name} equals {@code str}.
     *
     * <p>NOTE: the arguments are concatenated directly into the SQL text, so they must be trusted
     * literals that contain no single quotes.
     *
     * @param sqlDatabaseController controller providing the database client
     * @param str  current {@code first_name} to match
     * @param str2 new {@code first_name}
     * @throws SQLException if the statement fails
     */
    public void renameCustomer(SqlDatabaseController sqlDatabaseController, String str, String str2) throws SQLException {
        String updateSql = "UPDATE customers SET first_name = '" + str2 + "' WHERE first_name = '" + str + "'";
        sqlDatabaseController.getDatabaseClient(getDbUserName(), getDbPassword()).execute("inventory", updateSql);
    }

    /**
     * Counts the rows of {@code inventory.customers} through {@link #getDbController()}.
     *
     * @return the row count
     * @throws SQLException if the query fails
     */
    public int getCustomerCount() throws SQLException {
        return getCustomerCount(getDbController());
    }

    /**
     * Counts the rows of {@code inventory.customers} using the given controller.
     *
     * @param sqlDatabaseController controller providing the database client
     * @return the row count
     * @throws SQLException if the query fails
     * @throws IllegalStateException if the count query yields no row
     * @throws RuntimeException wrapping any {@link SQLException} raised while reading the result
     */
    public int getCustomerCount(SqlDatabaseController sqlDatabaseController) throws SQLException {
        Integer count = (Integer) sqlDatabaseController.getDatabaseClient(getDbUserName(), getDbPassword()).executeQuery("inventory", "SELECT count(*) FROM customers", resultSet -> {
            try {
                // Fail with a clear message instead of reading from an unpositioned cursor.
                if (!resultSet.next()) {
                    throw new IllegalStateException("SELECT count(*) FROM customers returned no rows");
                }
                return resultSet.getInt(1);
            } catch (SQLException e) {
                // The result-set callback cannot throw checked exceptions; keep the cause.
                throw new RuntimeException(e);
            }
        });
        return count;
    }

    /** Builds the name of the customers topic from the current connector configuration. */
    private String customersTopic() {
        return this.connectorConfig.getDbServerName() + ".inventory.customers";
    }

    /** Asserts that the response of the Connect {@code /connectors} endpoint contains the connector name. */
    @Test
    @Order(10)
    public void shouldHaveRegisteredConnector() {
        Request request = new Request.Builder().url(this.connectController.getApiURL().resolve("/connectors")).build();
        // Created once so that repeated invocations of the lambda below do not each build a new client.
        OkHttpClient httpClient = new OkHttpClient();
        KafkaAssertions.awaitAssert(() -> {
            try (Response response = httpClient.newCall(request).execute()) {
                Assertions.assertThat(response.body().string()).contains(this.connectorConfig.getConnectorName());
            }
        });
    }

    /** Asserts that one topic exists for each expected {@code inventory} table. */
    @Test
    @Order(20)
    public void shouldCreateKafkaTopics() {
        String topicPrefix = this.connectorConfig.getDbServerName() + ".inventory.";
        this.assertions.assertTopicsExist(
                topicPrefix + "addresses",
                topicPrefix + "customers",
                topicPrefix + "geom",
                topicPrefix + "orders",
                topicPrefix + "products",
                topicPrefix + "products_on_hand");
    }

    /** Waits for the snapshot and asserts that the customers topic holds 4 records. */
    @Test
    @Order(30)
    public void shouldSnapshotChanges() {
        waitForSnapshot();
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 4));
    }

    /** Inserts a customer and asserts that the customers topic grows to 5 records and contains the new e-mail. */
    @Test
    @Order(40)
    public void shouldStreamChanges() throws SQLException {
        insertCustomer("Tom", "Tester", "tom@test.com");
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 5));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsContain(topic, "tom@test.com"));
    }

    /**
     * Renames a customer and asserts that the update shows up in the {@code <server>.u.customers}
     * topic while the customers topic stays at 5 records.
     */
    @Test
    @Order(41)
    public void shouldRerouteUpdates() throws SQLException {
        renameCustomer("Tom", "Thomas");
        String dbServerName = this.connectorConfig.getDbServerName();
        String customersTopic = dbServerName + ".inventory.customers";
        String updatesTopic = dbServerName + ".u.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(customersTopic, 5));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(updatesTopic, 1));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsContain(updatesTopic, "Thomas"));
    }

    /** Undeploys the connector, inserts a customer and asserts that the customers topic still holds 5 records. */
    @Test
    @Order(50)
    public void shouldBeDown() throws Exception {
        this.connectController.undeployConnector(this.connectorConfig.getConnectorName());
        insertCustomer("Jerry", "Tester", "jerry@test.com");
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 5));
    }

    /** Redeploys the connector and asserts that the row inserted while it was undeployed arrives (6 records). */
    @Test
    @Order(60)
    public void shouldResumeStreamingAfterRedeployment() throws Exception {
        this.connectController.deployConnector(this.connectorConfig);
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 6));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsContain(topic, "jerry@test.com"));
    }

    /** Destroys Kafka Connect, inserts a customer and asserts that the customers topic still holds 6 records. */
    @Test
    @Order(70)
    public void shouldBeDownAfterCrash() throws SQLException {
        this.connectController.destroy();
        insertCustomer("Nibbles", "Tester", "nibbles@test.com");
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 6));
    }

    /**
     * Restores Kafka Connect and asserts that the row inserted while it was destroyed arrives.
     * Only a minimum of 7 records is asserted here, not an exact count.
     */
    @Test
    @Order(80)
    public void shouldResumeStreamingAfterCrash() throws InterruptedException {
        this.connectController.restore();
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> this.assertions.assertMinimalRecordsCount(topic, 7));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsContain(topic, "nibbles@test.com"));
    }

    /**
     * Redeploys the connector with the configuration returned by {@code addJdbcUnwrapSMT()},
     * inserts a customer and asserts that at least 8 records exist and that
     * {@code assertRecordIsUnwrapped} passes for the customers topic.
     */
    @Test
    @Order(90)
    public void shouldExtractNewRecordState() throws Exception {
        this.connectController.undeployConnector(this.connectorConfig.getConnectorName());
        this.connectorConfig = this.connectorConfig.addJdbcUnwrapSMT();
        this.connectController.deployConnector(this.connectorConfig);
        insertCustomer("Eaton", "Beaver", "ebeaver@test.com");
        // Resolved after the configuration was replaced, matching the original statement order.
        String topic = customersTopic();
        KafkaAssertions.awaitAssert(() -> this.assertions.assertMinimalRecordsCount(topic, 8));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordIsUnwrapped(topic, 1));
    }
}
