package io.debezium.connector.vitess;

import binlogdata.Binlogdata;
import ch.qos.logback.classic.Level;
import io.debezium.connector.vitess.Vgtid;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.connector.vitess.connection.ReplicationMessage;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.doc.FixFor;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.schema.SchemaNameAdjuster;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/vitess/VitessReplicationConnectionIT.class */
public class VitessReplicationConnectionIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(VitessReplicationConnectionIT.class);
    protected long pollTimeoutInMs = TimeUnit.SECONDS.toMillis(5);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/connector/vitess/VitessReplicationConnectionIT$MessageAndVgtid.class */
    public static class MessageAndVgtid {
        ReplicationMessage message;
        Vgtid vgtid;

        MessageAndVgtid(ReplicationMessage replicationMessage, Vgtid vgtid) {
            this.message = replicationMessage;
            this.vgtid = vgtid;
        }

        public ReplicationMessage getMessage() {
            return this.message;
        }

        public Vgtid getVgtid() {
            return this.vgtid;
        }
    }

    @Before
    public void beforeEach() {
        TestHelper.execute(TestHelper.SETUP_TABLES_STMT);
        TestHelper.execute("INSERT INTO t1 (int_col) VALUES (1);");
    }

    @Test
    public void shouldErrorOutWhenSkipEnabled() throws Exception {
        new LogInterceptor(VitessReplicationConnection.class).setLoggerLevel(VitessReplicationConnection.class, Level.DEBUG);
        VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig(false, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, TestHelper.TEST_SHARD, "1", "skip").build());
        VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(vitessConnectorConfig, SchemaNameAdjuster.create(), DefaultTopicNamingStrategy.create(vitessConnectorConfig));
        AtomicReference atomicReference = new AtomicReference();
        VitessReplicationConnection vitessReplicationConnection = new VitessReplicationConnection(vitessConnectorConfig, vitessDatabaseSchema);
        Vgtid of = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(Binlogdata.ShardGtid.newBuilder().setKeyspace(vitessConnectorConfig.getKeyspace()).setShard((String) vitessConnectorConfig.getShard().get(0)).setGtid("current").build()).build());
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        vitessReplicationConnection.startStreaming(of, (replicationMessage, vgtid) -> {
            if (!atomicBoolean.get()) {
                atomicBoolean.set(true);
            }
            arrayBlockingQueue.add(new MessageAndVgtid(replicationMessage, vgtid));
        }, atomicReference);
        Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())).until(() -> {
            return Boolean.valueOf(atomicReference.get() != null);
        });
        Assertions.assertThat((Throwable) atomicReference.get()).isNotNull();
    }

    @Test
    public void shouldErrorOutWhenWarnEnabled() throws Exception {
        new LogInterceptor(VitessReplicationConnection.class);
        VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig(false, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, TestHelper.TEST_SHARD, "1", "warn").build());
        VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(vitessConnectorConfig, SchemaNameAdjuster.create(), DefaultTopicNamingStrategy.create(vitessConnectorConfig));
        AtomicReference atomicReference = new AtomicReference();
        VitessReplicationConnection vitessReplicationConnection = new VitessReplicationConnection(vitessConnectorConfig, vitessDatabaseSchema);
        Vgtid of = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(Binlogdata.ShardGtid.newBuilder().setKeyspace(vitessConnectorConfig.getKeyspace()).setShard((String) vitessConnectorConfig.getShard().get(0)).setGtid("current").build()).build());
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        vitessReplicationConnection.startStreaming(of, (replicationMessage, vgtid) -> {
            if (!atomicBoolean.get()) {
                atomicBoolean.set(true);
            }
            arrayBlockingQueue.add(new MessageAndVgtid(replicationMessage, vgtid));
        }, atomicReference);
        Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())).until(() -> {
            return Boolean.valueOf(atomicReference.get() != null);
        });
        Assertions.assertThat((Throwable) atomicReference.get()).isNotNull();
    }

    @Test
    public void shouldFailWhenFailEnabled() throws Exception {
        VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig(false, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, TestHelper.TEST_SHARD, "1", "fail").build());
        VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(vitessConnectorConfig, SchemaNameAdjuster.create(), DefaultTopicNamingStrategy.create(vitessConnectorConfig));
        AtomicReference atomicReference = new AtomicReference();
        VitessReplicationConnection vitessReplicationConnection = new VitessReplicationConnection(vitessConnectorConfig, vitessDatabaseSchema);
        Vgtid of = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(Binlogdata.ShardGtid.newBuilder().setKeyspace(vitessConnectorConfig.getKeyspace()).setShard((String) vitessConnectorConfig.getShard().get(0)).setGtid("current").build()).build());
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        vitessReplicationConnection.startStreaming(of, (replicationMessage, vgtid) -> {
            if (!atomicBoolean.get()) {
                atomicBoolean.set(true);
            }
            arrayBlockingQueue.add(new MessageAndVgtid(replicationMessage, vgtid));
        }, atomicReference);
        Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())).until(() -> {
            return Boolean.valueOf(atomicReference.get() != null);
        });
        Assertions.assertThat((Throwable) atomicReference.get()).isNotNull();
    }

    @Test
    public void shouldFailWhenErrorProcessingModeIsNotSet() throws Exception {
        VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig(false, false, 1, -1, -1, null, VitessConnectorConfig.SnapshotMode.NEVER, TestHelper.TEST_SHARD, "1", null).build());
        vitessConnectorConfig.getEventProcessingFailureHandlingMode();
        VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(vitessConnectorConfig, SchemaNameAdjuster.create(), DefaultTopicNamingStrategy.create(vitessConnectorConfig));
        AtomicReference atomicReference = new AtomicReference();
        VitessReplicationConnection vitessReplicationConnection = new VitessReplicationConnection(vitessConnectorConfig, vitessDatabaseSchema);
        Vgtid of = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(Binlogdata.ShardGtid.newBuilder().setKeyspace(vitessConnectorConfig.getKeyspace()).setShard((String) vitessConnectorConfig.getShard().get(0)).setGtid("current").build()).build());
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        vitessReplicationConnection.startStreaming(of, (replicationMessage, vgtid) -> {
            if (!atomicBoolean.get()) {
                atomicBoolean.set(true);
            }
            arrayBlockingQueue.add(new MessageAndVgtid(replicationMessage, vgtid));
        }, atomicReference);
        Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())).until(() -> {
            return Boolean.valueOf(atomicReference.get() != null);
        });
        Assertions.assertThat((Throwable) atomicReference.get()).isNotNull();
    }

    @Test
    public void shouldHaveVgtidInResponse() throws Exception {
        VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig().build());
        VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(vitessConnectorConfig, SchemaNameAdjuster.create(), DefaultTopicNamingStrategy.create(vitessConnectorConfig));
        AtomicReference atomicReference = new AtomicReference();
        try {
            VitessReplicationConnection vitessReplicationConnection = new VitessReplicationConnection(vitessConnectorConfig, vitessDatabaseSchema);
            try {
                Vgtid of = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(Binlogdata.ShardGtid.newBuilder().setKeyspace(vitessConnectorConfig.getKeyspace()).setShard((String) vitessConnectorConfig.getShard().get(0)).setGtid("current").build()).build());
                ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                vitessReplicationConnection.startStreaming(of, (replicationMessage, vgtid) -> {
                    if (!atomicBoolean.get()) {
                        atomicBoolean.set(true);
                    }
                    arrayBlockingQueue.add(new MessageAndVgtid(replicationMessage, vgtid));
                }, atomicReference);
                ConditionFactory atMost = Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()));
                Objects.requireNonNull(atomicBoolean);
                atMost.until(atomicBoolean::get);
                arrayBlockingQueue.clear();
                TestHelper.execute("INSERT INTO t1 (int_col) VALUES (1);");
                List<MessageAndVgtid> awaitMessages = awaitMessages(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS, 3, () -> {
                    try {
                        return (MessageAndVgtid) arrayBlockingQueue.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        return null;
                    }
                });
                awaitMessages.forEach(messageAndVgtid -> {
                    assertValidVgtid(messageAndVgtid.getVgtid(), vitessConnectorConfig.getKeyspace(), (String) vitessConnectorConfig.getShard().get(0));
                });
                Assertions.assertThat(awaitMessages.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN");
                Assertions.assertThat(awaitMessages.get(1).getMessage().getOperation().name()).isEqualTo("INSERT");
                Assertions.assertThat(awaitMessages.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT");
                vitessReplicationConnection.close();
            } finally {
            }
        } finally {
            if (atomicReference.get() != null) {
                LOGGER.error("Error during streaming", (Throwable) atomicReference.get());
            }
        }
    }

    @Test
    public void shouldSendHeartbeatMessage() throws Exception {
        VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig().with(Heartbeat.HEARTBEAT_INTERVAL, 1000).build());
        VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(vitessConnectorConfig, SchemaNameAdjuster.create(), DefaultTopicNamingStrategy.create(vitessConnectorConfig));
        AtomicReference atomicReference = new AtomicReference();
        try {
            VitessReplicationConnection vitessReplicationConnection = new VitessReplicationConnection(vitessConnectorConfig, vitessDatabaseSchema);
            try {
                Vgtid of = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(Binlogdata.ShardGtid.newBuilder().setKeyspace(vitessConnectorConfig.getKeyspace()).setShard((String) vitessConnectorConfig.getShard().get(0)).setGtid("current").build()).build());
                ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                vitessReplicationConnection.startStreaming(of, (replicationMessage, vgtid) -> {
                    if (!atomicBoolean.get()) {
                        atomicBoolean.set(true);
                    }
                    arrayBlockingQueue.add(new MessageAndVgtid(replicationMessage, vgtid));
                }, atomicReference);
                ConditionFactory atMost = Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()));
                Objects.requireNonNull(atomicBoolean);
                atMost.until(atomicBoolean::get);
                arrayBlockingQueue.clear();
                Assertions.assertThat(awaitMessages(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS, 1, () -> {
                    try {
                        return (MessageAndVgtid) arrayBlockingQueue.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        return null;
                    }
                }).get(0).getMessage().getOperation().name()).isEqualTo("HEARTBEAT");
                vitessReplicationConnection.close();
            } finally {
            }
        } finally {
            if (atomicReference.get() != null) {
                LOGGER.error("Error during streaming", (Throwable) atomicReference.get());
            }
        }
    }

    @Test
    public void shouldSendCommitTimestamp() throws Exception {
        VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig().build());
        VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(vitessConnectorConfig, SchemaNameAdjuster.create(), DefaultTopicNamingStrategy.create(vitessConnectorConfig));
        AtomicReference atomicReference = new AtomicReference();
        try {
            VitessReplicationConnection vitessReplicationConnection = new VitessReplicationConnection(vitessConnectorConfig, vitessDatabaseSchema);
            try {
                Vgtid of = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(Binlogdata.ShardGtid.newBuilder().setKeyspace(vitessConnectorConfig.getKeyspace()).setShard((String) vitessConnectorConfig.getShard().get(0)).setGtid("current").build()).build());
                ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                vitessReplicationConnection.startStreaming(of, (replicationMessage, vgtid) -> {
                    if (!atomicBoolean.get()) {
                        atomicBoolean.set(true);
                    }
                    arrayBlockingQueue.add(new MessageAndVgtid(replicationMessage, vgtid));
                }, atomicReference);
                ConditionFactory atMost = Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()));
                Objects.requireNonNull(atomicBoolean);
                atMost.until(atomicBoolean::get);
                arrayBlockingQueue.clear();
                MySQLConnection forTestDatabase = MySQLConnection.forTestDatabase(TestHelper.TEST_UNSHARDED_KEYSPACE);
                forTestDatabase.setAutoCommit(false);
                forTestDatabase.executeWithoutCommitting(new String[]{"BEGIN"});
                Thread.sleep(1000L);
                forTestDatabase.executeWithoutCommitting(new String[]{"INSERT INTO t1 (int_col) VALUES (1);"});
                Thread.sleep(1000L);
                forTestDatabase.executeWithoutCommitting(new String[]{"COMMIT"});
                List<MessageAndVgtid> awaitMessages = awaitMessages(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS, 3, () -> {
                    try {
                        return (MessageAndVgtid) arrayBlockingQueue.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        return null;
                    }
                });
                awaitMessages.forEach(messageAndVgtid -> {
                    assertValidVgtid(messageAndVgtid.getVgtid(), vitessConnectorConfig.getKeyspace(), (String) vitessConnectorConfig.getShard().get(0));
                });
                Assertions.assertThat(awaitMessages.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN");
                Instant commitTime = awaitMessages.get(0).getMessage().getCommitTime();
                Assertions.assertThat(awaitMessages.get(1).getMessage().getOperation().name()).isEqualTo("INSERT");
                Instant commitTime2 = awaitMessages.get(1).getMessage().getCommitTime();
                Assertions.assertThat(awaitMessages.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT");
                Instant commitTime3 = awaitMessages.get(2).getMessage().getCommitTime();
                Assertions.assertThat(commitTime).isNotEqualTo(commitTime3);
                Assertions.assertThat(commitTime2).isEqualTo(commitTime3);
                forTestDatabase.executeWithoutCommitting(new String[]{"BEGIN"});
                Thread.sleep(1000L);
                forTestDatabase.executeWithoutCommitting(new String[]{"INSERT INTO t1 (int_col) VALUES (1);"});
                Thread.sleep(1000L);
                forTestDatabase.executeWithoutCommitting(new String[]{"COMMIT"});
                List<MessageAndVgtid> awaitMessages2 = awaitMessages(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS, 3, () -> {
                    try {
                        return (MessageAndVgtid) arrayBlockingQueue.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        return null;
                    }
                });
                awaitMessages2.forEach(messageAndVgtid2 -> {
                    assertValidVgtid(messageAndVgtid2.getVgtid(), vitessConnectorConfig.getKeyspace(), (String) vitessConnectorConfig.getShard().get(0));
                });
                Assertions.assertThat(awaitMessages2.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN");
                Instant commitTime4 = awaitMessages2.get(0).getMessage().getCommitTime();
                Assertions.assertThat(awaitMessages2.get(1).getMessage().getOperation().name()).isEqualTo("INSERT");
                Instant commitTime5 = awaitMessages2.get(1).getMessage().getCommitTime();
                Assertions.assertThat(awaitMessages2.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT");
                Instant commitTime6 = awaitMessages2.get(2).getMessage().getCommitTime();
                Assertions.assertThat(commitTime4).isNotEqualTo(commitTime6);
                Assertions.assertThat(commitTime5).isEqualTo(commitTime6);
                Assertions.assertThat(commitTime3).isNotEqualTo(commitTime6);
                vitessReplicationConnection.close();
            } finally {
            }
        } finally {
            if (atomicReference.get() != null) {
                LOGGER.error("Error during streaming", (Throwable) atomicReference.get());
            }
        }
    }

    @Test
    public void shouldCopyAndReplicate() throws Exception {
        VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig().with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, "test_unsharded_keyspace.t1").build());
        VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(vitessConnectorConfig, SchemaNameAdjuster.create(), DefaultTopicNamingStrategy.create(vitessConnectorConfig));
        AtomicReference atomicReference = new AtomicReference();
        try {
            VitessReplicationConnection vitessReplicationConnection = new VitessReplicationConnection(vitessConnectorConfig, vitessDatabaseSchema);
            try {
                Vgtid of = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(Binlogdata.ShardGtid.newBuilder().setKeyspace(vitessConnectorConfig.getKeyspace()).setShard((String) vitessConnectorConfig.getShard().get(0)).setGtid("").build()).build());
                ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
                vitessReplicationConnection.startStreaming(of, (replicationMessage, vgtid) -> {
                    arrayBlockingQueue.add(new MessageAndVgtid(replicationMessage, vgtid));
                }, atomicReference);
                List<MessageAndVgtid> awaitMessages = awaitMessages(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS, 3, () -> {
                    try {
                        return (MessageAndVgtid) arrayBlockingQueue.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        return null;
                    }
                });
                awaitMessages.forEach(messageAndVgtid -> {
                    assertValidVgtid(messageAndVgtid.getVgtid(), vitessConnectorConfig.getKeyspace(), (String) vitessConnectorConfig.getShard().get(0));
                });
                Assertions.assertThat(awaitMessages.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN");
                Assertions.assertThat(awaitMessages.get(1).getMessage().getOperation().name()).isEqualTo("INSERT");
                Assertions.assertThat(awaitMessages.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT");
                arrayBlockingQueue.clear();
                TestHelper.execute("INSERT INTO t1 (int_col) VALUES (1);");
                List<MessageAndVgtid> awaitMessages2 = awaitMessages(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS, 3, () -> {
                    try {
                        return (MessageAndVgtid) arrayBlockingQueue.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        return null;
                    }
                });
                awaitMessages2.forEach(messageAndVgtid2 -> {
                    assertValidVgtid(messageAndVgtid2.getVgtid(), vitessConnectorConfig.getKeyspace(), (String) vitessConnectorConfig.getShard().get(0));
                });
                Assertions.assertThat(awaitMessages2.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN");
                Assertions.assertThat(awaitMessages2.get(1).getMessage().getOperation().name()).isEqualTo("INSERT");
                Assertions.assertThat(awaitMessages2.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT");
                vitessReplicationConnection.close();
            } finally {
            }
        } finally {
            if (atomicReference.get() != null) {
                LOGGER.error("Error during streaming", (Throwable) atomicReference.get());
            }
        }
    }

    @Test
    @FixFor({"DBZ-4353"})
    public void shouldReturnUpdatedSchemaWithOnlineDdl() throws Exception {
        VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig().build());
        VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(vitessConnectorConfig, SchemaNameAdjuster.create(), DefaultTopicNamingStrategy.create(vitessConnectorConfig));
        AtomicReference atomicReference = new AtomicReference();
        try {
            VitessReplicationConnection vitessReplicationConnection = new VitessReplicationConnection(vitessConnectorConfig, vitessDatabaseSchema);
            try {
                Vgtid of = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(Binlogdata.ShardGtid.newBuilder().setKeyspace(vitessConnectorConfig.getKeyspace()).setShard((String) vitessConnectorConfig.getShard().get(0)).setGtid("current").build()).build());
                ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(200);
                AtomicBoolean atomicBoolean = new AtomicBoolean(false);
                vitessReplicationConnection.startStreaming(of, (replicationMessage, vgtid) -> {
                    if (!atomicBoolean.get()) {
                        atomicBoolean.set(true);
                    }
                    arrayBlockingQueue.add(new MessageAndVgtid(replicationMessage, vgtid));
                }, atomicReference);
                ConditionFactory atMost = Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()));
                Objects.requireNonNull(atomicBoolean);
                atMost.until(atomicBoolean::get);
                arrayBlockingQueue.clear();
                String applyOnlineDdl = TestHelper.applyOnlineDdl("ALTER TABLE t1 ADD COLUMN name varchar(64)", TestHelper.TEST_UNSHARDED_KEYSPACE);
                Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords())).pollInterval(Duration.ofSeconds(1L)).until(() -> {
                    return Boolean.valueOf(TestHelper.checkOnlineDDL(TestHelper.TEST_UNSHARDED_KEYSPACE, applyOnlineDdl));
                });
                TestHelper.execute("UPDATE t1 SET name='abc' WHERE id=1");
                ReplicationMessage message = awaitOperation(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS, "UPDATE", () -> {
                    try {
                        return (MessageAndVgtid) arrayBlockingQueue.poll(TimeUnit.SECONDS.toMillis(5L), TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        return null;
                    }
                }).getMessage();
                Assertions.assertThat(message.getOperation().name()).isEqualTo("UPDATE");
                Assertions.assertThat(((ReplicationMessage.Column) message.getOldTupleList().get(0)).getName()).isEqualTo("id");
                Assertions.assertThat(((ReplicationMessage.Column) message.getOldTupleList().get(1)).getName()).isEqualTo("int_col");
                Assertions.assertThat(((ReplicationMessage.Column) message.getOldTupleList().get(2)).getName()).isEqualTo("name");
                Assertions.assertThat(((ReplicationMessage.Column) message.getOldTupleList().get(2)).getValue(false, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)).isNull();
                Assertions.assertThat(((ReplicationMessage.Column) message.getNewTupleList().get(0)).getName()).isEqualTo("id");
                Assertions.assertThat(((ReplicationMessage.Column) message.getNewTupleList().get(1)).getName()).isEqualTo("int_col");
                Assertions.assertThat(((ReplicationMessage.Column) message.getNewTupleList().get(2)).getName()).isEqualTo("name");
                Assertions.assertThat(((ReplicationMessage.Column) message.getNewTupleList().get(2)).getValue(false, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS)).isEqualTo("abc");
                vitessReplicationConnection.close();
            } finally {
            }
        } finally {
            if (atomicReference.get() != null) {
                LOGGER.error("Error during streaming", (Throwable) atomicReference.get());
            }
        }
    }

    @Test
    public void shouldNotFailWhenTableNameIsReservedKeyword() throws Exception {
        TestHelper.execute((List<String>) Arrays.asList("DROP TABLE IF EXISTS `schemas`;", "CREATE TABLE `schemas` (id BIGINT NOT NULL AUTO_INCREMENT, int_col INT, PRIMARY KEY (id));"));
        TestHelper.execute("INSERT INTO `schemas` (int_col) VALUES (1);");
        VitessConnectorConfig vitessConnectorConfig = new VitessConnectorConfig(TestHelper.defaultConfig().with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, "test_unsharded_keyspace.schemas").build());
        VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(vitessConnectorConfig, SchemaNameAdjuster.create(), DefaultTopicNamingStrategy.create(vitessConnectorConfig));
        AtomicReference atomicReference = new AtomicReference();
        VitessReplicationConnection vitessReplicationConnection = new VitessReplicationConnection(vitessConnectorConfig, vitessDatabaseSchema);
        try {
            Vgtid of = Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(Binlogdata.ShardGtid.newBuilder().setKeyspace(vitessConnectorConfig.getKeyspace()).setShard((String) vitessConnectorConfig.getShard().get(0)).setGtid("").build()).build());
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
            vitessReplicationConnection.startStreaming(of, (replicationMessage, vgtid) -> {
                arrayBlockingQueue.add(new MessageAndVgtid(replicationMessage, vgtid));
            }, atomicReference);
            List<MessageAndVgtid> awaitMessages = awaitMessages(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS, 3, () -> {
                try {
                    return (MessageAndVgtid) arrayBlockingQueue.poll(this.pollTimeoutInMs, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    return null;
                }
            });
            awaitMessages.forEach(messageAndVgtid -> {
                assertValidVgtid(messageAndVgtid.getVgtid(), vitessConnectorConfig.getKeyspace(), (String) vitessConnectorConfig.getShard().get(0));
            });
            Assertions.assertThat(awaitMessages.get(0).getMessage().getOperation().name()).isEqualTo("BEGIN");
            Assertions.assertThat(awaitMessages.get(1).getMessage().getOperation().name()).isEqualTo("INSERT");
            Assertions.assertThat(awaitMessages.get(2).getMessage().getOperation().name()).isEqualTo("COMMIT");
            Assertions.assertThat((Throwable) atomicReference.get()).isNull();
            vitessReplicationConnection.close();
        } catch (Throwable th) {
            try {
                vitessReplicationConnection.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void assertValidVgtid(Vgtid vgtid, String str, String str2) {
        Assertions.assertThat(vgtid.getShardGtids()).hasSize(1);
        Assertions.assertThat(((Vgtid.ShardGtid) vgtid.getShardGtids().iterator().next()).getKeyspace()).isEqualTo(str);
        Assertions.assertThat(((Vgtid.ShardGtid) vgtid.getShardGtids().iterator().next()).getShard()).isEqualTo(str2);
        String gtid = ((Vgtid.ShardGtid) vgtid.getShardGtids().iterator().next()).getGtid();
        Assertions.assertThat(gtid.startsWith("MySQL") || gtid.startsWith("MariaDB")).isTrue();
    }

    private static List<MessageAndVgtid> awaitMessages(long j, TimeUnit timeUnit, int i, Supplier<MessageAndVgtid> supplier) {
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        Awaitility.await().atMost(Duration.ofMillis(timeUnit.toMillis(j))).until(() -> {
            MessageAndVgtid messageAndVgtid = (MessageAndVgtid) supplier.get();
            if (messageAndVgtid == null) {
                return false;
            }
            if (concurrentLinkedQueue.size() >= i) {
                Assert.fail("received more events than expected");
            } else {
                concurrentLinkedQueue.add(messageAndVgtid);
            }
            return Boolean.valueOf(concurrentLinkedQueue.size() == i);
        });
        if (concurrentLinkedQueue.size() != i) {
            Assert.fail("Consumer is still expecting " + (i - concurrentLinkedQueue.size()) + " records, as it received only " + concurrentLinkedQueue.size());
        }
        return new ArrayList(concurrentLinkedQueue);
    }

    private static MessageAndVgtid awaitOperation(long j, TimeUnit timeUnit, String str, Supplier<MessageAndVgtid> supplier) {
        AtomicReference atomicReference = new AtomicReference();
        Awaitility.await().atMost(Duration.ofMillis(timeUnit.toMillis(j))).until(() -> {
            MessageAndVgtid messageAndVgtid = (MessageAndVgtid) supplier.get();
            if (messageAndVgtid == null || !messageAndVgtid.getMessage().getOperation().name().equals(str)) {
                return false;
            }
            atomicReference.set(messageAndVgtid);
            return true;
        });
        return (MessageAndVgtid) atomicReference.get();
    }
}
