package io.debezium.connector.spanner;

import io.debezium.config.Configuration;
import io.debezium.util.Testing;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/debezium/connector/spanner/LowWatermarkRecordIT.class */
/**
 * Integration test that starts the Spanner connector with
 * {@code gcp.spanner.low-watermark.enabled} set to {@code true} and inspects the
 * {@code low_watermark} field found in the {@code source} struct of the emitted records.
 */
public class LowWatermarkRecordIT extends AbstractSpannerConnectorIT {
    private static final String TABLE_NAME = "low_watermark_record_tests_table";
    private static final String CHANGE_STREAM_NAME = "lowWatermarkRecordTestChangeStream";

    /** Creates the table and the change stream used by this test class. */
    @BeforeAll
    static void setup() throws InterruptedException, ExecutionException {
        databaseConnection.createTable(TABLE_NAME + "(id int64, name string(100)) primary key(id)");
        databaseConnection.createChangeStream(CHANGE_STREAM_NAME, TABLE_NAME);
        Testing.print("LowWatermarkRecordIT is ready...");
    }

    /** Drops the change stream first, then the table it was created for. */
    @AfterAll
    static void clear() throws InterruptedException {
        databaseConnection.dropChangeStream(CHANGE_STREAM_NAME);
        databaseConnection.dropTable(TABLE_NAME);
    }

    /**
     * Inserts and updates one row, consumes the resulting records and checks the low
     * watermarks they carry.
     *
     * @throws InterruptedException if interrupted while waiting for or consuming records
     */
    @Test
    public void shouldStreamUpdatesToKafka() throws InterruptedException {
        Instant startTime = Instant.now();
        Configuration config = Configuration.copy(baseConfig)
                .with("gcp.spanner.change.stream", CHANGE_STREAM_NAME)
                .with("name", TABLE_NAME + "_test")
                .with("gcp.spanner.start.time", DateTimeFormatter.ISO_INSTANT.format(startTime))
                .with("gcp.spanner.low-watermark.enabled", true)
                .build();
        initializeConnectorTestFramework();
        start(SpannerConnector.class, config);
        try {
            assertConnectorIsRunning();
            databaseConnection.executeUpdate(
                    "insert into " + TABLE_NAME + "(id, name) values (1, 'some name')");
            databaseConnection.executeUpdate(
                    "update " + TABLE_NAME + " set name = 'test' where id = 1");
            waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS);

            List<SourceRecord> records =
                    consumeRecordsByTopic(10, false).recordsForTopic(getTopicName(config, TABLE_NAME));
            List<Long> lowWatermarks = extractLowWatermarks(records);

            // These two checks previously called assertThat(boolean) without a terminal
            // assertion, so they could never fail; they are now real assertions.
            Assertions.assertThat(lowWatermarks)
                    .as("low watermarks found in consumed records")
                    .isNotEmpty();
            Assertions.assertThat(Collections.max(lowWatermarks))
                    .as("highest low watermark (epoch millis)")
                    .isGreaterThan(startTime.plus(2L, ChronoUnit.SECONDS).toEpochMilli());
            validateLowWatermarks(records, lowWatermarks);
        }
        finally {
            // Stop the connector even when an assertion above fails, so a failing run
            // does not leave it running.
            stopConnector();
        }
        assertConnectorNotRunning();
    }

    /**
     * Collects the non-null {@code low_watermark} values from the {@code source} struct of
     * every record that has a non-null value, preserving record order.
     */
    private static List<Long> extractLowWatermarks(List<SourceRecord> records) {
        return records.stream()
                .map(SourceRecord::value)
                .filter(Objects::nonNull)
                .map(value -> (Long) ((Struct) ((Struct) value).get("source")).get("low_watermark"))
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    /**
     * Asserts that every record carrying a timestamp has a timestamp strictly greater than
     * the first collected low watermark. Records without a timestamp are skipped.
     */
    private void validateLowWatermarks(List<SourceRecord> records, List<Long> lowWatermarks) {
        for (SourceRecord record : records) {
            Long timestamp = record.timestamp();
            if (timestamp == null) {
                continue;
            }
            long firstLowWatermark = lowWatermarks.get(0);
            Assert.assertTrue(
                    "record timestamp " + timestamp + " should be greater than first low watermark "
                            + firstLowWatermark,
                    timestamp > firstLowWatermark);
        }
    }
}
