package io.debezium.connector.sqlserver;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.pipeline.notification.AbstractNotificationsIT;
import io.debezium.pipeline.notification.channels.SinkNotificationChannel;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.Percentage;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/sqlserver/NotificationsIT.class */
public class NotificationsIT extends AbstractNotificationsIT<SqlServerConnector> {
    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabase();
        SqlServerConnection testConnection = TestHelper.testConnection();
        testConnection.execute(new String[]{"CREATE TABLE tablea (id int primary key, cola varchar(30))", "CREATE TABLE tableb (id int primary key, colb varchar(30))", "INSERT INTO tablea VALUES(1, 'a')"});
        TestHelper.enableTableCdc(testConnection, "tablea");
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() {
        stopConnector();
        TestHelper.dropTestDatabase();
    }

    protected List<String> collections() {
        return (List) List.of("dbo.tablea", "dbo.tableb").stream().map(str -> {
            return String.format("%s.%s", database(), str);
        }).collect(Collectors.toList());
    }

    protected Class<SqlServerConnector> connectorClass() {
        return SqlServerConnector.class;
    }

    protected Configuration.Builder config() {
        return TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL);
    }

    protected String connector() {
        return "sql_server";
    }

    protected String server() {
        return TestHelper.TEST_SERVER_NAME;
    }

    protected String task() {
        return "0";
    }

    protected String database() {
        return TestHelper.TEST_DATABASE_1;
    }

    protected String snapshotStatusResult() {
        return "COMPLETED";
    }

    @Test
    public void completeReadingFromACaptureInstanceNotificationEmitted() throws SQLException {
        startConnector(builder -> {
            return builder.with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.NO_DATA).with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification").with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink");
        });
        assertConnectorIsRunning();
        TestHelper.waitForStreamingStarted();
        SqlServerConnection testConnection = TestHelper.testConnection();
        testConnection.execute(new String[]{"INSERT INTO tablea VALUES(2, 'b')"});
        testConnection.execute(new String[]{"ALTER TABLE tablea ADD colb int NULL"});
        TestHelper.enableTableCdc(testConnection, "tablea", "tablea_c2");
        testConnection.execute(new String[]{"INSERT INTO tablea VALUES(3, 'c', 3)"});
        testConnection.close();
        ArrayList arrayList = new ArrayList();
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            consumeAvailableRecords(sourceRecord -> {
                if (sourceRecord.topic().equals("io.debezium.notification")) {
                    arrayList.add(sourceRecord);
                }
            });
            return Boolean.valueOf(arrayList.size() == 3);
        });
        Assertions.assertThat(arrayList).hasSize(3);
        SourceRecord sourceRecord = (SourceRecord) arrayList.get(2);
        Assertions.assertThat(sourceRecord.topic()).isEqualTo("io.debezium.notification");
        Struct struct = (Struct) sourceRecord.value();
        Assertions.assertThat(struct.getString("aggregate_type")).isEqualTo("Capture Instance");
        Assertions.assertThat(struct.getString("type")).isEqualTo("COMPLETED");
        Assertions.assertThat(struct.getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1.0d));
        Map map = struct.getMap("additional_data");
        Assertions.assertThat((String) map.get("server")).isEqualTo(TestHelper.TEST_SERVER_NAME);
        Assertions.assertThat((String) map.get("database")).isEqualTo(TestHelper.TEST_DATABASE_1);
        Assertions.assertThat((String) map.get("capture_instance")).isEqualTo("dbo_tablea");
        Lsn valueOf = Lsn.valueOf((String) map.get("start_lsn"));
        Lsn valueOf2 = Lsn.valueOf((String) map.get("stop_lsn"));
        Lsn valueOf3 = Lsn.valueOf((String) map.get("commit_lsn"));
        Assertions.assertThat(valueOf).isLessThan(valueOf2);
        Assertions.assertThat(valueOf2).isLessThan(valueOf3);
        SqlServerConnection testConnection2 = TestHelper.testConnection();
        testConnection2.execute(new String[]{"EXEC sys.sp_cdc_disable_table @source_schema = N'dbo', @source_name = N'tablea', @capture_instance = 'dbo_tablea'"});
        testConnection2.execute(new String[]{"ALTER TABLE tablea ADD colc int NULL"});
        TestHelper.enableTableCdc(testConnection2, "tablea", "tablea_c3");
        testConnection2.execute(new String[]{"INSERT INTO tablea VALUES(4, 'c', 4, 4)"});
        testConnection2.close();
        ArrayList arrayList2 = new ArrayList();
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            consumeAvailableRecords(sourceRecord2 -> {
                if (sourceRecord2.topic().equals("io.debezium.notification")) {
                    arrayList2.add(sourceRecord2);
                }
            });
            return Boolean.valueOf(arrayList2.size() == 1);
        });
        Assertions.assertThat(arrayList2).hasSize(1);
        SourceRecord sourceRecord2 = (SourceRecord) arrayList2.get(0);
        Assertions.assertThat(sourceRecord2.topic()).isEqualTo("io.debezium.notification");
        Struct struct2 = (Struct) sourceRecord2.value();
        Assertions.assertThat(struct2.getString("aggregate_type")).isEqualTo("Capture Instance");
        Assertions.assertThat(struct2.getString("type")).isEqualTo("COMPLETED");
        Assertions.assertThat(struct2.getInt64("timestamp")).isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1.0d));
        Map map2 = struct2.getMap("additional_data");
        Assertions.assertThat((String) map2.get("server")).isEqualTo(TestHelper.TEST_SERVER_NAME);
        Assertions.assertThat((String) map2.get("database")).isEqualTo(TestHelper.TEST_DATABASE_1);
        Assertions.assertThat((String) map2.get("capture_instance")).isEqualTo("tablea_c2");
        Lsn valueOf4 = Lsn.valueOf((String) map2.get("start_lsn"));
        Lsn valueOf5 = Lsn.valueOf((String) map2.get("stop_lsn"));
        Lsn valueOf6 = Lsn.valueOf((String) map2.get("commit_lsn"));
        Assertions.assertThat(valueOf4).isLessThan(valueOf5);
        Assertions.assertThat(valueOf5).isLessThan(valueOf6);
    }
}
