package io.debezium.connector.jdbc.integration;

import io.debezium.bindings.kafka.KafkaDebeziumSinkRecord;
import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.junit.TestHelper;
import io.debezium.connector.jdbc.junit.jupiter.Sink;
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
import io.debezium.connector.jdbc.transforms.ConvertCloudEventToSaveableForm;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.converters.spi.SerializerType;
import io.debezium.sink.SinkConnectorConfig;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.assertj.db.api.TableAssert;
import org.assertj.db.type.ValueType;
import org.fest.assertions.Assertions;
import org.fest.assertions.Index;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

/* loaded from: input_file:io/debezium/connector/jdbc/integration/AbstractJdbcSinkSaveConvertedCloudEventTest.class */
public abstract class AbstractJdbcSinkSaveConvertedCloudEventTest extends AbstractJdbcSinkTest {
    public AbstractJdbcSinkSaveConvertedCloudEventTest(Sink sink) {
        super(sink);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testSaveConvertedCloudEventRecordFromJson(SinkRecordFactory sinkRecordFactory) {
        ConvertCloudEventToSaveableForm convertCloudEventToSaveableForm = new ConvertCloudEventToSaveableForm();
        HashMap hashMap = new HashMap();
        hashMap.put("fields.mapping", "id,source:created_by,data:payload");
        hashMap.put("serializer.type", "json");
        convertCloudEventToSaveableForm.configure(hashMap);
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id");
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord kafkaDebeziumSinkRecord = new KafkaDebeziumSinkRecord(convertCloudEventToSaveableForm.apply(sinkRecordFactory.cloudEventRecord(topicName("server1", "schema", randomTableName()), SerializerType.withName("json"), null).getOriginalKafkaRecord()));
        consume(kafkaDebeziumSinkRecord);
        String destinationTableName = destinationTableName(kafkaDebeziumSinkRecord);
        TableAssert assertTable = TestHelper.assertTable(assertDbConnection(), destinationTableName);
        assertTable.exists().hasNumberOfRows(1).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.TEXT);
        getSink().assertColumnType(assertTable, "created_by", ValueType.TEXT, "test_ce_source");
        getSink().assertColumnType(assertTable, "payload", ValueType.TEXT);
        assertHasPrimaryKeyColumns(destinationTableName, "id");
        convertCloudEventToSaveableForm.close();
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testSaveConvertedCloudEventRecordFromAvro(SinkRecordFactory sinkRecordFactory) {
        ConvertCloudEventToSaveableForm convertCloudEventToSaveableForm = new ConvertCloudEventToSaveableForm();
        HashMap hashMap = new HashMap();
        hashMap.put("fields.mapping", "id,source:created_by,data:payload");
        hashMap.put("serializer.type", "avro");
        convertCloudEventToSaveableForm.configure(hashMap);
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id");
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord kafkaDebeziumSinkRecord = new KafkaDebeziumSinkRecord(convertCloudEventToSaveableForm.apply(sinkRecordFactory.cloudEventRecord(topicName("server1", "schema", randomTableName()), SerializerType.withName("avro"), null).getOriginalKafkaRecord()));
        consume(kafkaDebeziumSinkRecord);
        String destinationTableName = destinationTableName(kafkaDebeziumSinkRecord);
        TableAssert assertTable = TestHelper.assertTable(assertDbConnection(), destinationTableName);
        assertTable.exists().hasNumberOfRows(1).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.TEXT);
        getSink().assertColumnType(assertTable, "created_by", ValueType.TEXT, "test_ce_source");
        getSink().assertColumnType(assertTable, "payload", ValueType.TEXT);
        assertHasPrimaryKeyColumns(destinationTableName, "id");
        convertCloudEventToSaveableForm.close();
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @ParameterizedTest
    public void testSaveConvertedCloudEventRecordFromAvroWithCloudEventsSchemaCustomName(SinkRecordFactory sinkRecordFactory) {
        ConvertCloudEventToSaveableForm convertCloudEventToSaveableForm = new ConvertCloudEventToSaveableForm();
        HashMap hashMap = new HashMap();
        hashMap.put("fields.mapping", "id,source:created_by,data:payload");
        hashMap.put("serializer.type", "avro");
        hashMap.put("schema.cloudevents.name", "TestCESchemaCustomName");
        convertCloudEventToSaveableForm.configure(hashMap);
        Map<String, String> defaultSinkConfig = getDefaultSinkConfig();
        defaultSinkConfig.put("schema.evolution", JdbcSinkConnectorConfig.SchemaEvolutionMode.BASIC.getValue());
        defaultSinkConfig.put("primary.key.mode", SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE.getValue());
        defaultSinkConfig.put("primary.key.fields", "id");
        startSinkConnector(defaultSinkConfig);
        assertSinkConnectorIsRunning();
        KafkaDebeziumSinkRecord kafkaDebeziumSinkRecord = new KafkaDebeziumSinkRecord(convertCloudEventToSaveableForm.apply(sinkRecordFactory.cloudEventRecord(topicName("server1", "schema", randomTableName()), SerializerType.withName("avro"), "TestCESchemaCustomName").getOriginalKafkaRecord()));
        consume(kafkaDebeziumSinkRecord);
        String destinationTableName = destinationTableName(kafkaDebeziumSinkRecord);
        TableAssert assertTable = TestHelper.assertTable(assertDbConnection(), destinationTableName);
        assertTable.exists().hasNumberOfRows(1).hasNumberOfColumns(3);
        getSink().assertColumnType(assertTable, "id", ValueType.TEXT);
        getSink().assertColumnType(assertTable, "created_by", ValueType.TEXT, "test_ce_source");
        getSink().assertColumnType(assertTable, "payload", ValueType.TEXT);
        assertHasPrimaryKeyColumns(destinationTableName, "id");
        convertCloudEventToSaveableForm.close();
    }

    protected void assertHasPrimaryKeyColumns(String str, String... strArr) {
        assertHasPrimaryKeyColumns(str, true, strArr);
    }

    protected void assertHasPrimaryKeyColumns(String str, boolean z, String... strArr) {
        List<String> primaryKeyColumnNames = TestHelper.getPrimaryKeyColumnNames(dataSource(), str);
        if (strArr.length == 0) {
            Assertions.assertThat(primaryKeyColumnNames).isEmpty();
            return;
        }
        if (!z) {
            Assertions.assertThat(primaryKeyColumnNames).containsExactly(strArr);
            return;
        }
        List list = (List) primaryKeyColumnNames.stream().map((v0) -> {
            return v0.toLowerCase();
        }).collect(Collectors.toList());
        for (int i = 0; i < strArr.length; i++) {
            Assertions.assertThat(list).contains(strArr[i].toLowerCase(), Index.atIndex(i));
        }
    }
}
