package io.vertx.kafka.client.tests;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/kafka/client/tests/ProducerConsumerContextTest.class */
public class ProducerConsumerContextTest extends KafkaClusterTestBase {
    private Vertx vertx;
    private KafkaWriteStream<String, String> producer;
    private KafkaReadStream<String, String> consumer;

    @Before
    public void beforeTest() {
        this.vertx = Vertx.vertx();
    }

    @After
    public void afterTest(TestContext testContext) {
        close(testContext, this.producer);
        this.vertx.close(testContext.asyncAssertSuccess());
    }

    @Test
    public void testStreamProducerConsumerContexts(final TestContext testContext) throws Exception {
        final String str = "testStreamProduceContexts";
        final Properties producerProperties = kafkaCluster.useTo().getProducerProperties("testStreamProduceContexts_producer");
        producerProperties.put("key.serializer", StringSerializer.class);
        producerProperties.put("value.serializer", StringSerializer.class);
        producerProperties.put("key.deserializer", StringDeserializer.class);
        producerProperties.put("value.deserializer", StringDeserializer.class);
        producerProperties.put("group.id", "testStreamProduceContexts_consumer");
        producerProperties.put("auto.offset.reset", "earliest");
        producerProperties.put("enable.auto.commit", Boolean.TRUE.toString());
        producerProperties.put("client.id", "testStreamProduceContexts_client");
        final Async async = testContext.async(200);
        this.vertx.deployVerticle(new AbstractVerticle() { // from class: io.vertx.kafka.client.tests.ProducerConsumerContextTest.1
            public void start() throws Exception {
                ProducerConsumerContextTest.this.producer = KafkaTestBase.producer(this.vertx, producerProperties);
                KafkaWriteStream kafkaWriteStream = ProducerConsumerContextTest.this.producer;
                TestContext testContext2 = testContext;
                Objects.requireNonNull(testContext2);
                kafkaWriteStream.exceptionHandler(testContext2::fail);
                final Context context = this.context;
                this.vertx.deployVerticle(new AbstractVerticle() { // from class: io.vertx.kafka.client.tests.ProducerConsumerContextTest.1.1
                    public void start() throws Exception {
                        for (int i = 0; i < 100; i++) {
                            ProducerRecord producerRecord = new ProducerRecord(str, 0, "key-" + i, "value-" + i);
                            producerRecord.headers().add("header_key", ("header_value-" + i).getBytes());
                            KafkaWriteStream kafkaWriteStream2 = ProducerConsumerContextTest.this.producer;
                            TestContext testContext3 = testContext;
                            Context context2 = context;
                            Async async2 = async;
                            kafkaWriteStream2.write(producerRecord, asyncResult -> {
                                testContext3.assertEquals(this.context, Vertx.currentContext());
                                testContext3.assertNotEquals(context2, Vertx.currentContext());
                                async2.countDown();
                            });
                        }
                    }
                });
            }
        });
        this.vertx.deployVerticle(new AbstractVerticle() { // from class: io.vertx.kafka.client.tests.ProducerConsumerContextTest.2
            public void start() {
                ProducerConsumerContextTest.this.consumer = KafkaReadStream.create(this.vertx, producerProperties);
                KafkaReadStream kafkaReadStream = ProducerConsumerContextTest.this.consumer;
                TestContext testContext2 = testContext;
                Objects.requireNonNull(testContext2);
                kafkaReadStream.exceptionHandler(testContext2::fail);
                final Context context = this.context;
                this.vertx.deployVerticle(new AbstractVerticle() { // from class: io.vertx.kafka.client.tests.ProducerConsumerContextTest.2.1
                    public void start() {
                        KafkaReadStream kafkaReadStream2 = ProducerConsumerContextTest.this.consumer;
                        TestContext testContext3 = testContext;
                        Context context2 = context;
                        Async async2 = async;
                        kafkaReadStream2.handler(consumerRecord -> {
                            testContext3.assertNotEquals(context2, Vertx.currentContext());
                            testContext3.assertEquals(context2, Vertx.currentContext().unwrap());
                            testContext3.assertNotEquals(this.context, Vertx.currentContext());
                            async2.countDown();
                        });
                        ProducerConsumerContextTest.this.consumer.subscribe(Collections.singleton(str));
                        ProducerConsumerContextTest.this.consumer.resume();
                    }
                });
            }
        });
        async.await();
    }
}
