package io.vertx.kafka.client.tests;

import io.debezium.kafka.KafkaCluster;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.kafka.client.common.TopicPartition;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import java.util.LinkedList;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/kafka/client/tests/CleanupTest.class */
public class CleanupTest extends KafkaClusterTestBase {
    private Vertx vertx;
    private int numKafkaProducerNetworkThread;
    private int numKafkaConsumerNetworkThread;
    private int numVertxKafkaConsumerThread;

    /* loaded from: input_file:io/vertx/kafka/client/tests/CleanupTest$TheVerticle.class */
    public static class TheVerticle extends AbstractVerticle {
        public void start(Promise<Void> promise) throws Exception {
            Properties properties = new Properties();
            properties.putAll(this.context.config().getMap());
            KafkaProducer.createShared(this.vertx, "the-name", properties).write(KafkaProducerRecord.create("the_topic", "the_value")).onComplete(asyncResult -> {
                promise.handle(asyncResult.map((Void) null));
            });
        }
    }

    @Before
    public void beforeTest() {
        this.vertx = Vertx.vertx();
        this.numKafkaProducerNetworkThread = countThreads("kafka-producer-network-thread");
        this.numKafkaConsumerNetworkThread = countThreads("kafka-consumer-network-thread");
        this.numKafkaConsumerNetworkThread = countThreads("vert.x-kafka-consumer-thread");
    }

    @After
    public void afterTest(TestContext testContext) {
        this.vertx.close().onComplete(testContext.asyncAssertSuccess());
    }

    private int countThreads(String str) {
        return (int) Thread.getAllStackTraces().keySet().stream().filter(thread -> {
            return thread.getName().contains(str);
        }).count();
    }

    private void assertNoThreads(TestContext testContext, String str) {
        testContext.assertEquals(0, Integer.valueOf(countThreads(str)));
    }

    private void waitUntil(BooleanSupplier booleanSupplier) {
        waitUntil(null, booleanSupplier);
    }

    private void waitUntil(String str, BooleanSupplier booleanSupplier) {
        long currentTimeMillis = System.currentTimeMillis();
        while (!booleanSupplier.getAsBoolean()) {
            Assert.assertTrue(str, System.currentTimeMillis() - currentTimeMillis < 10000);
        }
    }

    @Test
    public void testSharedProducer(TestContext testContext) {
        Properties producerProperties = kafkaCluster.useTo().getProducerProperties("the_producer");
        producerProperties.put("key.serializer", StringSerializer.class.getName());
        producerProperties.put("value.serializer", StringSerializer.class.getName());
        Async async = testContext.async(3);
        LinkedList linkedList = new LinkedList();
        for (int i = 0; i < 3; i++) {
            KafkaProducer createShared = KafkaProducer.createShared(this.vertx, "the-name", producerProperties);
            createShared.write(KafkaProducerRecord.create("the_topic", "the_value")).onComplete(testContext.asyncAssertSuccess(r3 -> {
                async.countDown();
            }));
            linkedList.add(createShared);
        }
        async.awaitSuccess(10000L);
        Async async2 = testContext.async();
        kafkaCluster.useTo().consumeStrings("the_topic", 3, 10L, TimeUnit.SECONDS, () -> {
            Objects.requireNonNull(async2);
            close(testContext, linkedList, async2::complete);
        });
        async2.awaitSuccess(10000L);
        waitUntil(() -> {
            return countThreads("kafka-producer-network-thread") == this.numKafkaProducerNetworkThread;
        });
    }

    private void close(TestContext testContext, LinkedList<KafkaProducer<String, String>> linkedList, Runnable runnable) {
        close(this.numKafkaProducerNetworkThread, testContext, linkedList, runnable);
    }

    private void close(int i, TestContext testContext, LinkedList<KafkaProducer<String, String>> linkedList, Runnable runnable) {
        if (linkedList.size() <= 0) {
            runnable.run();
        } else {
            testContext.assertEquals(Integer.valueOf(i + 1), Integer.valueOf(countThreads("kafka-producer-network-thread")));
            linkedList.removeFirst().close().onComplete(testContext.asyncAssertSuccess(r9 -> {
                close(testContext, linkedList, runnable);
            }));
        }
    }

    @Test
    public void testSharedProducerCleanupInVerticle(TestContext testContext) {
        Properties producerProperties = kafkaCluster.useTo().getProducerProperties("the_producer");
        producerProperties.put("key.serializer", StringSerializer.class.getName());
        producerProperties.put("value.serializer", StringSerializer.class.getName());
        Async async = testContext.async(3);
        AtomicReference atomicReference = new AtomicReference();
        this.vertx.deployVerticle(TheVerticle.class.getName(), new DeploymentOptions().setInstances(3).setConfig(new JsonObject(producerProperties))).onComplete(testContext.asyncAssertSuccess(str -> {
            atomicReference.set(str);
            async.complete();
        }));
        async.awaitSuccess(10000L);
        Async async2 = testContext.async();
        kafkaCluster.useTo().consumeStrings("the_topic", 3, 10L, TimeUnit.SECONDS, () -> {
            this.vertx.undeploy((String) atomicReference.get()).onComplete(testContext.asyncAssertSuccess(r3 -> {
                async2.complete();
            }));
        });
        async2.awaitSuccess(10000L);
        waitUntil(() -> {
            return countThreads("kafka-producer-network-thread") == this.numKafkaProducerNetworkThread;
        });
    }

    @Test
    public void testCleanupInProducer(final TestContext testContext) {
        final Properties producerProperties = kafkaCluster.useTo().getProducerProperties("the_producer");
        producerProperties.put("key.serializer", StringSerializer.class);
        producerProperties.put("value.serializer", StringSerializer.class);
        Async async = testContext.async();
        final AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        this.vertx.deployVerticle(new AbstractVerticle() { // from class: io.vertx.kafka.client.tests.CleanupTest.1
            public void start() throws Exception {
                KafkaProducer create = KafkaProducer.create(this.vertx, producerProperties);
                atomicReference.set(create);
                create.write(KafkaProducerRecord.create("the_topic", "the_value")).onComplete(testContext.asyncAssertSuccess());
            }
        }).onComplete(testContext.asyncAssertSuccess(str -> {
            atomicReference2.set(str);
            async.complete();
        }));
        async.awaitSuccess(15000L);
        Async async2 = testContext.async();
        kafkaCluster.useTo().consumeStrings("the_topic", 1, 10L, TimeUnit.SECONDS, () -> {
            this.vertx.undeploy((String) atomicReference2.get()).onComplete(testContext.asyncAssertSuccess(r3 -> {
                async2.complete();
            }));
        });
        async2.awaitSuccess(10000L);
        waitUntil(() -> {
            return countThreads("kafka-producer-network-thread") == this.numKafkaProducerNetworkThread;
        });
    }

    @Test
    public void testCleanupInConsumer(final TestContext testContext) {
        final String str = "testCleanupInConsumer";
        final Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties("testCleanupInConsumer_consumer", "testCleanupInConsumer_consumer", OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        final Async async = testContext.async(2);
        Async async2 = testContext.async();
        this.vertx.deployVerticle(new AbstractVerticle() { // from class: io.vertx.kafka.client.tests.CleanupTest.2
            boolean deployed = false;

            public void start(Promise<Void> promise) {
                KafkaConsumer create = KafkaConsumer.create(this.vertx, consumerProperties);
                this.deployed = true;
                TestContext testContext2 = testContext;
                Async async3 = async;
                create.handler(kafkaConsumerRecord -> {
                    if (this.deployed) {
                        this.deployed = false;
                        this.vertx.undeploy(this.context.deploymentID()).onComplete(testContext2.asyncAssertSuccess(r3 -> {
                            async3.countDown();
                        }));
                    }
                });
                create.assign(new TopicPartition(str, 0)).onComplete(promise);
            }
        }).onComplete(testContext.asyncAssertSuccess(str2 -> {
            async2.complete();
        }));
        async2.awaitSuccess(10000L);
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        StringSerializer stringSerializer = new StringSerializer();
        StringSerializer stringSerializer2 = new StringSerializer();
        Objects.requireNonNull(async);
        useTo.produce("testCleanupInConsumer_producer", 100, stringSerializer, stringSerializer2, async::countDown, () -> {
            return new ProducerRecord(str, "the_value");
        });
        async.awaitSuccess(10000L);
        waitUntil("Expected " + countThreads("vert.x-kafka-consumer-thread") + " == " + this.numVertxKafkaConsumerThread, () -> {
            return countThreads("vert.x-kafka-consumer-thread") == this.numKafkaConsumerNetworkThread;
        });
    }

    @Test
    public void testUndeployUnassignedConsumer(TestContext testContext) {
        final Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties("testUndeployUnassignedConsumer_consumer", "testUndeployUnassignedConsumer_consumer", OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        Async async = testContext.async(1);
        this.vertx.deployVerticle(new AbstractVerticle() { // from class: io.vertx.kafka.client.tests.CleanupTest.3
            public void start() {
                KafkaConsumer.create(this.vertx, consumerProperties);
            }
        }).onComplete(testContext.asyncAssertSuccess(str -> {
            this.vertx.undeploy(str).onComplete(testContext.asyncAssertSuccess(r3 -> {
                async.complete();
            }));
        }));
        async.awaitSuccess(10000L);
        waitUntil("Expected " + countThreads("vert.x-kafka-consumer-thread") + " == " + this.numVertxKafkaConsumerThread, () -> {
            return countThreads("vert.x-kafka-consumer-thread") == this.numKafkaConsumerNetworkThread;
        });
    }
}
