package io.strimzi.kafka.metrics.prometheus.integration;

import io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter;
import io.strimzi.kafka.metrics.prometheus.MetricsUtils;
import io.strimzi.kafka.metrics.prometheus.http.Listener;
import io.strimzi.test.container.StrimziConnectCluster;
import io.strimzi.test.container.StrimziKafkaCluster;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.MountableFile;

/* loaded from: input_file:io/strimzi/kafka/metrics/prometheus/integration/TestMirrorMakerMetricsIT.class */
public class TestMirrorMakerMetricsIT {
    private static final int PORT = Listener.parseListener("http://:8080").port;
    private static final String CONNECT_ID = "my-cluster";
    private static final String TOPIC = "input";
    private static final String GROUP = "my-group";
    private static final String SOURCE_CONNECTOR = "source";
    private static final String CHECKPOINT_CONNECTOR = "checkpoint";
    private StrimziKafkaCluster kafka;
    private StrimziConnectCluster connect;

    @BeforeEach
    public void setUp() throws Exception {
        this.kafka = new StrimziKafkaCluster.StrimziKafkaClusterBuilder().withNumberOfBrokers(1).withSharedNetwork().build();
        this.kafka.start();
        this.connect = new StrimziConnectCluster.StrimziConnectClusterBuilder().withGroupId(CONNECT_ID).withKafkaCluster(this.kafka).withAdditionalConnectConfiguration(Map.of("metric.reporters", ClientMetricsReporter.class.getName())).build();
        Iterator it = this.connect.getWorkers().iterator();
        while (it.hasNext()) {
            ((GenericContainer) it.next()).withCopyFileToContainer(MountableFile.forHostPath(MetricsUtils.REPORTER_JARS), MetricsUtils.MOUNT_PATH).withExposedPorts(new Integer[]{8083, Integer.valueOf(PORT)}).withEnv(Map.of("CLASSPATH", "/opt/strimzi/metrics-reporter/*")).waitingFor(new HttpWaitStrategy().forPath("/health").forStatusCode(200));
        }
        this.connect.start();
        Admin create = Admin.create(Map.of("bootstrap.servers", this.kafka.getBootstrapServers()));
        try {
            create.createTopics(List.of(new NewTopic(TOPIC, 2, (short) -1))).all().get();
            create.alterConsumerGroupOffsets(GROUP, Map.of(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(1L))).all().get();
            create.alterConsumerGroupOffsets("my-group-2", Map.of(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(1L))).all().get();
            if (create != null) {
                create.close();
            }
            KafkaProducer kafkaProducer = new KafkaProducer(Map.of("bootstrap.servers", this.kafka.getBootstrapServers(), "key.serializer", StringSerializer.class.getName(), "value.serializer", StringSerializer.class.getName()));
            for (int i = 0; i < 5; i++) {
                try {
                    kafkaProducer.send(new ProducerRecord(TOPIC, Integer.valueOf(i % 2), (Object) null, "record" + i));
                } catch (Throwable th) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            }
            kafkaProducer.close();
        } catch (Throwable th3) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @AfterEach
    public void tearDown() {
        if (this.connect != null) {
            this.connect.stop();
        }
        if (this.kafka != null) {
            this.kafka.stop();
        }
    }

    @Test
    public void testMirrorMakerConnectorMetrics() {
        List<String> of = List.of((Object[]) new String[]{"kafka_connect_mirror_mirrorsourceconnector_byte_count" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorsourceconnector_byte_rate" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorsourceconnector_record_age_ms" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorsourceconnector_record_age_ms_avg" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorsourceconnector_record_age_ms_max" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorsourceconnector_record_age_ms_min" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorsourceconnector_record_count" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorsourceconnector_record_rate" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms_avg" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms_max" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms_min" + ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*"});
        MetricsUtils.startConnector(this.connect, SOURCE_CONNECTOR, "{\n  \"name\": \"source\",\n  \"connector.class\": \"org.apache.kafka.connect.mirror.MirrorSourceConnector\",\n  \"tasks.max\": \"2\",\n  \"key.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n  \"value.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n  \"source.cluster.alias\": \"source\",\n  \"target.cluster.alias\": \"target\",\n  \"source.cluster.bootstrap.servers\": \"" + this.kafka.getNetworkBootstrapServers() + "\",\n  \"target.cluster.bootstrap.servers\": \"" + this.kafka.getNetworkBootstrapServers() + "\",\n  \"replication.factor\": \"-1\",\n  \"offset-syncs.topic.replication.factor\": \"-1\",\n  \"refresh.topics.interval.seconds\": \"1\",\n  \"topics\": \"input\",\n  \"metric.reporters\": \"" + ClientMetricsReporter.class.getName() + "\",\n  \"prometheus.metrics.reporter.listener.enable\": \"false\"}", 2);
        checkMetricsExist(of);
        List<String> of2 = List.of("kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms" + ".*group=\".*\",partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms_avg" + ".*group=\".*\",partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms_max" + ".*group=\".*\",partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*", "kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms_min" + ".*group=\".*\",partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*");
        MetricsUtils.startConnector(this.connect, CHECKPOINT_CONNECTOR, "{\n  \"name\": \"checkpoint\",\n  \"connector.class\": \"org.apache.kafka.connect.mirror.MirrorCheckpointConnector\",\n  \"tasks.max\": \"2\",\n  \"key.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n  \"value.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n  \"source.cluster.alias\": \"source\",\n  \"target.cluster.alias\": \"target\",\n  \"source.cluster.bootstrap.servers\": \"" + this.kafka.getNetworkBootstrapServers() + "\",\n  \"target.cluster.bootstrap.servers\": \"" + this.kafka.getNetworkBootstrapServers() + "\",\n  \"checkpoints.topic.replication.factor\": \"-1\",\n  \"emit.checkpoints.interval.seconds\": \"1\",\n  \"refresh.groups.interval.seconds\": \"1\",\n  \"metric.reporters\": \"" + ClientMetricsReporter.class.getName() + "\",\n  \"prometheus.metrics.reporter.listener.enable\": \"false\"}", 2);
        checkMetricsExist(of2);
    }

    private void checkMetricsExist(List<String> list) {
        Iterator it = this.connect.getWorkers().iterator();
        while (it.hasNext()) {
            MetricsUtils.verify((GenericContainer) it.next(), list, PORT, list2 -> {
                Assertions.assertFalse(list2.isEmpty());
            });
        }
    }
}
