package io.debezium.connector.jdbc.junit.jupiter.e2e.source;

import com.github.dockerjava.api.command.LogContainerCmd;
import io.debezium.connector.jdbc.junit.jupiter.JdbcConnectionProvider;
import io.debezium.connector.jdbc.util.RandomTableNameGenerator;
import io.debezium.testing.testcontainers.Connector;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.DebeziumContainer;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.FrameConsumerResultCallback;
import org.testcontainers.containers.output.OutputFrame;
import org.testcontainers.containers.output.WaitingConsumer;
import org.testcontainers.shaded.org.awaitility.Awaitility;

/* loaded from: input_file:io/debezium/connector/jdbc/junit/jupiter/e2e/source/Source.class */
public class Source extends JdbcConnectionProvider {
    private static final AtomicInteger sourceId = new AtomicInteger();
    private static final String ENABLE_TABLE_CDC = "IF EXISTS(select 1 from sys.tables where name = '#' AND is_tracked_by_cdc=0)\nEXEC sys.sp_cdc_enable_table @source_schema = N'%', @source_name = N'#', @role_name = NULL, @supports_net_changes = 0";
    private final Integer id;
    private final SourceType type;
    private final KafkaContainer kafka;
    private final DebeziumContainer connect;
    private final SourceConnectorOptions options;
    private final RandomTableNameGenerator tableNameGenerator;

    /* loaded from: input_file:io/debezium/connector/jdbc/junit/jupiter/e2e/source/Source$SourceConnectionInitializer.class */
    private static class SourceConnectionInitializer implements JdbcConnectionProvider.ConnectionInitializer {
        private final SourceType type;

        SourceConnectionInitializer(SourceType sourceType) {
            this.type = sourceType;
        }

        @Override // io.debezium.connector.jdbc.junit.jupiter.JdbcConnectionProvider.ConnectionInitializer
        public void initialize(Connection connection) throws SQLException {
            if (SourceType.SQLSERVER.is(this.type)) {
                Statement createStatement = connection.createStatement();
                try {
                    createStatement.execute("USE testDB");
                    if (createStatement != null) {
                        createStatement.close();
                    }
                } catch (Throwable th) {
                    if (createStatement != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            }
        }
    }

    public Source(SourceType sourceType, JdbcDatabaseContainer<?> jdbcDatabaseContainer, KafkaContainer kafkaContainer, DebeziumContainer debeziumContainer, SourceConnectorOptions sourceConnectorOptions, RandomTableNameGenerator randomTableNameGenerator) {
        super(jdbcDatabaseContainer, new SourceConnectionInitializer(sourceType));
        this.type = sourceType;
        this.id = Integer.valueOf(sourceId.getAndIncrement());
        this.kafka = kafkaContainer;
        this.connect = debeziumContainer;
        this.options = sourceConnectorOptions;
        this.tableNameGenerator = randomTableNameGenerator;
    }

    public SourceType getType() {
        return this.type;
    }

    public KafkaContainer getKafka() {
        return this.kafka;
    }

    public int getPort() {
        return getContainer().getFirstMappedPort().intValue();
    }

    public SourceConnectorOptions getOptions() {
        return this.options;
    }

    public String getSourceConnectorName() {
        return "jdbc-source-" + this.id;
    }

    public String randomTableName() {
        return randomObjectName();
    }

    public String randomObjectName() {
        return this.tableNameGenerator.randomName(12);
    }

    public void registerSourceConnector(ConnectorConfiguration connectorConfiguration) {
        waitUntilStreamingStarted(() -> {
            this.connect.registerConnector(getSourceConnectorName(), connectorConfiguration);
        });
    }

    protected void waitUntilStreamingStarted(Runnable runnable) {
        waitUntil("Starting streaming", runnable);
    }

    public void waitUntilDeleted() {
        Awaitility.await("Source connector deleted").atMost(60L, TimeUnit.SECONDS).until(() -> {
            try {
                return Boolean.valueOf(this.connect.getConnectorState(getSourceConnectorName()).equals(Connector.State.UNASSIGNED));
            } catch (IllegalStateException e) {
                if (e.getMessage().contains("No status found for connector jdbc-source")) {
                    return true;
                }
                throw e;
            }
        });
    }

    private void waitUntil(String str, Runnable runnable) {
        WaitingConsumer waitingConsumer = new WaitingConsumer();
        try {
            FrameConsumerResultCallback frameConsumerResultCallback = new FrameConsumerResultCallback();
            try {
                frameConsumerResultCallback.addConsumer(OutputFrame.OutputType.STDOUT, waitingConsumer);
                LogContainerCmd logContainerCmd = this.connect.getDockerClient().logContainerCmd(this.connect.getContainerId());
                try {
                    logContainerCmd.withFollowStream(true).withTail(0).withStdOut(true).exec(frameConsumerResultCallback);
                    if (runnable != null) {
                        try {
                            runnable.run();
                        } catch (Exception e) {
                            throw new IllegalStateException("WaitUntil callback failed", e);
                        }
                    }
                    try {
                        waitingConsumer.waitUntil(outputFrame -> {
                            return outputFrame.getUtf8String().contains(str);
                        }, 20, TimeUnit.SECONDS);
                        if (logContainerCmd != null) {
                            logContainerCmd.close();
                        }
                        frameConsumerResultCallback.close();
                    } catch (TimeoutException e2) {
                        throw new IllegalStateException("Failed to wait for '" + str + "'", e2);
                    }
                } catch (Throwable th) {
                    if (logContainerCmd != null) {
                        try {
                            logContainerCmd.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    }
                    throw th;
                }
            } finally {
            }
        } catch (IOException e3) {
            throw new RuntimeException("Wait failed for message '" + str + "'", e3);
        }
    }

    public void streamTable(String str) throws Exception {
        if (SourceType.SQLSERVER == this.type) {
            execute(ENABLE_TABLE_CDC.replace("#", str).replace("%", "dbo"));
        } else if (SourceType.ORACLE == this.type) {
            execute("ALTER TABLE " + str + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
        }
    }

    public void queryContainerTable(String str) throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add("docker");
        arrayList.add("exec");
        arrayList.add("-i");
        arrayList.add("--tty=false");
        arrayList.add(getContainerName());
        switch (getType()) {
            case MYSQL:
                arrayList.add("mysql");
                arrayList.add("--user=" + getUsername());
                arrayList.add("--password=" + getPassword());
                arrayList.add("test");
                arrayList.add("-e");
                arrayList.add("SELECT * FROM " + str);
                break;
            case POSTGRES:
                arrayList.add("psql");
                arrayList.add("-U");
                arrayList.add(getUsername());
                arrayList.add("-w");
                arrayList.add("test");
                arrayList.add("-c");
                arrayList.add("show time zone; select * from public." + str);
                break;
            case ORACLE:
                arrayList.add("bash");
                arrayList.add("-c");
                arrayList.add("echo \"select * from " + str + ";\" | sqlplus debezium/dbz@ORCLPDB1");
                break;
            case SQLSERVER:
                arrayList.add("/opt/mssql-tools/bin/sqlcmd");
                arrayList.add("-U");
                arrayList.add(getUsername());
                arrayList.add("-P");
                arrayList.add(getPassword());
                arrayList.add("-d");
                arrayList.add("testDB");
                arrayList.add("-Q");
                arrayList.add("select * from " + str + ";");
                break;
        }
        queryContainer("Source", arrayList);
    }
}
