package io.vertx.kafka.client.tests;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:io/vertx/kafka/client/tests/TransactionalProducerTest.class */
/**
 * Tests for the transactional operations of {@link KafkaWriteStream}
 * ({@code initTransactions}, {@code beginTransaction}, {@code commitTransaction}
 * and {@code abortTransaction}), run against the cluster started in {@link #setUp()}.
 *
 * <p>Every test gets its own {@link Vertx} instance and its own transactional producer;
 * both are released in {@link #afterTest(TestContext)}.
 */
public class TransactionalProducerTest extends KafkaClusterTestBase {
    private static final String HEADER_KEY = "header_key";

    private Vertx vertx;
    private KafkaWriteStream<String, String> producer;

    /**
     * Starts the shared cluster with three brokers added and
     * {@code deleteDataPriorToStartup} enabled.
     *
     * @throws IOException if the cluster cannot be started
     */
    @BeforeClass
    public static void setUp() throws IOException {
        kafkaCluster = kafkaCluster(false).deleteDataPriorToStartup(true).addBrokers(3).startup();
    }

    /** Makes sure the per-test {@link Vertx} instance exists. */
    @Before
    public void beforeTest() {
        sharedVertx();
    }

    /**
     * Closes the transactional producer and then the per-test {@link Vertx} instance.
     *
     * @param testContext context used to report a failed shutdown
     */
    @After
    public void afterTest(TestContext testContext) {
        close(testContext, this.producer);
        // Guard against a setup failure that happened before the instance was created.
        if (this.vertx != null) {
            this.vertx.close().onComplete(testContext.asyncAssertSuccess());
        }
    }

    /**
     * Creates the transactional producer under test.
     *
     * <p>JUnit 4 does not define the relative order of the {@code @Before} methods of one
     * class, so the Vert.x instance is obtained through {@link #sharedVertx()} instead of
     * assuming {@link #beforeTest()} has already run.
     *
     * @param testContext context that is failed when the producer reports an error
     */
    @Before
    public void init(TestContext testContext) {
        Properties config = kafkaCluster.useTo().getProducerProperties("testTransactional_producer");
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        config.put("transactional.id", "producer-1");
        config.put("enable.idempotence", "true");
        config.put("acks", "all");
        this.producer = producer(sharedVertx(), config);
        this.producer.exceptionHandler(testContext::fail);
    }

    /**
     * Writes records with indexes {@code 0..1000} inside one transaction, commits it, and
     * checks that the consumer receives them in order with matching key, value and header.
     */
    @Test
    public void producedRecordsAreSeenAfterTheyHaveBeenCommitted(TestContext testContext) {
        final String topic = "transactionalProduce";
        final int lastIndex = 1000;
        Async done = testContext.async();
        AtomicInteger nextExpected = new AtomicInteger();

        KafkaReadStream<String, String> consumer = consumer(topic);
        consumer.exceptionHandler(testContext::fail);
        consumer.handler(received -> {
            int index = nextExpected.getAndIncrement();
            testContext.assertEquals("key-" + index, received.key());
            testContext.assertEquals("value-" + index, received.value());
            Header header = received.headers().headers(HEADER_KEY).iterator().next();
            testContext.assertEquals("header_value-" + index, new String(header.value(), StandardCharsets.UTF_8));
            if (index == lastIndex) {
                // Release the consumer before declaring the test finished.
                consumer.close().onComplete(testContext.asyncAssertSuccess(closed -> done.complete()));
            }
        });
        consumer.subscribe(Collections.singleton(topic));

        // NOTE(review): the operations below are issued without waiting for the previous
        // one to complete; this presumably relies on the producer running them in
        // submission order - confirm against the KafkaWriteStream implementation.
        this.producer.initTransactions().onComplete(testContext.asyncAssertSuccess());
        this.producer.beginTransaction().onComplete(testContext.asyncAssertSuccess());
        for (int index = 0; index <= lastIndex; index++) {
            this.producer.write(createRecord(topic, index)).onComplete(testContext.asyncAssertSuccess());
        }
        this.producer.commitTransaction().onComplete(testContext.asyncAssertSuccess());
    }

    /**
     * Writes one record inside a transaction, aborts the transaction, and checks that a
     * subsequent poll returns no records.
     */
    @Test
    public void abortTransactionKeepsTopicEmpty(TestContext testContext) {
        final String topic = "transactionalProduceAbort";
        Async done = testContext.async();

        this.producer.initTransactions().onComplete(testContext.asyncAssertSuccess());
        this.producer.beginTransaction().onComplete(testContext.asyncAssertSuccess());
        this.producer.write(createRecord(topic, 0)).onComplete(testContext.asyncAssertSuccess(written ->
            // Poll only once the abort has completed; polling while the transaction is
            // still open would not tell us anything about the abort itself.
            this.producer.abortTransaction().onComplete(testContext.asyncAssertSuccess(aborted -> {
                KafkaReadStream<String, String> consumer = consumer(topic);
                consumer.exceptionHandler(testContext::fail);
                consumer.subscribe(Collections.singleton(topic));
                consumer.poll(Duration.ofSeconds(5L)).onComplete(
                    testContext.asyncAssertSuccess((ConsumerRecords<String, String> records) -> {
                        testContext.assertTrue(records.isEmpty());
                        consumer.close().onComplete(testContext.asyncAssertSuccess(closed -> done.complete()));
                    }));
            }))));
    }

    /**
     * Checks that begin, commit and abort each fail with an {@link IllegalStateException}
     * when {@code initTransactions} has not been called on the producer.
     */
    @Test
    public void transactionHandlingFailsIfInitWasNotCalled(TestContext testContext) {
        this.producer.beginTransaction()
            .onComplete(testContext.asyncAssertFailure(failure -> assertIllegalState(testContext, failure)));
        this.producer.commitTransaction()
            .onComplete(testContext.asyncAssertFailure(failure -> assertIllegalState(testContext, failure)));
        this.producer.abortTransaction()
            .onComplete(testContext.asyncAssertFailure(failure -> assertIllegalState(testContext, failure)));
    }

    /**
     * Checks that {@code initTransactions} fails with an {@link IllegalStateException} on a
     * producer that was configured without a {@code transactional.id}.
     */
    @Test
    public void initTransactionsFailsOnWrongConfig(TestContext testContext) {
        Properties config = kafkaCluster.useTo().getProducerProperties("nonTransactionalProducer");
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", StringSerializer.class);
        KafkaWriteStream<String, String> nonTransactional = producer(this.vertx, config);
        nonTransactional.exceptionHandler(testContext::fail);
        nonTransactional.initTransactions().onComplete(testContext.asyncAssertFailure(failure -> {
            // Close first so the producer is released even if the assertion below fails.
            nonTransactional.close().onComplete(testContext.asyncAssertSuccess());
            assertIllegalState(testContext, failure);
        }));
    }

    /**
     * Creates a String/String consumer configured with {@code isolation.level=read_committed}
     * and the {@code EARLIEST} offset reset strategy.
     *
     * @param topic name used to derive the group id ({@code "group-" + topic}) and the
     *              client id ({@code "consumer-" + topic})
     * @return a new read stream bound to the per-test Vert.x instance
     */
    private KafkaReadStream<String, String> consumer(String topic) {
        Properties config = kafkaCluster.useTo()
            .getConsumerProperties("group-" + topic, "consumer-" + topic, OffsetResetStrategy.EARLIEST);
        config.put("key.deserializer", StringDeserializer.class);
        config.put("value.deserializer", StringDeserializer.class);
        config.put("isolation.level", "read_committed");
        return KafkaReadStream.create(this.vertx, config);
    }

    /**
     * Builds the record for the given index: partition {@code 0}, key {@code "key-" + index},
     * value {@code "value-" + index} and one header {@code header_key} whose value is
     * {@code "header_value-" + index} encoded as UTF-8.
     *
     * @param topic destination topic
     * @param index number embedded in key, value and header
     * @return the record to write
     */
    private ProducerRecord<String, String> createRecord(String topic, int index) {
        ProducerRecord<String, String> producerRecord =
            new ProducerRecord<>(topic, 0, "key-" + index, "value-" + index);
        producerRecord.headers().add(HEADER_KEY, ("header_value-" + index).getBytes(StandardCharsets.UTF_8));
        return producerRecord;
    }

    /** Fails the test unless {@code failure} is an {@link IllegalStateException}. */
    private static void assertIllegalState(TestContext testContext, Throwable failure) {
        testContext.assertTrue(failure instanceof IllegalStateException,
            "expected IllegalStateException but got " + failure);
    }

    /** Returns the per-test {@link Vertx} instance, creating it on first use. */
    private Vertx sharedVertx() {
        if (this.vertx == null) {
            this.vertx = Vertx.vertx();
        }
        return this.vertx;
    }
}
