package io.debezium.connector.jdbc;

import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.junit.jupiter.SinkRecordFactoryArgumentsProvider;
import io.debezium.connector.jdbc.type.Type;
import io.debezium.connector.jdbc.util.SinkRecordFactory;
import io.debezium.sink.SinkConnectorConfig;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

@Tag("UnitTests")
/* loaded from: input_file:io/debezium/connector/jdbc/ReducedRecordBufferTest.class */
class ReducedRecordBufferTest extends AbstractRecordBufferTest {
    ReducedRecordBufferTest() {
    }

    @BeforeEach
    void setUp() {
        this.dialect = (DatabaseDialect) Mockito.mock(DatabaseDialect.class);
        Type type = (Type) Mockito.mock(Type.class);
        Mockito.when(type.getTypeName((DatabaseDialect) ArgumentMatchers.eq(this.dialect), (Schema) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn("");
        Mockito.when(this.dialect.getSchemaType((Schema) ArgumentMatchers.any())).thenReturn(type);
    }

    protected JdbcSinkConnectorConfig getJdbcConnectorConfig(SinkConnectorConfig.PrimaryKeyMode primaryKeyMode, String str) {
        return null == str ? new JdbcSinkConnectorConfig(Map.of("batch.size", "5", "primary.key.mode", primaryKeyMode.getValue())) : new JdbcSinkConnectorConfig(Map.of("batch.size", "5", "primary.key.mode", primaryKeyMode.getValue(), "primary.key.fields", str));
    }

    protected JdbcSinkConnectorConfig getJdbcConnectorConfig() {
        return getJdbcConnectorConfig(SinkConnectorConfig.PrimaryKeyMode.RECORD_KEY, "id");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @DisplayName("When 10 sink records arrives and buffer size is 5 then the buffer will be flushed 2 times")
    @ParameterizedTest
    void correctlyBuffer(SinkRecordFactory sinkRecordFactory) {
        JdbcSinkConnectorConfig jdbcConnectorConfig = getJdbcConnectorConfig();
        ReducedRecordBuffer reducedRecordBuffer = new ReducedRecordBuffer(jdbcConnectorConfig);
        Stream stream = ((List) IntStream.range(0, 10).mapToObj(i -> {
            return createRecordPkFieldId(sinkRecordFactory, (byte) i, jdbcConnectorConfig);
        }).collect(Collectors.toList())).stream();
        Objects.requireNonNull(reducedRecordBuffer);
        Assertions.assertThat(stream.map(reducedRecordBuffer::add).filter(Predicate.not((v0) -> {
            return v0.isEmpty();
        })).toList().size()).isEqualTo(2);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @DisplayName("When key schema changes then the buffer will be flushed")
    @ParameterizedTest
    void keySchemaChange(SinkRecordFactory sinkRecordFactory) {
        JdbcSinkConnectorConfig jdbcConnectorConfig = getJdbcConnectorConfig();
        ReducedRecordBuffer reducedRecordBuffer = new ReducedRecordBuffer(jdbcConnectorConfig);
        List list = (List) IntStream.range(0, 3).mapToObj(i -> {
            return createRecordPkFieldId(sinkRecordFactory, (byte) i, jdbcConnectorConfig);
        }).collect(Collectors.toList());
        list.add(createRecord(sinkRecordFactory.updateBuilder().name("prefix").topic("topic").keySchema(sinkRecordFactory.keySchema(UnaryOperator.identity(), Schema.INT16_SCHEMA)).recordSchema(SchemaBuilder.struct().field("id", Schema.INT8_SCHEMA)).sourceSchema(sinkRecordFactory.basicSourceSchema()).key("id", (short) 1).before("id", (byte) 1).after("id", (byte) 1).source("ts_ms", Integer.valueOf((int) Instant.now().getEpochSecond())).build(), jdbcConnectorConfig));
        Stream stream = list.stream();
        Objects.requireNonNull(reducedRecordBuffer);
        Assertions.assertThat(stream.map(reducedRecordBuffer::add).filter(Predicate.not((v0) -> {
            return v0.isEmpty();
        })).toList().size()).isEqualTo(1);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @DisplayName("When value schema changes then the buffer will be flushed")
    @ParameterizedTest
    void valueSchemaChange(SinkRecordFactory sinkRecordFactory) {
        JdbcSinkConnectorConfig jdbcConnectorConfig = getJdbcConnectorConfig();
        ReducedRecordBuffer reducedRecordBuffer = new ReducedRecordBuffer(jdbcConnectorConfig);
        List list = (List) IntStream.range(0, 3).mapToObj(i -> {
            return createRecordPkFieldId(sinkRecordFactory, (byte) i, jdbcConnectorConfig);
        }).collect(Collectors.toList());
        list.add(createRecord(sinkRecordFactory.updateBuilder().name("prefix").topic("topic").keySchema(sinkRecordFactory.basicKeySchema()).recordSchema(SchemaBuilder.struct().field("id", Schema.INT16_SCHEMA)).sourceSchema(sinkRecordFactory.basicSourceSchema()).key("id", (byte) 1).before("id", (short) 1).after("id", (short) 1).source("ts_ms", Integer.valueOf((int) Instant.now().getEpochSecond())).build(), jdbcConnectorConfig));
        Stream stream = list.stream();
        Objects.requireNonNull(reducedRecordBuffer);
        Assertions.assertThat(stream.map(reducedRecordBuffer::add).filter(Predicate.not((v0) -> {
            return v0.isEmpty();
        })).toList().size()).isEqualTo(1);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @DisplayName("When 10 sink records arrives and buffer size is 5 with every alternate duplicate sink record then the buffer will be flushed 1 time")
    @ParameterizedTest
    void correctlyBufferWithDuplicate(SinkRecordFactory sinkRecordFactory) {
        JdbcSinkConnectorConfig jdbcConnectorConfig = getJdbcConnectorConfig();
        ReducedRecordBuffer reducedRecordBuffer = new ReducedRecordBuffer(jdbcConnectorConfig);
        Stream stream = ((List) IntStream.range(0, 10).mapToObj(i -> {
            return createRecordPkFieldId(sinkRecordFactory, (byte) (i % 2 == 0 ? i : i - 1), jdbcConnectorConfig);
        }).collect(Collectors.toList())).stream();
        Objects.requireNonNull(reducedRecordBuffer);
        List list = stream.map(reducedRecordBuffer::add).filter(Predicate.not((v0) -> {
            return v0.isEmpty();
        })).toList();
        Assertions.assertThat(list.size()).isEqualTo(1);
        Assertions.assertThat(((List) list.get(0)).size()).isEqualTo(5);
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @DisplayName("When primary key mode is none then reduced buffer should raise exception")
    @ParameterizedTest
    void raiseExceptionWithoutPrimaryKey(SinkRecordFactory sinkRecordFactory) {
        JdbcSinkConnectorConfig jdbcConnectorConfig = getJdbcConnectorConfig(SinkConnectorConfig.PrimaryKeyMode.NONE, null);
        ReducedRecordBuffer reducedRecordBuffer = new ReducedRecordBuffer(jdbcConnectorConfig);
        Stream stream = ((List) IntStream.range(0, 10).mapToObj(i -> {
            return createRecordNoPkFields(sinkRecordFactory, (byte) i, jdbcConnectorConfig);
        }).collect(Collectors.toList())).stream();
        Objects.requireNonNull(reducedRecordBuffer);
        Stream filter = stream.map(reducedRecordBuffer::add).filter(Predicate.not((v0) -> {
            return v0.isEmpty();
        }));
        Objects.requireNonNull(filter);
        Assertions.assertThat(((Exception) org.junit.jupiter.api.Assertions.assertThrows(ConnectException.class, filter::toList)).getMessage()).isEqualTo("No struct-based primary key defined for record key/value, reduction buffer require struct based primary key");
    }

    @ArgumentsSource(SinkRecordFactoryArgumentsProvider.class)
    @DisplayName("When primary key columns are in record value then reduced buffer should work as expected")
    @ParameterizedTest
    void primaryKeyInValue(SinkRecordFactory sinkRecordFactory) {
        JdbcSinkConnectorConfig jdbcConnectorConfig = getJdbcConnectorConfig(SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE, "value_id");
        ReducedRecordBuffer reducedRecordBuffer = new ReducedRecordBuffer(jdbcConnectorConfig);
        Stream stream = ((List) IntStream.range(0, 10).mapToObj(i -> {
            List<String> of = List.of("value_id", "name");
            List<Schema> of2 = List.of(SchemaBuilder.type(Schema.INT8_SCHEMA.type()).optional().build(), SchemaBuilder.type(Schema.STRING_SCHEMA.type()).optional().build());
            Object[] objArr = new Object[2];
            objArr[0] = Byte.valueOf((byte) (i % 2 == 0 ? i : i - 1));
            objArr[1] = "John Doe " + i;
            return new JdbcKafkaSinkRecord(sinkRecordFactory.createRecordWithSchemaValue("topic", (byte) 1, of, of2, Arrays.asList(objArr)).getOriginalKafkaRecord(), jdbcConnectorConfig.getPrimaryKeyMode(), jdbcConnectorConfig.getPrimaryKeyFields(), jdbcConnectorConfig.getFieldFilter(), this.dialect);
        }).collect(Collectors.toList())).stream();
        Objects.requireNonNull(reducedRecordBuffer);
        List list = stream.map(reducedRecordBuffer::add).filter(Predicate.not((v0) -> {
            return v0.isEmpty();
        })).toList();
        Assertions.assertThat(list.size()).isEqualTo(1);
        Assertions.assertThat(((List) list.get(0)).size()).isEqualTo(5);
        ((List) list.get(0)).forEach(jdbcSinkRecord -> {
            Struct keyStruct = jdbcSinkRecord.getKeyStruct(SinkConnectorConfig.PrimaryKeyMode.RECORD_VALUE, Set.of("value_id"));
            Assertions.assertThat(keyStruct).isNotNull();
            Assertions.assertThat(keyStruct.schema().fields()).hasSize(1);
            Assertions.assertThat(keyStruct.schema().field("value_id")).isNotNull();
            byte byteValue = jdbcSinkRecord.getPayload().getInt8("value_id").byteValue();
            Assertions.assertThat(keyStruct.get("value_id")).isEqualTo(Byte.valueOf(byteValue));
            if (byteValue < 8) {
                Assertions.assertThat(jdbcSinkRecord.getPayload().getString("name")).isEqualTo("John Doe " + (byteValue + 1));
            }
        });
    }
}
