package io.debezium.connector.spanner.util;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.debezium.util.Testing;
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.admin.AdminClient;
import org.testcontainers.containers.ContainerState;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.wait.strategy.Wait;

/* loaded from: input_file:io/debezium/connector/spanner/util/KafkaEnvironment.class */
/**
 * Docker Compose based Kafka environment used by the Spanner connector integration tests.
 *
 * <p>The environment wraps a {@link DockerComposeContainer} built from a compose file and, once
 * started, exposes a {@link KafkaBrokerApi} bound to the broker service of that composition.
 */
public class KafkaEnvironment {
    /** Name of the compose service that hosts the Kafka broker. */
    private static final String KAFKA_BROKER_SERVICE_NAME = "broker_1";

    /** Port of the broker service that is exposed and waited on during startup. */
    private static final int KAFKA_BROKER_SERVICE_API_PORT = 9092;

    /**
     * Topics that {@link #clearTopics()} never deletes.
     * Presumably internal Kafka / Kafka Connect / Confluent topics plus the connector's
     * rebalancing topic; verify against the compose setup when changing this list.
     */
    private static final List<String> PRESERVED_TOPICS = Arrays.asList(
            "_kafka-connect-configs",
            "_kafka-connect-offsets",
            "_kafka-connect-status",
            "_schemas",
            "_confluent-command",
            "_confluent_balancer_api_state",
            "_confluent-metrics",
            "__consumer_offsets",
            "_confluent-telemetry-metrics",
            "_rebalancing_topic_spanner_connector_testing-connector");

    /** Maximum time to wait for the broker service port to start listening. */
    public static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(200);

    /** Not referenced in this class; presumably the connector startup timeout used by tests. */
    public static final Duration STARTUP_CONNECTOR_TIMEOUT = Duration.ofSeconds(600);

    /** Not referenced in this class; presumably the connector configuration timeout used by tests. */
    public static final Duration CONFIGURE_CONNECTOR_TIMEOUT = Duration.ofSeconds(200);

    /** Path of the compose file used to build {@link #TEST_KAFKA_ENVIRONMENT}. */
    public static final String DOCKER_COMPOSE_FILE = "src/test/java/io/debezium/connector/spanner/util/docker-compose.yml";

    /** Shared environment instance built from {@link #DOCKER_COMPOSE_FILE}. */
    public static final KafkaEnvironment TEST_KAFKA_ENVIRONMENT = new KafkaEnvironment(DOCKER_COMPOSE_FILE);

    /** Flag maintained by callers through {@link #setStarted()}; {@link #start()} does not touch it. */
    private boolean isStarted = false;
    private DockerComposeContainer composeContainer;
    /** Broker API created by {@link #start()}; {@code null} until then. */
    private KafkaBrokerApi<ObjectNode, ObjectNode> brokerApiOn;
    private KafkaBrokerApi<GenericRecord, GenericRecord> brokerApiGr;

    /**
     * Builds (but does not start) the compose environment.
     *
     * @param str path to the Docker Compose file describing the Kafka environment
     */
    public KafkaEnvironment(String str) {
        Testing.Print.enable();
        Testing.print("Initializing kafka environment for IT test...");
        this.composeContainer = new DockerComposeContainer(new File(str))
                .withExposedService(
                        KAFKA_BROKER_SERVICE_NAME,
                        KAFKA_BROKER_SERVICE_API_PORT,
                        Wait.forListeningPort().withStartupTimeout(STARTUP_TIMEOUT));
        Testing.print("Finished initializing kafka environment.");
    }

    /**
     * Starts the compose containers and creates the {@code ObjectNode} broker API for the broker
     * service. Does not change the value reported by {@link #isStarted()}.
     */
    public void start() {
        Testing.print("Starting Kafka environment");
        this.composeContainer.start();
        this.brokerApiOn = KafkaBrokerApi.createKafkaBrokerApiObjectNode(
                (ContainerState) this.composeContainer.getContainerByServiceName(KAFKA_BROKER_SERVICE_NAME).orElseThrow(),
                KAFKA_BROKER_SERVICE_API_PORT);
    }

    /**
     * @return the broker API created by {@link #start()}, or {@code null} if {@code start()} has
     *         not been called yet
     */
    public KafkaBrokerApi<ObjectNode, ObjectNode> kafkaBrokerApiOn() {
        return this.brokerApiOn;
    }

    /** @return {@code true} once {@link #setStarted()} has been called */
    public boolean isStarted() {
        return this.isStarted;
    }

    /** Marks the environment as started. */
    public void setStarted() {
        this.isStarted = true;
    }

    /**
     * Requests deletion of every topic on the broker except those in {@link #PRESERVED_TOPICS}.
     *
     * <p>This is best-effort: any failure is printed through {@link Testing#print} and not
     * rethrown. The admin client is always closed, including when listing topics fails.
     */
    public void clearTopics() {
        try (AdminClient adminClient = kafkaBrokerApiOn().createAdminClient()) {
            Set<String> topics = adminClient.listTopics().names().get();
            // Keep the preserved topics; everything that remains in the set is deleted.
            PRESERVED_TOPICS.forEach(topics::remove);
            adminClient.deleteTopics(topics);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            Testing.print(e.getMessage());
        } catch (Exception e) {
            Testing.print(e.getMessage());
        }
    }
}
