package io.debezium.connector.binlog;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;
import io.debezium.engine.DebeziumEngine;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.kafka.KafkaCluster;
import io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.io.File;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.connect.source.SourceConnector;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/binlog/BinlogSignalsIT.class */
public abstract class BinlogSignalsIT<C extends SourceConnector> extends AbstractBinlogConnectorIT<C> {
    protected static final String SERVER_NAME = "is_test";
    protected static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath("file-schema-history-is.txt").toAbsolutePath();
    protected final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase(SERVER_NAME, "incremental_snapshot-test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
    protected static KafkaCluster kafka;

    @BeforeClass
    public static void startKafka() throws Exception {
        File createTestingDirectory = Testing.Files.createTestingDirectory("signal_cluster");
        Testing.Files.delete(createTestingDirectory);
        kafka = new KafkaCluster().usingDirectory(createTestingDirectory).deleteDataPriorToStartup(true).deleteDataUponShutdown(true).addBrokers(1).withKafkaConfiguration(Collect.propertiesOf("auto.create.topics.enable", "true", "zookeeper.session.timeout.ms", "20000")).startup();
    }

    @AfterClass
    public static void stopKafka() {
        if (kafka != null) {
            kafka.shutdown();
        }
    }

    @Before
    public void before() throws SQLException {
        stopConnector();
        this.DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() {
        try {
            stopConnector();
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        } catch (Throwable th) {
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
            throw th;
        }
    }

    protected Configuration.Builder config() {
        return this.DATABASE.defaultConfig().with(BinlogConnectorConfig.INCLUDE_SQL_QUERY, true).with(BinlogConnectorConfig.USER, "mysqluser").with(BinlogConnectorConfig.PASSWORD, "mysqlpw").with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NO_DATA.getValue()).with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(BinlogConnectorConfig.SIGNAL_DATA_COLLECTION, this.DATABASE.qualifiedTableName("debezium_signal")).with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 1).with(BinlogConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10).with(BinlogConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true).with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO);
    }

    @Test
    public void givenOffsetCommitDisabledAndASignalSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed() throws ExecutionException, InterruptedException {
        LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka").with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic-1").with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList());
        });
        assertConnectorIsRunning();
        sendExecuteSnapshotKafkaSignal(BinlogReadOnlyIncrementalSnapshotIT.EXCLUDED_TABLE, "signals_topic-1");
        waitForAvailableRecords(1000L, TimeUnit.MILLISECONDS);
        Thread.sleep(5000L);
        Assertions.assertThat(consumeRecordsByTopic(2).allRecordsInOrder()).hasSize(2);
        Assertions.assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
    }

    @Test
    public void givenOffsetCommitEnabledAndASignalSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed() throws ExecutionException, InterruptedException {
        LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka").with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic-3").with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList());
        });
        assertConnectorIsRunning();
        sendExecuteSnapshotKafkaSignal(BinlogReadOnlyIncrementalSnapshotIT.EXCLUDED_TABLE, "signals_topic-3");
        waitForAvailableRecords(1000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
    }

    @Test
    public void givenOffsetCommitEnabledAndMultipleSignalsSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed() throws ExecutionException, InterruptedException {
        LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        sendExecuteSnapshotKafkaSignal(BinlogReadOnlyIncrementalSnapshotIT.EXCLUDED_TABLE, "signals_topic-4");
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka").with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic-4").with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList());
        });
        assertConnectorIsRunning();
        waitForAvailableRecords(1000L, TimeUnit.MILLISECONDS);
        stopConnector();
        sendExecuteSnapshotKafkaSignal("c", "signals_topic-4");
        startConnector(builder2 -> {
            return builder2.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka").with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic-4").with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList());
        });
        assertConnectorIsRunning();
        waitForStreamingRunning(getConnectorName(), this.DATABASE.getServerName());
        waitForAvailableRecords(1000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(logInterceptor.getLogEntriesThatContainsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).hasSize(1);
        Assertions.assertThat(logInterceptor.getLogEntriesThatContainsMessage("Requested 'INCREMENTAL' snapshot of data collections '[c]'")).hasSize(1);
    }

    @Test
    public void givenOffsetCommitEnabledAndASignalSentWithConnectorNotRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed() throws ExecutionException, InterruptedException {
        LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        sendExecuteSnapshotKafkaSignal(BinlogReadOnlyIncrementalSnapshotIT.EXCLUDED_TABLE, "signals_topic-5");
        startConnector(builder -> {
            return builder.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka").with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic-5").with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList());
        });
        assertConnectorIsRunning();
        Assertions.assertThat(logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> function) {
        startConnector(function, loggingCompletion());
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> function, DebeziumEngine.CompletionCallback completionCallback) {
        start(getConnectorClass(), function.apply(config()).build(), completionCallback);
        assertConnectorIsRunning();
        waitForAvailableRecords(5L, TimeUnit.SECONDS);
    }

    protected void sendExecuteSnapshotKafkaSignal(String str, String str2) throws ExecutionException, InterruptedException {
        sendKafkaSignal(String.format("{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", str), str2);
    }

    protected void sendKafkaSignal(String str, String str2) throws ExecutionException, InterruptedException {
        ProducerRecord producerRecord = new ProducerRecord(str2, 0, SERVER_NAME, str);
        KafkaProducer kafkaProducer = new KafkaProducer(Configuration.create().withDefault("bootstrap.servers", kafka.brokerList()).withDefault("client.id", "signals").withDefault("key.serializer", StringSerializer.class).withDefault("value.serializer", StringSerializer.class).build().asProperties());
        try {
            kafkaProducer.send(producerRecord).get();
            kafkaProducer.close();
        } catch (Throwable th) {
            try {
                kafkaProducer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }
}
