package io.debezium.testing.system.tests.jdbc.sink;

import io.debezium.bindings.kafka.KafkaDebeziumSinkRecord;
import io.debezium.connector.jdbc.util.DebeziumSinkRecordFactory;
import io.debezium.connector.jdbc.util.SinkRecordBuilder;
import io.debezium.testing.system.assertions.JdbcAssertions;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.HashMap;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.json.JsonConverter;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/testing/system/tests/jdbc/sink/JdbcSinkTests.class */
public abstract class JdbcSinkTests {
    protected final KafkaController kafkaController;
    protected final KafkaConnectController connectController;
    protected final JdbcAssertions assertions;
    protected ConnectorConfigBuilder connectorConfig;
    protected Producer<String, String> kafkaProducer;
    Logger LOGGER = LoggerFactory.getLogger(JdbcAssertions.class);

    public JdbcSinkTests(KafkaController kafkaController, KafkaConnectController kafkaConnectController, JdbcAssertions jdbcAssertions, ConnectorConfigBuilder connectorConfigBuilder) {
        this.kafkaController = kafkaController;
        this.connectController = kafkaConnectController;
        this.assertions = jdbcAssertions;
        this.connectorConfig = connectorConfigBuilder;
        this.kafkaProducer = new KafkaProducer(kafkaController.getDefaultProducerProperties());
    }

    private void produceRecordToTopic(String str, String str2, String str3) {
        String createRecord = createRecord(str2, str3);
        this.LOGGER.info("Producing record to topic {}", str);
        this.LOGGER.debug(createRecord);
        this.kafkaProducer.send(new ProducerRecord(str, createRecord));
    }

    private String createRecord(String str, String str2) {
        KafkaDebeziumSinkRecord build = SinkRecordBuilder.update().flat(false).name("jdbc-connector-test").recordSchema(SchemaBuilder.struct().field(str, Schema.STRING_SCHEMA).build()).sourceSchema(new DebeziumSinkRecordFactory().basicSourceSchema()).after(str, str2).before(str, str2).source("ts_ms", Integer.valueOf((int) Instant.now().getEpochSecond())).build();
        JsonConverter jsonConverter = new JsonConverter();
        try {
            HashMap hashMap = new HashMap();
            hashMap.put("converter.type", "value");
            jsonConverter.configure(hashMap);
            byte[] fromConnectData = jsonConverter.fromConnectData((String) null, build.valueSchema(), build.value());
            jsonConverter.close();
            return new String(fromConnectData, StandardCharsets.UTF_8);
        } catch (Throwable th) {
            try {
                jsonConverter.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    @Order(10)
    public void shouldHaveRegisteredConnector() {
        Request build = new Request.Builder().url(this.connectController.getApiURL().resolve("/connectors")).build();
        JdbcAssertions.awaitAssert(() -> {
            Response execute = new OkHttpClient().newCall(build).execute();
            try {
                Assertions.assertThat(execute.body().string()).contains(new CharSequence[]{this.connectorConfig.getConnectorName()});
                if (execute != null) {
                    execute.close();
                }
            } catch (Throwable th) {
                if (execute != null) {
                    try {
                        execute.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        });
    }

    @Test
    @Order(20)
    public void shouldStreamChanges() {
        String asString = this.connectorConfig.getAsString("topics");
        produceRecordToTopic(asString, "name", "Jerry");
        JdbcAssertions.awaitAssert(() -> {
            this.assertions.assertRowsCount(1, asString);
        });
        JdbcAssertions.awaitAssert(() -> {
            this.assertions.assertRowsContain(asString, "name", "Jerry");
        });
    }

    @Test
    @Order(30)
    public void shouldBeDown() throws Exception {
        String asString = this.connectorConfig.getAsString("topics");
        this.connectController.undeployConnector(this.connectorConfig.getConnectorName());
        produceRecordToTopic(asString, "name", "Nibbles");
        JdbcAssertions.awaitAssert(() -> {
            this.assertions.assertRowsCount(1, asString);
        });
    }

    @Test
    @Order(40)
    public void shouldResumeStreamingAfterRedeployment() throws Exception {
        this.connectController.deployConnector(this.connectorConfig);
        String asString = this.connectorConfig.getAsString("topics");
        JdbcAssertions.awaitAssert(() -> {
            this.assertions.assertRowsCount(2, asString);
        });
        JdbcAssertions.awaitAssert(() -> {
            this.assertions.assertRowsContain(asString, "name", "Nibbles");
        });
    }

    @Test
    @Order(50)
    public void shouldBeDownAfterCrash() {
        this.connectController.destroy();
        String asString = this.connectorConfig.getAsString("topics");
        produceRecordToTopic(asString, "name", "Larry");
        JdbcAssertions.awaitAssert(() -> {
            this.assertions.assertRowsCount(2, asString);
        });
    }

    @Test
    @Order(60)
    public void shouldResumeStreamingAfterCrash() throws InterruptedException {
        this.connectController.restore();
        String asString = this.connectorConfig.getAsString("topics");
        JdbcAssertions.awaitAssert(() -> {
            this.assertions.assertRowsCount(3, asString);
        });
        JdbcAssertions.awaitAssert(() -> {
            this.assertions.assertRowsContain(asString, "name", "Larry");
        });
    }
}
