package io.debezium.connector.spanner;

import io.debezium.config.Configuration;
import io.debezium.util.Testing;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/**
 * Integration test that streams changes from a dedicated Spanner table through the
 * connector and checks how the emitted records are grouped.
 *
 * <p>The table and its change stream are created once before all tests and dropped
 * once after all tests.
 */
public class KafkaTopicPartitionIT extends AbstractSpannerConnectorIT {
    private static final String TABLE_NAME = "kafka_topic_partition_tests_table";
    private static final String CHANGE_STREAM_NAME = "kafkaTopicPartitionChangeStream";

    /** Number of data change records expected: two inserts plus two updates. */
    private static final int EXPECTED_RECORD_COUNT = 4;

    /**
     * Creates the test table and the change stream that watches it.
     *
     * @throws InterruptedException propagated from the database helper
     * @throws ExecutionException propagated from the database helper
     */
    @BeforeAll
    static void setup() throws InterruptedException, ExecutionException {
        databaseConnection.createTable(TABLE_NAME
                + "(id int64, name string(100),time TIMESTAMP,\n"
                + "  date DATE,\n"
                + "  byt BYTES(2000),\n"
                + "  bool BOOL, long_time int64) primary key(id)");
        databaseConnection.createChangeStream(CHANGE_STREAM_NAME, TABLE_NAME);
        Testing.print("KafkaTopicPartitionIT is ready...");
    }

    /**
     * Drops the change stream first and then the table it watches.
     *
     * @throws InterruptedException propagated from the database helper
     */
    @AfterAll
    static void clear() throws InterruptedException {
        databaseConnection.dropChangeStream(CHANGE_STREAM_NAME);
        databaseConnection.dropTable(TABLE_NAME);
    }

    /**
     * Inserts and then updates two rows, and verifies that for each record key the
     * connector emitted exactly two records, that their {@code ts_ms} values are
     * non-decreasing in consumption order, and that both carry the same source partition.
     *
     * @throws InterruptedException if interrupted while talking to the database or
     *         while consuming records
     */
    @Test
    public void checkRecordsWithSameKeyAreInSamePartition() throws InterruptedException {
        final Configuration config = Configuration.copy(baseConfig)
                .with("gcp.spanner.change.stream", CHANGE_STREAM_NAME)
                .with("name", "kafka_topic_partition_tests_table_test")
                .with("gcp.spanner.start.time", DateTimeFormatter.ISO_INSTANT.format(Instant.now()))
                .build();
        initializeConnectorTestFramework();
        start(SpannerConnector.class, config);
        try {
            assertConnectorIsRunning();

            databaseConnection.executeUpdate(
                    "insert into " + TABLE_NAME + "(id, name) values (1, 'some name')");
            databaseConnection.executeUpdate(
                    "update " + TABLE_NAME + " set name = 'test' where id = 1");
            databaseConnection.executeUpdate(
                    "insert into " + TABLE_NAME + "(id, name) values (2, 'test name')");
            databaseConnection.executeUpdate(
                    "update " + TABLE_NAME + " set bool = true where id = 2");

            waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);

            // Ask for more records than expected so that any surplus record is consumed
            // and makes the size assertion below fail.
            final List<SourceRecord> records = consumeRecordsByTopic(10, false)
                    .recordsForTopic(getTopicName(config, TABLE_NAME));
            Assertions.assertThat(records).hasSize(EXPECTED_RECORD_COUNT);

            final Map<Object, List<SourceRecord>> recordsByKey = records.stream()
                    .collect(Collectors.groupingBy(SourceRecord::key));
            Assertions.assertThat(recordsByKey).hasSize(2);

            for (Map.Entry<Object, List<SourceRecord>> entry : recordsByKey.entrySet()) {
                assertKeyGroup(entry.getKey(), entry.getValue());
            }
        }
        finally {
            // Stop the connector even when an assertion above fails, so a failing test
            // does not leave it running.
            stopConnector();
        }
        assertConnectorNotRunning();
    }

    /** Checks count, {@code ts_ms} ordering and source partition of the records of one key. */
    private static void assertKeyGroup(Object key, List<SourceRecord> group) {
        Assertions.assertThat(group)
                .as("records emitted for key %s", key)
                .hasSize(2);

        final Long firstTimestamp = timestampOf(group.get(0));
        final Long secondTimestamp = timestampOf(group.get(1));
        Assertions.assertThat(firstTimestamp)
                .as("ts_ms of first record for key %s", key)
                .isLessThanOrEqualTo(secondTimestamp);

        final Set<Map<String, ?>> sourcePartitions = group.stream()
                .map(SourceRecord::sourcePartition)
                .collect(Collectors.toSet());
        Assertions.assertThat(sourcePartitions)
                .as("distinct source partitions for key %s", key)
                .hasSize(1);
    }

    /** Reads the {@code ts_ms} field from the record's value struct. */
    private static Long timestampOf(SourceRecord record) {
        return (Long) ((Struct) record.value()).get("ts_ms");
    }
}
