package io.strimzi.kafka.metrics.prometheus.integration;

import io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter;
import io.strimzi.kafka.metrics.prometheus.MetricsUtils;
import io.strimzi.kafka.metrics.prometheus.http.Listener;
import io.strimzi.test.container.StrimziKafkaCluster;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;

/* loaded from: input_file:io/strimzi/kafka/metrics/prometheus/integration/TestStreamsMetricsIT.class */
/**
 * Integration test that starts a single-broker Kafka cluster, seeds a source topic with records,
 * launches a client container configured as a Kafka Streams application with
 * {@link ClientMetricsReporter} as its metrics reporter, and checks the reported metric names
 * through {@link MetricsUtils#verify}.
 *
 * <p>NOTE(review): the exact matching semantics of {@code MetricsUtils.verify} are not visible
 * here; the tests only rely on the callback receiving a list that is empty or non-empty.
 */
public class TestStreamsMetricsIT {
    private static final int PORT = Listener.parseListener("http://:8080").port;
    private static final String SOURCE_TOPIC = "source-topic";
    private static final String TARGET_TOPIC = "target-topic";
    private static final int RECORD_COUNT = 10;

    private StrimziKafkaCluster cluster;
    private Map<String, String> env;

    /**
     * Starts the cluster, creates the source and target topics, produces the seed records and
     * prepares the environment variables handed to the client container.
     *
     * @throws Exception if topic creation fails or is interrupted
     */
    @BeforeEach
    public void setUp() throws Exception {
        cluster = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
                .withNumberOfBrokers(1)
                .withSharedNetwork()
                .build();
        cluster.start();

        createTopics();
        produceRecords();
        env = buildEnv();
    }

    /** Stops the cluster started in {@link #setUp()}. */
    @AfterEach
    public void tearDown() {
        // Guard against setUp failing before the cluster instance was built.
        if (cluster != null) {
            cluster.stop();
        }
    }

    /** Runs the Streams client with the default reporter config and expects every listed metric family. */
    @Test
    public void testStreamsMetrics() {
        try (GenericContainer<?> streams = MetricsUtils.clientContainer(env, PORT)) {
            streams.start();

            MetricsUtils.verify(streams, List.of(
                    "jvm_.*",
                    "process_.*",
                    "kafka_admin_client_app_info_.*",
                    "kafka_admin_client_kafka_metrics_.*",
                    "kafka_admin_client_admin_client_metrics_.*",
                    "kafka_admin_client_admin_client_node_metrics_.*",
                    "kafka_consumer_app_info_.*",
                    "kafka_consumer_kafka_metrics_.*",
                    "kafka_consumer_consumer_metrics_.*",
                    "kafka_consumer_consumer_node_metrics_.*",
                    "kafka_consumer_consumer_coordinator_metrics_.*",
                    "kafka_consumer_consumer_fetch_manager_metrics_.*",
                    "kafka_producer_app_info_.*",
                    "kafka_producer_kafka_metrics_.*",
                    "kafka_producer_producer_metrics_.*",
                    "kafka_producer_producer_node_metrics_.*",
                    "kafka_producer_producer_topic_metrics_.*",
                    "kafka_streams_stream_metrics_.*",
                    "kafka_streams_stream_processor_node_metrics_.*",
                    "kafka_streams_stream_state_updater_metrics_.*",
                    "kafka_streams_stream_task_metrics_.*",
                    "kafka_streams_stream_thread_metrics_.*",
                    "kafka_streams_stream_topic_metrics_.*"
            ), PORT, metrics -> Assertions.assertFalse(metrics.isEmpty()));
        }
    }

    /**
     * Runs the Streams client with an allowlist restricted to consumer metrics and
     * {@code kafka_streams_stream_metrics_}; expects the allowed (plus JVM/process) families to be
     * present and the remaining families to be absent.
     */
    @Test
    public void testStreamsMetricsWithAllowlist() {
        env.put("ADDITIONAL_CONFIG", "metric.reporters=" + ClientMetricsReporter.class.getName()
                + "\nprometheus.metrics.reporter.allowlist=kafka_consumer_.*,kafka_streams_stream_metrics_.*");

        try (GenericContainer<?> streams = MetricsUtils.clientContainer(env, PORT)) {
            streams.start();

            // Families expected to be reported.
            MetricsUtils.verify(streams, List.of(
                    "jvm_.*",
                    "process_.*",
                    "kafka_consumer_app_info_.*",
                    "kafka_consumer_kafka_metrics_.*",
                    "kafka_consumer_consumer_metrics_.*",
                    "kafka_consumer_consumer_node_metrics_.*",
                    "kafka_consumer_consumer_coordinator_metrics_.*",
                    "kafka_consumer_consumer_fetch_manager_metrics_.*",
                    "kafka_streams_stream_metrics_.*"
            ), PORT, metrics -> Assertions.assertFalse(metrics.isEmpty()));

            // Families expected to be filtered out by the allowlist.
            MetricsUtils.verify(streams, List.of(
                    "kafka_admin_client_app_info_.*",
                    "kafka_admin_client_kafka_metrics_.*",
                    "kafka_admin_client_admin_client_metrics_.*",
                    "kafka_admin_client_admin_client_node_metrics_.*",
                    "kafka_producer_app_info_.*",
                    "kafka_producer_kafka_metrics_.*",
                    "kafka_producer_producer_metrics_.*",
                    "kafka_producer_producer_node_metrics_.*",
                    "kafka_producer_producer_topic_metrics_.*",
                    "kafka_streams_stream_processor_node_metrics_.*",
                    "kafka_streams_stream_state_updater_metrics_.*",
                    "kafka_streams_stream_task_metrics_.*",
                    "kafka_streams_stream_thread_metrics_.*",
                    "kafka_streams_stream_topic_metrics_.*"
            ), PORT, metrics -> Assertions.assertTrue(metrics.isEmpty()));
        }
    }

    /** Creates the source and target topics and waits for the creation to complete. */
    private void createTopics() throws Exception {
        try (Admin admin = Admin.create(Map.of("bootstrap.servers", cluster.getBootstrapServers()))) {
            // One partition each; a replication factor of -1 is passed through unchanged.
            List<NewTopic> topics = List.of(
                    new NewTopic(SOURCE_TOPIC, 1, (short) -1),
                    new NewTopic(TARGET_TOPIC, 1, (short) -1));
            admin.createTopics(topics).all().get();
        }
    }

    /** Sends the seed records to the source topic and flushes them before closing the producer. */
    private void produceRecords() {
        Map<String, Object> producerConfig = Map.of(
                "bootstrap.servers", cluster.getBootstrapServers(),
                "key.serializer", StringSerializer.class.getName(),
                "value.serializer", StringSerializer.class.getName());

        // try-with-resources closes the producer even when send() or flush() throws.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig)) {
            for (int i = 0; i < RECORD_COUNT; i++) {
                producer.send(new ProducerRecord<>(SOURCE_TOPIC, "record" + i));
            }
            producer.flush();
        }
    }

    /** Builds the mutable environment map passed to the client container. */
    private Map<String, String> buildEnv() {
        Map<String, String> clientEnv = new HashMap<>();
        clientEnv.put("CLIENT_TYPE", "KafkaStreams");
        clientEnv.put("BOOTSTRAP_SERVERS", cluster.getNetworkBootstrapServers());
        clientEnv.put("APPLICATION_ID", "my-app-id");
        clientEnv.put("SOURCE_TOPIC", SOURCE_TOPIC);
        clientEnv.put("TARGET_TOPIC", TARGET_TOPIC);
        clientEnv.put("ADDITIONAL_CONFIG", "metric.reporters=" + ClientMetricsReporter.class.getName());
        clientEnv.put("CLASSPATH", "/opt/strimzi/metrics-reporter/*");
        return clientEnv;
    }
}
