package io.debezium.connector.spanner;

import io.debezium.config.Configuration;
import io.debezium.util.Testing;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/debezium/connector/spanner/PgDataTypesIT.class */
/**
 * Integration test that inserts a single row covering several column types
 * (boolean, bigint, float8, timestamptz, date, varchar, bytea, varchar[]) into a
 * table watched by a change stream, and verifies the type and value of every
 * column in the resulting change record.
 *
 * <p>The table and the change stream are created once before all tests and
 * dropped once after all tests.
 */
public class PgDataTypesIT extends AbstractSpannerConnectorIT {
    /** Name of the table that the change stream watches. */
    private static final String PG_TABLE_NAME = "pgembedded_data_types_tests_table";

    /** Name of the change stream the connector reads from. */
    private static final String PG_CHANGE_STREAM_NAME = "pgembeddeddatatypestestchangestream";

    /**
     * Creates the test table and the change stream that watches it.
     *
     * @throws InterruptedException declared by the database helper calls
     * @throws ExecutionException declared by the database helper calls
     */
    @BeforeAll
    static void setup() throws InterruptedException, ExecutionException {
        pgDatabaseConnection.createTable(PG_TABLE_NAME
                + "(id bigint,  boolcol boolean,  int64col bigint,  float64col float8,"
                + "  timestampcol timestamptz, datecol date, stringcol varchar, bytescol bytea,"
                + " arrcol varchar[], PRIMARY KEY (id))");
        pgDatabaseConnection.createChangeStream(PG_CHANGE_STREAM_NAME, PG_TABLE_NAME);
        Testing.print("PgDataTypesIT is ready...");
    }

    /**
     * Drops the change stream first and then the table it watches.
     *
     * @throws InterruptedException declared by the database helper calls
     */
    @AfterAll
    static void clear() throws InterruptedException {
        pgDatabaseConnection.dropChangeStream(PG_CHANGE_STREAM_NAME);
        pgDatabaseConnection.dropTable(PG_TABLE_NAME);
    }

    /**
     * Starts the connector, inserts one row and asserts that exactly one create
     * ("c") record is produced whose {@code after} struct carries each column with
     * the expected Kafka Connect type and value.
     *
     * @throws InterruptedException declared by the test framework / database helper calls
     * @throws ExecutionException declared by the test framework / database helper calls
     */
    @Test
    public void shouldStreamUpdatesToKafkaWithTheCorrectType() throws InterruptedException, ExecutionException {
        final Configuration config = Configuration.copy(basePgConfig)
                .with("gcp.spanner.change.stream", PG_CHANGE_STREAM_NAME)
                .with("name", PG_TABLE_NAME + "_test")
                // Start streaming from "now" so only the row inserted below is observed.
                .with("gcp.spanner.start.time", DateTimeFormatter.ISO_INSTANT.format(Instant.now()))
                .build();

        initializeConnectorTestFramework();
        start(SpannerConnector.class, config);
        try {
            assertConnectorIsRunning();

            final long insertedRows = pgDatabaseConnection.executeUpdate("INSERT INTO " + PG_TABLE_NAME
                    + "(id, boolcol, int64col, float64col, timestampcol, datecol, stringcol, bytescol, arrcol)"
                    + " VALUES (1, true, 42, 2.71, '1970-01-01 00:00:00 UTC', '1970-01-01', 'stringVal',"
                    + " bytea 'bytesVal', ARRAY['a', 'b'])").longValue();
            Assertions.assertThat(insertedRows).isEqualTo(1L);

            Assertions.assertThat(waitForAvailableRecords(waitTimeForRecords(), TimeUnit.SECONDS)).isTrue();

            final List<SourceRecord> records =
                    consumeRecordsByTopic(10, false).recordsForTopic(getTopicName(config, PG_TABLE_NAME));
            Assertions.assertThat(records).hasSize(1);

            final Struct envelope = (Struct) records.get(0).value();
            Assertions.assertThat(envelope.get("op")).isEqualTo("c");
            Assertions.assertThat(envelope.schema().field("after")).isNotNull();

            final Struct after = envelope.getStruct("after");
            Assertions.assertThat(after.getBoolean("boolcol")).isTrue();
            Assertions.assertThat(after.getInt64("int64col")).isEqualTo(42L);
            Assertions.assertThat(after.getFloat64("float64col")).isEqualTo(2.71d);
            Assertions.assertThat(after.getString("timestampcol")).isEqualTo("1970-01-01T00:00:00Z");
            Assertions.assertThat(after.getString("datecol")).isEqualTo("1970-01-01");
            Assertions.assertThat(after.getString("stringcol")).isEqualTo("stringVal");
            // Explicit charset keeps the expected bytes independent of the platform default.
            Assertions.assertThat(after.getBytes("bytescol")).isEqualTo("bytesVal".getBytes(StandardCharsets.UTF_8));
            final List<String> arrayValues = after.getArray("arrcol");
            Assertions.assertThat(arrayValues).containsExactly("a", "b");
        }
        finally {
            // Always stop the connector, even when an assertion above fails,
            // so a failing run does not leave it running.
            stopConnector();
        }
        assertConnectorNotRunning();
    }
}
