package io.debezium.connector.sqlserver;

import io.debezium.config.Configuration;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.IoUtil;
import io.debezium.util.Testing;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/sqlserver/IncrementalSnapshotCollationSortOrderMismatchIT.class */
public class IncrementalSnapshotCollationSortOrderMismatchIT extends AbstractSnapshotTest<SqlServerConnector> {
    private static final int POLLING_INTERVAL = 1;
    private static final String SQL_COLLATION = "SQL_Latin1_General_CP1_CI_AS";
    private static final List<String> ALL_IDS = new ArrayList();
    private static final List<String> SKIPPED_IDS = new ArrayList();
    private SqlServerConnection connection;
    private boolean isSendStringParametersAsUnicode;

    @BeforeClass
    public static void beforeClass() throws IOException {
        ClassLoader classLoader = IncrementalSnapshotCollationSortOrderMismatchIT.class.getClassLoader();
        List<String> list = ALL_IDS;
        Objects.requireNonNull(list);
        IoUtil.readLines("dbz-7359-ids.txt", classLoader, IncrementalSnapshotCollationSortOrderMismatchIT.class, (v1) -> {
            r3.add(v1);
        });
        SKIPPED_IDS.addAll(ALL_IDS.subList(ALL_IDS.indexOf("Y-11-3-4") + POLLING_INTERVAL, ALL_IDS.indexOf("Y1-01-1-1")));
    }

    @Test
    public void orderMismatchPkCharValueIntParamsAsUnicodeFalse() throws Exception {
        orderMismatchPkTypecharValueInt(false, ALL_IDS.size(), "char(50) COLLATE SQL_Latin1_General_CP1_CI_AS");
    }

    @Test
    public void orderMismatchPkCharValueIntParamsAsUnicodeTrueSkip36() throws Exception {
        orderMismatchPkTypecharValueInt(true, ALL_IDS.size() - SKIPPED_IDS.size(), "char(50) COLLATE SQL_Latin1_General_CP1_CI_AS");
    }

    @Test
    public void orderMismatchPkVarcharValueIntParamsAsUnicodeFalse() throws Exception {
        orderMismatchPkTypecharValueInt(false, ALL_IDS.size(), "varchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS");
    }

    @Test
    public void orderMismatchPkVarcharValueIntParamsAsUnicodeTrueSkip36() throws Exception {
        orderMismatchPkTypecharValueInt(true, ALL_IDS.size() - SKIPPED_IDS.size(), "varchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS");
    }

    @Test
    public void orderMismatchPkVarcharValueNvarcharParamsAsUnicodeFalse() throws Exception {
        orderMismatchPkVarcharValueNvarchar(false, ALL_IDS.size());
    }

    @Test
    public void orderMismatchPkVarcharValueNvarcharParamsAsUnicodeTrueSkip36() throws Exception {
        orderMismatchPkVarcharValueNvarchar(true, ALL_IDS.size() - SKIPPED_IDS.size());
    }

    @Test
    public void orderMismatchPkNcharValueNvarcharParamsAsUnicodeFalse() throws Exception {
        orderMismatchPkNtypeValueNvarchar(false, ALL_IDS.size(), "nchar(50)");
    }

    @Test
    public void orderMismatchPkNcharValueNvarcharParamsAsUnicodeTrue() throws Exception {
        orderMismatchPkNtypeValueNvarchar(true, ALL_IDS.size(), "nchar(50)");
    }

    @Test
    public void orderMismatchPkNvarcharValueNvarcharParamsAsUnicodeFalse() throws Exception {
        orderMismatchPkNtypeValueNvarchar(false, ALL_IDS.size(), "nvarchar(50)");
    }

    @Test
    public void orderMismatchPkNvarcharValueNvarcharParamsAsUnicodeTrue() throws Exception {
        orderMismatchPkNtypeValueNvarchar(true, ALL_IDS.size(), "nvarchar(50)");
    }

    protected void orderMismatchPkTypecharValueInt(boolean z, int i, String str) throws Exception {
        runTest(z, i, str, (str2, num) -> {
            return String.format("'%s'", str2);
        }, struct -> {
            return struct.getString(pkFieldName()).trim();
        }, "int", (str3, num2) -> {
            return String.format("%d", num2);
        }, sourceRecord -> {
            return ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName());
        }, map -> {
            Integer num3;
            boolean z2 = POLLING_INTERVAL;
            for (int i2 = 0; i2 < ALL_IDS.size(); i2 += POLLING_INTERVAL) {
                String str4 = ALL_IDS.get(i2);
                if (!(z && SKIPPED_IDS.contains(str4)) && ((num3 = (Integer) map.get(str4)) == null || num3.intValue() != i2)) {
                    z2 = false;
                    Testing.printError(ALL_IDS.get(i2) + " value is not = " + i2 + ", is = " + num3);
                    break;
                }
            }
            return z2;
        });
    }

    protected void orderMismatchPkVarcharValueNvarchar(boolean z, int i) throws Exception {
        runTest(z, i, "varchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS", (str, num) -> {
            return String.format("'%s'", str);
        }, struct -> {
            return struct.getString(pkFieldName());
        }, "nvarchar(100) not null", (str2, num2) -> {
            return String.format("N'%d Hiragana: の, は, でした, Katakana: コンサート, Kanji: 昨夜, 最高'", num2);
        }, sourceRecord -> {
            return ((Struct) sourceRecord.value()).getStruct("after").getString(valueFieldName());
        }, map -> {
            boolean z2 = POLLING_INTERVAL;
            int i2 = 0;
            while (true) {
                if (i2 >= ALL_IDS.size()) {
                    break;
                }
                String str3 = ALL_IDS.get(i2);
                if (!z || !SKIPPED_IDS.contains(str3)) {
                    String format = String.format("%d Hiragana: の, は, でした, Katakana: コンサート, Kanji: 昨夜, 最高", Integer.valueOf(i2));
                    String str4 = (String) map.get(ALL_IDS.get(i2));
                    if (!format.equals(str4)) {
                        z2 = false;
                        Testing.printError(ALL_IDS.get(i2) + " value is not = " + format + ", is = " + str4);
                        break;
                    }
                }
                i2 += POLLING_INTERVAL;
            }
            return z2;
        });
    }

    protected void orderMismatchPkNtypeValueNvarchar(boolean z, int i, String str) throws Exception {
        runTest(z, i, str, (str2, num) -> {
            return String.format("N'の %s'", str2);
        }, struct -> {
            return struct.getString(pkFieldName()).trim();
        }, "nvarchar(100) not null", (str3, num2) -> {
            return String.format("N'%d Hiragana: の, は, でした, Katakana: コンサート, Kanji: 昨夜, 最高'", num2);
        }, sourceRecord -> {
            return ((Struct) sourceRecord.value()).getStruct("after").getString(valueFieldName());
        }, map -> {
            boolean z2 = POLLING_INTERVAL;
            int i2 = 0;
            while (true) {
                if (i2 >= ALL_IDS.size()) {
                    break;
                }
                String format = String.format("%d Hiragana: の, は, でした, Katakana: コンサート, Kanji: 昨夜, 最高", Integer.valueOf(i2));
                String str4 = (String) map.get(String.format("の %s", ALL_IDS.get(i2)));
                if (!format.equals(str4)) {
                    z2 = false;
                    System.err.println(ALL_IDS.get(i2) + " value is not = " + format + ", is = " + str4);
                    break;
                }
                i2 += POLLING_INTERVAL;
            }
            return z2;
        });
    }

    protected <P, V> void runTest(boolean z, int i, String str, BiFunction<String, Integer, String> biFunction, Function<Struct, P> function, String str2, BiFunction<String, Integer, String> biFunction2, Function<SourceRecord, V> function2, Predicate<Map<P, V>> predicate) throws Exception {
        this.isSendStringParametersAsUnicode = z;
        TestHelper.createTestDatabase();
        SqlServerConnection testConnection = TestHelper.testConnection(TestHelper.TEST_DATABASE_1);
        try {
            this.connection = testConnection;
            testConnection.execute(new String[]{String.format("CREATE TABLE %s (%s %s primary key, %s %s)", tableName(), pkFieldName(), str, valueFieldName(), str2), "CREATE TABLE debezium_signal (id varchar(64), type varchar(32), data varchar(2048))"});
            TestHelper.enableTableCdc(testConnection, "debezium_signal");
            TestHelper.adjustCdcPollingInterval(testConnection, POLLING_INTERVAL);
            initializeConnectorTestFramework();
            Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
            populateTable(testConnection, biFunction, biFunction2);
            TestHelper.enableTableCdc(testConnection, tableName());
            startConnector();
            sendAdHocSnapshotSignal(new String[]{tableName()});
            testIncrementalSnapshotConsumed(i, function, function2, predicate);
            if (testConnection != null) {
                testConnection.close();
            }
        } catch (Throwable th) {
            if (testConnection != null) {
                try {
                    testConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected void populateTable(SqlServerConnection sqlServerConnection, BiFunction<String, Integer, String> biFunction, BiFunction<String, Integer, String> biFunction2) throws SQLException {
        sqlServerConnection.setAutoCommit(false);
        for (int i = 0; i < ALL_IDS.size(); i += POLLING_INTERVAL) {
            String str = ALL_IDS.get(i);
            sqlServerConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, %s) VALUES(%s, %s)", tableName(), pkFieldName(), valueFieldName(), biFunction.apply(str, Integer.valueOf(i)), biFunction2.apply(str, Integer.valueOf(i)))});
        }
        sqlServerConnection.commit();
    }

    protected <P, V> void testIncrementalSnapshotConsumed(int i, Function<Struct, P> function, Function<SourceRecord, V> function2, Predicate<Map<P, V>> predicate) throws InterruptedException {
        Map<P, V> consumeIncrementalSnapshot = consumeIncrementalSnapshot(i, entry -> {
            return true;
        }, function, function2, topicName(), null, true);
        Assertions.assertThat(consumeIncrementalSnapshot).hasSize(i);
        Assert.assertTrue(predicate.test(consumeIncrementalSnapshot));
    }

    protected <P, V> Map<P, V> consumeIncrementalSnapshot(int i, Predicate<Map.Entry<P, V>> predicate, Function<Struct, P> function, Function<SourceRecord, V> function2, String str, Consumer<List<SourceRecord>> consumer, boolean z) throws InterruptedException {
        HashMap hashMap = new HashMap();
        int i2 = 0;
        while (true) {
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(POLLING_INTERVAL, z);
            List<SourceRecord> recordsForTopic = consumeRecordsByTopic.recordsForTopic(str);
            if (consumeRecordsByTopic.allRecordsInOrder().isEmpty()) {
                i2 += POLLING_INTERVAL;
                ((AbstractIntegerAssert) Assertions.assertThat(i2).describedAs(String.format("Too many no data record results, %d < %d", Integer.valueOf(hashMap.size()), Integer.valueOf(i)), new Object[0])).isLessThanOrEqualTo(5);
            } else {
                i2 = 0;
                if (recordsForTopic != null && !recordsForTopic.isEmpty()) {
                    recordsForTopic.forEach(sourceRecord -> {
                        hashMap.put(function.apply((Struct) sourceRecord.key()), function2.apply(sourceRecord));
                    });
                    if (consumer != null) {
                        consumer.accept(recordsForTopic);
                    }
                    if (hashMap.size() >= i && hashMap.entrySet().stream().noneMatch(predicate.negate())) {
                        return hashMap;
                    }
                }
            }
        }
    }

    protected Class<SqlServerConnector> connectorClass() {
        return SqlServerConnector.class;
    }

    protected JdbcConnection databaseConnection() {
        return this.connection;
    }

    protected String topicName() {
        return "server1.testDB1.dbo.c";
    }

    protected String tableName() {
        return "testDB1.dbo.c";
    }

    protected List<String> topicNames() {
        throw new UnsupportedOperationException();
    }

    protected List<String> tableNames() {
        throw new UnsupportedOperationException();
    }

    protected String signalTableName() {
        return "dbo.debezium_signal";
    }

    protected Configuration.Builder config() {
        return TestHelper.defaultConfig().with("database.sendStringParametersAsUnicode", this.isSendStringParametersAsUnicode).with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY).with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB1.dbo.debezium_signal").with(SqlServerConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250);
    }

    protected Configuration.Builder mutableConfig(boolean z, boolean z2) {
        return TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL).with(SqlServerConnectorConfig.SIGNAL_DATA_COLLECTION, "testDB1.dbo.debezium_signal").with(SqlServerConnectorConfig.TABLE_INCLUDE_LIST, z ? "dbo.b" : "dbo.a,dbo.b").with(SqlServerConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250).with(SqlServerConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true).with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "dbo.a42:pk1,pk2,pk3,pk4").with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, z2);
    }

    protected String connector() {
        return "sql_server";
    }

    protected String server() {
        return TestHelper.TEST_SERVER_NAME;
    }
}
