package io.vertx.kafka.client.tests;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.kafka.client.common.impl.CloseHandler;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.kafka.client.producer.impl.KafkaProducerImpl;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/kafka/client/tests/ProducerMockTest.class */
public class ProducerMockTest {
    private Vertx vertx;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/kafka/client/tests/ProducerMockTest$SimulatedWriteException.class */
    public static class SimulatedWriteException extends Exception {
        private SimulatedWriteException() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/kafka/client/tests/ProducerMockTest$TestProducer.class */
    public static class TestProducer extends MockProducer<String, String> {
        public TestProducer() {
            super(false, new StringSerializer(), new StringSerializer());
        }

        public void assertCompleteNext() {
            while (!completeNext()) {
                Thread.yield();
            }
        }

        public void assertErrorNext(RuntimeException runtimeException) {
            while (!errorNext(runtimeException)) {
                Thread.yield();
            }
        }
    }

    /* loaded from: input_file:io/vertx/kafka/client/tests/ProducerMockTest$TestProducerWriteError.class */
    private static class TestProducerWriteError extends MockProducer<String, String> {
        public TestProducerWriteError() {
            super(false, new StringSerializer(), new StringSerializer());
        }

        public synchronized Future<RecordMetadata> send(ProducerRecord<String, String> producerRecord, Callback callback) {
            callback.onCompletion((RecordMetadata) null, new SimulatedWriteException());
            return super.send(producerRecord, callback);
        }
    }

    @Before
    public void beforeTest() {
        this.vertx = Vertx.vertx();
    }

    @After
    public void afterTest(TestContext testContext) {
        this.vertx.close(testContext.asyncAssertSuccess());
    }

    @Test
    public void testProducerDrain(TestContext testContext) throws Exception {
        testProducerDrain(testContext, null);
    }

    @Test
    public void testProducerFailureDrain(TestContext testContext) throws Exception {
        testProducerDrain(testContext, new RuntimeException());
    }

    private void testProducerDrain(TestContext testContext, RuntimeException runtimeException) throws Exception {
        TestProducer testProducer = new TestProducer();
        KafkaWriteStream producer = ProducerTest.producer(Vertx.vertx(), (Producer) testProducer);
        int i = 0;
        while (!producer.writeQueueFull()) {
            producer.write(new ProducerRecord("the_topic", 0, 0L, "abc", "def"));
            i++;
        }
        Async async = testContext.async();
        producer.drainHandler(r5 -> {
            testContext.assertTrue(Context.isOnVertxThread());
            testContext.assertTrue(Context.isOnEventLoopThread());
            async.complete();
        });
        for (int i2 = 0; i2 < i / 2; i2++) {
            if (runtimeException != null) {
                testProducer.assertErrorNext(runtimeException);
            } else {
                testProducer.assertCompleteNext();
            }
        }
        if (runtimeException != null) {
            testProducer.assertErrorNext(runtimeException);
        } else {
            testProducer.assertCompleteNext();
        }
        Assert.assertFalse(producer.writeQueueFull());
    }

    @Test
    public void testProducerError(TestContext testContext) throws Exception {
        TestProducer testProducer = new TestProducer();
        KafkaWriteStream producer = ProducerTest.producer(Vertx.vertx(), (Producer) testProducer);
        producer.write(new ProducerRecord("the_topic", 0, 0L, "abc", "def"));
        RuntimeException runtimeException = new RuntimeException();
        Async async = testContext.async();
        producer.exceptionHandler(th -> {
            testContext.assertEquals(runtimeException, th);
            async.complete();
        });
        testProducer.assertErrorNext(runtimeException);
    }

    public void testProducerConsumer(TestContext testContext) throws Exception {
        int i = 100;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        KafkaWriteStream producer = ProducerTest.producer(this.vertx, properties);
        for (int i2 = 0; i2 < 100; i2++) {
            producer.write(new ProducerRecord("abc-def", 0, 0L, "the_key_" + i2, "the_value_" + i2));
        }
        producer.close();
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:9092");
        hashMap.put("zookeeper.connect", "localhost:2181");
        hashMap.put("group.id", "test_group_2");
        hashMap.put("enable.auto.commit", "false");
        hashMap.put("auto.offset.reset", "earliest");
        KafkaReadStream create = KafkaReadStream.create(this.vertx, hashMap);
        create.subscribe(Collections.singleton("abc-def"));
        AtomicInteger atomicInteger = new AtomicInteger();
        Async async = testContext.async();
        create.handler(consumerRecord -> {
            if (atomicInteger.incrementAndGet() == i) {
                async.complete();
            }
        });
    }

    @Test
    public void testWriteWithSimulatedError(TestContext testContext) throws Exception {
        Async async = testContext.async();
        KafkaProducer create = KafkaProducer.create(this.vertx, new TestProducerWriteError());
        KafkaProducerRecord create2 = KafkaProducerRecord.create("myTopic", "test");
        Context orCreateContext = this.vertx.getOrCreateContext();
        orCreateContext.exceptionHandler(th -> {
            if (th instanceof SimulatedWriteException) {
                async.complete();
            } else {
                testContext.fail(th);
            }
        });
        create.write(create2);
        try {
            async.awaitSuccess();
            orCreateContext.exceptionHandler((Handler) null);
        } catch (Throwable th2) {
            orCreateContext.exceptionHandler((Handler) null);
            throw th2;
        }
    }

    @Test
    public void testCloseHandlerTimeout(TestContext testContext) {
        long j = 100;
        Async async = testContext.async(1);
        CloseHandler closeHandler = new CloseHandler((l, handler) -> {
            testContext.assertEquals(Long.valueOf(j), l);
            async.countDown();
        });
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "localhost:9092");
        new KafkaProducerImpl(this.vertx, KafkaWriteStream.create(this.vertx, hashMap, new StringSerializer(), new StringSerializer()), closeHandler).close(100L, asyncResult -> {
            testContext.assertEquals(0, Integer.valueOf(async.count()));
            async.complete();
        });
    }
}
