package io.vertx.kafka.client.tests;

import io.debezium.kafka.KafkaCluster;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.kafka.client.producer.impl.KafkaProducerImpl;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/kafka/client/tests/ProducerTest.class */
public class ProducerTest extends KafkaClusterTestBase {
    /** Vert.x instance created in {@code beforeTest()} and closed in {@code afterTest(TestContext)}. */
    private Vertx vertx;
    /** Producer stream assigned by individual tests; handed to the {@code close} helper in {@code afterTest(TestContext)}. */
    private KafkaWriteStream<String, String> producer;

    /* loaded from: input_file:io/vertx/kafka/client/tests/ProducerTest$TestInterceptor.class */
    public static class TestInterceptor implements ProducerInterceptor<String, String> {
        static final List<Context> list = Collections.synchronizedList(new ArrayList());

        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
            list.add(Vertx.currentContext());
            return producerRecord;
        }

        public void onAcknowledgement(RecordMetadata recordMetadata, Exception exc) {
        }

        public void close() {
        }

        public void configure(Map<String, ?> map) {
        }
    }

    /** Creates the Vert.x instance used by the test that is about to run. */
    @Before
    public void beforeTest() {
        vertx = Vertx.vertx();
    }

    /**
     * Releases per-test resources: passes the producer to the {@code close}
     * helper (defined outside this class), then closes the Vert.x instance and
     * asserts that the close succeeds.
     *
     * @param testContext the Vert.x unit test context used to report failures
     */
    @After
    public void afterTest(TestContext testContext) {
        close(testContext, producer);
        vertx.close(testContext.asyncAssertSuccess());
    }

    /**
     * Writes a batch of records (each with a {@code header_key} header) through
     * the raw {@link KafkaWriteStream} and checks, via
     * {@code assertReceiveMessages}, that all of them are consumed back.
     *
     * @param testContext the Vert.x unit test context
     * @throws Exception if setting up the producer fails
     */
    @Test
    public void testStreamProduce(TestContext testContext) throws Exception {
        final String topic = "testStreamProduce";
        final int messageCount = 100000;
        Properties config = kafkaCluster.useTo().getProducerProperties("testStreamProduce_producer");
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        // Use the per-test Vert.x instance: it is closed in afterTest, whereas a
        // fresh Vertx.vertx() created here would never be shut down.
        producer = producer(vertx, config);
        producer.exceptionHandler(testContext::fail);
        for (int i = 0; i < messageCount; i++) {
            ProducerRecord<String, String> kafkaRecord = new ProducerRecord<>(topic, 0, "key-" + i, "value-" + i);
            // Encode with an explicit charset instead of the platform default.
            kafkaRecord.headers().add("header_key", ("header_value-" + i).getBytes(StandardCharsets.UTF_8));
            producer.write(kafkaRecord);
        }
        assertReceiveMessages(testContext, topic, messageCount);
    }

    /**
     * Writes a batch of records (each with a {@code header_key} header) through
     * the {@link KafkaProducer} wrapper around the write stream and checks, via
     * {@code assertReceiveMessages}, that all of them are consumed back.
     *
     * @param testContext the Vert.x unit test context
     * @throws Exception if setting up the producer fails
     */
    @Test
    public void testProducerProduce(TestContext testContext) throws Exception {
        final String topic = "testProducerProduce";
        final int messageCount = 100000;
        Properties config = kafkaCluster.useTo().getProducerProperties("testProducerProduce_producer");
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        // Build the stream on the same per-test Vert.x instance as the wrapper
        // below; a separate Vertx.vertx() here would never be closed.
        producer = producer(vertx, config);
        producer.exceptionHandler(testContext::fail);
        KafkaProducer<String, String> kafkaProducer = new KafkaProducerImpl<>(vertx, producer);
        for (int i = 0; i < messageCount; i++) {
            KafkaProducerRecord<String, String> kafkaRecord = KafkaProducerRecord
                .create(topic, "key-" + i, "value-" + i, 0)
                .addHeader("header_key", "header_value-" + i);
            kafkaProducer.write(kafkaRecord);
        }
        assertReceiveMessages(testContext, topic, messageCount);
    }

    /**
     * Consumes records from a topic and asserts that the n-th record received
     * (counting from zero) has key {@code "key-n"}, value {@code "value-n"} and a
     * {@code header_key} header whose value is {@code "header_value-n"}.
     * Completes an {@link Async} when the consumer reports completion.
     *
     * @param testContext the Vert.x unit test context used for assertions
     * @param str the topic to consume from
     * @param i the number of records to consume before stopping
     */
    private void assertReceiveMessages(TestContext testContext, String str, int i) {
        Async done = testContext.async();
        AtomicInteger received = new AtomicInteger();
        // Keep consuming while fewer than the expected number of records were seen.
        BooleanSupplier keepConsuming = () -> received.get() < i;
        kafkaCluster.useTo().consumeStrings(keepConsuming, done::complete, Collections.singleton(str), consumed -> {
            int index = received.getAndIncrement();
            testContext.assertEquals("key-" + index, consumed.key());
            testContext.assertEquals("value-" + index, consumed.value());
            Header header = consumed.headers().headers("header_key").iterator().next();
            // Decode with an explicit charset instead of the platform default.
            testContext.assertEquals("header_value-" + index, new String(header.value(), StandardCharsets.UTF_8));
        });
    }

    /**
     * Points the producer at a plain TCP server that accepts connections but
     * never replies, and expects the write to fail.
     *
     * @param testContext the Vert.x unit test context
     * @throws Exception if setting up the server or producer fails
     */
    @Test
    public void testBlockingBroker(TestContext testContext) throws Exception {
        // Port of the silent TCP server that stands in for a broker.
        final int port = 9091;
        Async serverListening = testContext.async();
        vertx.createNetServer()
            .connectHandler(socket -> {
                // Intentionally empty: the connection is accepted and then ignored.
            })
            .listen(port, testContext.asyncAssertSuccess(server -> serverListening.complete()));
        // Do not configure the producer before the server is actually listening.
        serverListening.awaitSuccess(10000L);

        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:" + port);
        config.setProperty("acks", "1");
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        // Presumably bounds how long the send may block before failing - confirm against Kafka docs.
        config.put("max.block.ms", 2000);
        // Use the per-test Vert.x instance; a fresh Vertx.vertx() would never be closed.
        producer = producer(vertx, config);
        producer.write(new ProducerRecord<>("testBlockingBroker", 0, "key", "value"), testContext.asyncAssertFailure());
    }

    /**
     * Points the producer at {@code localhost:9091} and expects the write to fail.
     * NOTE(review): presumably nothing is listening on that port during this test - confirm.
     *
     * @param testContext the Vert.x unit test context
     * @throws Exception if setting up the producer fails
     */
    @Test
    public void testBrokerConnectionError(TestContext testContext) throws Exception {
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9091");
        config.setProperty("acks", "1");
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        config.put("max.block.ms", 2000);
        // Use the per-test Vert.x instance; a fresh Vertx.vertx() would never be closed.
        producer = producer(vertx, config);
        producer.write(new ProducerRecord<>("testBrokerConnectionError", 0, "key", "value"), testContext.asyncAssertFailure());
    }

    /**
     * Writes a record whose value is a {@link Date} to a producer configured
     * with {@link StringSerializer} and completes once the producer's exception
     * handler is invoked.
     *
     * @param testContext the Vert.x unit test context
     * @throws Exception if setting up the producer fails
     */
    @Test
    public void testExceptionHandler(TestContext testContext) throws Exception {
        Async async = testContext.async();
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        // Use the per-test Vert.x instance; a fresh Vertx.vertx() would never be closed.
        KafkaProducer<String, Date> kafkaProducer = KafkaProducer.create(vertx, config);
        kafkaProducer.exceptionHandler(failure -> async.complete());
        // The Date value is deliberate: presumably the String serializer rejects it,
        // which is what should trigger the exception handler.
        kafkaProducer.write(KafkaProducerRecord.create("topic", "key", new Date()));
    }

    /**
     * Sends a record addressed to partition {@code 1000} and waits for two
     * signals: one from the producer's exception handler and one from the
     * send completion handler.
     * NOTE(review): partition 1000 is presumably meant not to exist - confirm.
     *
     * @param testContext the Vert.x unit test context
     */
    @Test
    public void testNotExistingPartition(TestContext testContext) {
        // Two count-downs expected: exception handler + send result handler.
        Async async = testContext.async(2);
        Properties config = new Properties();
        config.setProperty("bootstrap.servers", "localhost:9092");
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        // Use the per-test Vert.x instance; a fresh Vertx.vertx() would never be closed.
        KafkaProducer<String, String> kafkaProducer = KafkaProducer.create(vertx, config);
        kafkaProducer.exceptionHandler(failure -> async.countDown());
        KafkaProducerRecord<String, String> kafkaRecord =
            KafkaProducerRecord.<String, String>create("topic", null, "value", 1000);
        kafkaProducer.send(kafkaRecord, result -> async.countDown());
    }

    /**
     * Registers {@link TestInterceptor} on the producer, writes one record from
     * a duplicated context, and asserts that the interceptor saw exactly one
     * {@code onSend} call and that the context current at that time was the
     * duplicated one.
     *
     * @param testContext the Vert.x unit test context
     * @throws Exception if setting up the producer fails
     */
    @Test
    public void testInterceptor(TestContext testContext) throws Exception {
        // NOTE(review): the client id reuses "testStreamProduce_producer"; kept as-is, looks like copy/paste.
        Properties config = kafkaCluster.useTo().getProducerProperties("testStreamProduce_producer");
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        config.put("interceptor.classes", Collections.singletonList(TestInterceptor.class));
        producer = producer(vertx, config);
        producer.exceptionHandler(testContext::fail);
        ProducerRecord<String, String> kafkaRecord = new ProducerRecord<>("testInterceptor", 0, "key-0", "value-0");
        // Vertx#getOrCreateContext() is declared to return Context; duplicate()
        // lives on the internal ContextInternal type, so an explicit cast is required.
        ContextInternal duplicate = ((ContextInternal) vertx.getOrCreateContext()).duplicate();
        Async async = testContext.async();
        duplicate.runOnContext(v -> {
            // Drop anything recorded earlier so only this write is counted.
            TestInterceptor.list.clear();
            producer.write(kafkaRecord).onComplete(testContext.asyncAssertSuccess(ignored -> {
                testContext.assertEquals(1, TestInterceptor.list.size());
                testContext.assertEquals(duplicate, TestInterceptor.list.get(0));
                async.complete();
            }));
        });
    }
}
