package io.strimzi.kafka.metrics.prometheus.integration;

import io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter;
import io.strimzi.kafka.metrics.prometheus.MetricsUtils;
import io.strimzi.kafka.metrics.prometheus.http.Listener;
import io.strimzi.test.container.StrimziConnectCluster;
import io.strimzi.test.container.StrimziKafkaCluster;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.MountableFile;

/* loaded from: input_file:io/strimzi/kafka/metrics/prometheus/integration/TestConnectMetricsIT.class */
public class TestConnectMetricsIT {
    private static final String GROUP_ID = "my-cluster";
    private static final String CLIENT_IDS_PATTERN = "client_id=\"my-cluster-(configs|offsets|statuses)\".*";
    private static final String ADMIN_ID_PATTERN = "client_id=\"my-cluster-shared-admin\".*";
    private static final String CONNECT_ID_PATTERN = "connect.*";
    private static final String TOPIC = "topic-to-export";
    private static final String FILE = "/tmp/file";
    private static final String SINK_CONNECTOR = "file-sink";
    private static final String SINK_CONNECTOR_PATTERN = "connector=\"file-sink\".*";
    private static final String SINK_CONSUMER_ID = "client_id=\"connector-consumer-file-sink.*";
    private static final String SOURCE_CONNECTOR = "file-source";
    private static final String SOURCE_CONNECTOR_PATTERN = "connector=\"file-source\".*";
    private static final String SOURCE_PRODUCER_ID = "client_id=\"connector-producer-file-source.*";
    private StrimziKafkaCluster kafka;
    private StrimziConnectCluster connect;
    private static final int PORT = Listener.parseListener("http://:8080").port;
    private static final List<String> CONNECT_PATTERNS = List.of((Object[]) new String[]{"jvm_.*", "process_.*", "kafka_admin_client_app_info_.*client_id=\"my-cluster-shared-admin\".*", "kafka_admin_client_kafka_metrics_.*client_id=\"my-cluster-shared-admin\".*", "kafka_admin_client_admin_client_metrics_.*client_id=\"my-cluster-shared-admin\".*", "kafka_admin_client_admin_client_node_metrics_.*client_id=\"my-cluster-shared-admin\".*", "kafka_consumer_app_info_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_consumer_kafka_metrics_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_consumer_consumer_metrics_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_consumer_consumer_node_metrics_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_consumer_consumer_coordinator_metrics_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_consumer_consumer_fetch_manager_metrics_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_producer_app_info_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_producer_kafka_metrics_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_producer_producer_metrics_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_producer_producer_node_metrics_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_producer_producer_topic_metrics_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_connect_app_info_.*connect.*", "kafka_connect_connect_coordinator_metrics_.*connect.*", "kafka_connect_connect_metrics_.*connect.*", "kafka_connect_connect_node_metrics_.*connect.*", "kafka_connect_connect_worker_metrics_.*connect.*", "kafka_connect_connect_worker_rebalance_metrics_.*connect.*", "kafka_connect_kafka_metrics_.*connect.*"});
    private static final List<String> SINK_PATTERNS = List.of((Object[]) new String[]{"kafka_connect_connector_metrics_.*connector=\"file-sink\".*", "kafka_connect_connector_task_metrics_.*connector=\"file-sink\".*", "kafka_connect_sink_task_metrics_.*connector=\"file-sink\".*", "kafka_connect_task_error_metrics_.*connector=\"file-sink\".*", "kafka_connect_connect_worker_metrics_connector_count 1.0", "kafka_consumer_app_info_.*client_id=\"connector-consumer-file-sink.*", "kafka_consumer_kafka_metrics_.*client_id=\"connector-consumer-file-sink.*", "kafka_consumer_consumer_metrics_.*client_id=\"connector-consumer-file-sink.*", "kafka_consumer_consumer_node_metrics_.*client_id=\"connector-consumer-file-sink.*", "kafka_consumer_consumer_coordinator_metrics_.*client_id=\"connector-consumer-file-sink.*", "kafka_consumer_consumer_fetch_manager_metrics_.*client_id=\"connector-consumer-file-sink.*"});
    private static final List<String> SOURCE_PATTERNS = List.of("kafka_connect_connector_metrics_.*connector=\"file-source\".*", "kafka_connect_connector_task_metrics_.*connector=\"file-source\".*", "kafka_connect_source_task_metrics_.*connector=\"file-source\".*", "kafka_connect_task_error_metrics_.*connector=\"file-source\".*", "kafka_connect_connect_worker_metrics_connector_count 2.0", "kafka_producer_app_info_.*client_id=\"connector-producer-file-source.*", "kafka_producer_kafka_metrics_.*client_id=\"connector-producer-file-source.*", "kafka_producer_producer_metrics_.*client_id=\"connector-producer-file-source.*", "kafka_producer_producer_node_metrics_.*client_id=\"connector-producer-file-source.*", "kafka_producer_producer_topic_metrics_.*client_id=\"connector-producer-file-source.*");

    @BeforeEach
    public void setUp() {
        this.kafka = new StrimziKafkaCluster.StrimziKafkaClusterBuilder().withNumberOfBrokers(1).withSharedNetwork().build();
        this.kafka.start();
        Admin create = Admin.create(Map.of("bootstrap.servers", this.kafka.getBootstrapServers()));
        try {
            create.createTopics(List.of(new NewTopic(TOPIC, 1, (short) -1)));
            if (create != null) {
                create.close();
            }
            KafkaProducer kafkaProducer = new KafkaProducer(Map.of("bootstrap.servers", this.kafka.getBootstrapServers(), "key.serializer", StringSerializer.class.getName(), "value.serializer", StringSerializer.class.getName()));
            for (int i = 0; i < 5; i++) {
                try {
                    kafkaProducer.send(new ProducerRecord(TOPIC, "record" + i));
                } catch (Throwable th) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            }
            kafkaProducer.close();
        } catch (Throwable th3) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @AfterEach
    public void tearDown() {
        if (this.connect != null) {
            this.connect.stop();
        }
        if (this.kafka != null) {
            this.kafka.stop();
        }
    }

    private void setupConnect(Map<String, String> map) {
        HashMap hashMap = new HashMap(map);
        hashMap.put("metric.reporters", ClientMetricsReporter.class.getName());
        this.connect = new StrimziConnectCluster.StrimziConnectClusterBuilder().withGroupId(GROUP_ID).withKafkaCluster(this.kafka).withAdditionalConnectConfiguration(hashMap).build();
        Iterator it = this.connect.getWorkers().iterator();
        while (it.hasNext()) {
            ((GenericContainer) it.next()).withCopyFileToContainer(MountableFile.forHostPath(MetricsUtils.REPORTER_JARS), MetricsUtils.MOUNT_PATH).withExposedPorts(new Integer[]{8083, Integer.valueOf(PORT)}).withEnv(Map.of("CLASSPATH", "/opt/strimzi/metrics-reporter/*")).waitingFor(new HttpWaitStrategy().forPath("/health").forStatusCode(200));
        }
        this.connect.start();
    }

    @Test
    public void testConnectMetrics() {
        setupConnect(Map.of("consumer.metric.reporters", ClientMetricsReporter.class.getName(), "producer.metric.reporters", ClientMetricsReporter.class.getName(), "admin.metric.reporters", ClientMetricsReporter.class.getName()));
        checkMetricsExist(CONNECT_PATTERNS);
        MetricsUtils.startConnector(this.connect, SINK_CONNECTOR, "{\n  \"connector.class\":\"org.apache.kafka.connect.file.FileStreamSinkConnector\",\n  \"topics\": \"topic-to-export\",\n  \"file\": \"/tmp/file\"\n}", 1);
        checkMetricsExist(SINK_PATTERNS);
        MetricsUtils.startConnector(this.connect, SOURCE_CONNECTOR, "{\n  \"connector.class\":\"org.apache.kafka.connect.file.FileStreamSourceConnector\",\n  \"topic\": \"topic-to-export\",\n  \"file\": \"/tmp/file\"\n}", 1);
        checkMetricsExist(SOURCE_PATTERNS);
    }

    @Test
    public void testConnectMetricsWithAllowlist() {
        setupConnect(Map.of("consumer.metric.reporters", ClientMetricsReporter.class.getName(), "producer.metric.reporters", ClientMetricsReporter.class.getName(), "admin.metric.reporters", ClientMetricsReporter.class.getName(), "prometheus.metrics.reporter.allowlist", "kafka_connect_connect_worker_.*,kafka_connect_connector_metrics_.*,kafka_admin_client_admin_client_metrics_.*,kafka_consumer_kafka_metrics_.*,kafka_producer_producer_node_metrics_.*", "admin.prometheus.metrics.reporter.allowlist", "kafka_admin_client_admin_client_metrics.*", "consumer.prometheus.metrics.reporter.allowlist", "kafka_consumer_app_info_.*", "producer.prometheus.metrics.reporter.allowlist", "kafka_producer_producer_metrics_.*"));
        List<String> of = List.of("jvm_.*", "process_.*", "kafka_admin_client_admin_client_metrics_.*client_id=\"my-cluster-shared-admin\".*", "kafka_consumer_kafka_metrics_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_producer_producer_node_metrics_.*client_id=\"my-cluster-(configs|offsets|statuses)\".*", "kafka_connect_connect_worker_metrics_.*connect.*", "kafka_connect_connect_worker_rebalance_metrics_.*connect.*");
        checkMetricsExist(of);
        ArrayList arrayList = new ArrayList(CONNECT_PATTERNS);
        arrayList.removeAll(of);
        checkMetricsDontExist(arrayList);
        MetricsUtils.startConnector(this.connect, SINK_CONNECTOR, "{\n  \"connector.class\":\"org.apache.kafka.connect.file.FileStreamSinkConnector\",\n  \"topics\": \"topic-to-export\",\n  \"file\": \"/tmp/file\"\n}", 1);
        List<String> of2 = List.of("kafka_connect_connector_metrics_.*connector=\"file-sink\".*", "kafka_connect_connect_worker_metrics_connector_count 1.0", "kafka_consumer_app_info_.*client_id=\"connector-consumer-file-sink.*");
        checkMetricsExist(of2);
        ArrayList arrayList2 = new ArrayList(SINK_PATTERNS);
        arrayList2.removeAll(of2);
        checkMetricsDontExist(arrayList2);
        MetricsUtils.startConnector(this.connect, SOURCE_CONNECTOR, "{\n  \"connector.class\":\"org.apache.kafka.connect.file.FileStreamSourceConnector\",\n  \"topic\": \"topic-to-export\",\n  \"file\": \"/tmp/file\"\n}", 1);
        List<String> of3 = List.of("kafka_connect_connector_metrics_.*connector=\"file-source\".*", "kafka_connect_connect_worker_metrics_connector_count 2.0", "kafka_producer_producer_metrics_.*client_id=\"connector-producer-file-source.*");
        checkMetricsExist(of3);
        ArrayList arrayList3 = new ArrayList(of3);
        arrayList3.removeAll(of3);
        checkMetricsDontExist(arrayList3);
    }

    private void checkMetricsExist(List<String> list) {
        Iterator it = this.connect.getWorkers().iterator();
        while (it.hasNext()) {
            MetricsUtils.verify((GenericContainer) it.next(), list, PORT, list2 -> {
                Assertions.assertFalse(list2.isEmpty());
            });
        }
    }

    private void checkMetricsDontExist(List<String> list) {
        Iterator it = this.connect.getWorkers().iterator();
        while (it.hasNext()) {
            MetricsUtils.verify((GenericContainer) it.next(), list, PORT, list2 -> {
                Assertions.assertTrue(list2.isEmpty());
            });
        }
    }
}
