package io.vertx.kafka.client.tests;

import io.debezium.kafka.KafkaCluster;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import io.vertx.kafka.client.consumer.impl.KafkaConsumerImpl;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/kafka/client/tests/ConsumerTestBase.class */
public abstract class ConsumerTestBase extends KafkaClusterTestBase {
    private Vertx vertx;
    private KafkaReadStream<String, String> consumer;
    private KafkaReadStream<String, String> consumer2;

    @Before
    public void beforeTest() {
        this.vertx = Vertx.vertx();
    }

    @After
    public void afterTest(TestContext testContext) {
        close(testContext, this.consumer);
        close(testContext, this.consumer2);
        this.consumer = null;
        this.consumer2 = null;
        this.vertx.close(testContext.asyncAssertSuccess());
    }

    @Test
    public void testConsume(TestContext testContext) throws Exception {
        String str = "testConsume-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(1000, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(20000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx, consumerProperties);
        Async async2 = testContext.async();
        AtomicInteger atomicInteger2 = new AtomicInteger(1000);
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        this.consumer.handler(consumerRecord -> {
            if (atomicInteger2.decrementAndGet() == 0) {
                async2.complete();
            }
        });
        this.consumer.subscribe(Collections.singleton(str));
    }

    @Test
    public void testConsumePattern(TestContext testContext) throws Exception {
        String str = "testConsumePattern1-" + getClass().getName();
        String str2 = "testConsumePattern2-" + getClass().getName();
        String str3 = str + "-" + str2;
        Async async = testContext.async(2);
        AtomicInteger atomicInteger = new AtomicInteger();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(500, async::countDown, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        KafkaCluster.Usage useTo2 = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo2.produceStrings(500, async::countDown, () -> {
            return new ProducerRecord(str2, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(20000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str3, str3, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx, consumerProperties);
        Async async2 = testContext.async();
        AtomicInteger atomicInteger2 = new AtomicInteger(500 * 2);
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        this.consumer.handler(consumerRecord -> {
            if (atomicInteger2.decrementAndGet() == 0) {
                async2.complete();
            }
        });
        this.consumer.subscribe(Pattern.compile("testConsumePattern\\d-.*"));
    }

    @Test
    public void testStreamWithHeader(TestContext testContext) {
        String str = "testStreamWithHeader-" + getClass().getName();
        this.consumer = createConsumer(this.vertx, setupConsumeWithHeaders(testContext, 1000, str));
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger(1000);
        AtomicInteger atomicInteger2 = new AtomicInteger();
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        this.consumer.handler(consumerRecord -> {
            Header[] array = consumerRecord.headers().toArray();
            testContext.assertEquals(1, Integer.valueOf(array.length));
            Header header = array[0];
            testContext.assertEquals("header_key" + atomicInteger2.get(), header.key());
            testContext.assertEquals("header_value" + atomicInteger2.getAndIncrement(), new String(header.value()));
            if (atomicInteger.decrementAndGet() == 0) {
                async.complete();
            }
        });
        this.consumer.subscribe(Collections.singleton(str));
    }

    @Test
    public void testConsumerWithHeader(TestContext testContext) {
        String str = "testConsumerWithHeader-" + getClass().getName();
        this.consumer = createConsumer(this.vertx, setupConsumeWithHeaders(testContext, 1000, str));
        KafkaConsumerImpl kafkaConsumerImpl = new KafkaConsumerImpl(this.consumer);
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger(1000);
        AtomicInteger atomicInteger2 = new AtomicInteger();
        Objects.requireNonNull(testContext);
        kafkaConsumerImpl.exceptionHandler(testContext::fail);
        kafkaConsumerImpl.handler(kafkaConsumerRecord -> {
            List headers = kafkaConsumerRecord.headers();
            testContext.assertEquals(1, Integer.valueOf(headers.size()));
            KafkaHeader kafkaHeader = (KafkaHeader) headers.get(0);
            testContext.assertEquals("header_key" + atomicInteger2.get(), kafkaHeader.key());
            testContext.assertEquals("header_value" + atomicInteger2.getAndIncrement(), kafkaHeader.value().toString());
            if (atomicInteger.decrementAndGet() == 0) {
                async.complete();
            }
        });
        kafkaConsumerImpl.subscribe(Collections.singleton(str));
    }

    private Properties setupConsumeWithHeaders(TestContext testContext, int i, String str) {
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(i, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.get(), Collections.singletonList(new RecordHeader("header_key" + atomicInteger.get(), ("header_value" + atomicInteger.getAndIncrement()).getBytes())));
        });
        async.awaitSuccess(20000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        return consumerProperties;
    }

    @Test
    public void testPause(TestContext testContext) throws Exception {
        String str = "testPause-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        int i = 1000;
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(1000, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(20000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx, consumerProperties);
        Async async2 = testContext.async();
        AtomicInteger atomicInteger2 = new AtomicInteger(1000);
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.consumer.handler(consumerRecord -> {
            testContext.assertFalse(atomicBoolean.get());
            int decrementAndGet = atomicInteger2.decrementAndGet();
            if (decrementAndGet == i / 2) {
                atomicBoolean.set(true);
                this.consumer.pause();
                this.vertx.setTimer(500L, l -> {
                    atomicBoolean.set(false);
                    this.consumer.resume();
                });
            }
            if (decrementAndGet == 0) {
                async2.complete();
            }
        });
        this.consumer.subscribe(Collections.singleton(str));
    }

    @Test
    public void testFetch(TestContext testContext) throws Exception {
        String str = "testPause-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(1000, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(20000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx, consumerProperties);
        Async async2 = testContext.async();
        AtomicInteger atomicInteger2 = new AtomicInteger(1000);
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        AtomicLong atomicLong = new AtomicLong();
        long j = 200;
        this.consumer.handler(consumerRecord -> {
            long decrementAndGet = atomicLong.decrementAndGet();
            testContext.assertTrue(decrementAndGet >= 0);
            if (decrementAndGet == 0) {
                this.vertx.setTimer(500L, l -> {
                    atomicLong.set(j);
                    this.consumer.fetch(j);
                });
            }
            if (atomicInteger2.decrementAndGet() == 0) {
                async2.complete();
            }
        });
        this.consumer.pause();
        atomicLong.set(200L);
        this.consumer.fetch(200L);
        this.consumer.subscribe(Collections.singleton(str));
    }

    @Test
    public void testPauseSingleTopic(TestContext testContext) throws Exception {
        String str = "testPauseSingleTopic-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        int i = 5000;
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(5000, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(20000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx, consumerProperties);
        Async async2 = testContext.async();
        TopicPartition topicPartition = new TopicPartition(str, 0);
        AtomicInteger atomicInteger2 = new AtomicInteger(5000);
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.consumer.batchHandler(consumerRecords -> {
            testContext.assertFalse(atomicBoolean.get());
        });
        this.consumer.handler(consumerRecord -> {
            int decrementAndGet = atomicInteger2.decrementAndGet();
            if (decrementAndGet == i / 3) {
                atomicBoolean.set(true);
                this.consumer.pause(Collections.singleton(topicPartition));
                this.vertx.setTimer(500L, l -> {
                    atomicBoolean.set(false);
                    this.consumer.resume(Collections.singleton(topicPartition));
                });
            }
            if (decrementAndGet == 0) {
                async2.complete();
            }
        });
        this.consumer.assign(Collections.singleton(topicPartition));
    }

    @Test
    public void testCommit(TestContext testContext) throws Exception {
        String str = "testCommit-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        int i = 500;
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(500, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(10000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx, consumerProperties);
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        Async async2 = testContext.async();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        this.consumer.handler(consumerRecord -> {
            int andIncrement = atomicInteger2.getAndIncrement();
            testContext.assertEquals("key-" + andIncrement, consumerRecord.key());
            testContext.assertEquals("value-" + andIncrement, consumerRecord.value());
            if (andIncrement == i - 1) {
                this.consumer.commit(testContext.asyncAssertSuccess(map -> {
                    this.consumer.close(asyncResult -> {
                        async2.complete();
                    });
                }));
            }
        });
        this.consumer.assign(Collections.singleton(new TopicPartition(str, 0)));
        async2.awaitSuccess(10000L);
        Async async3 = testContext.async();
        KafkaCluster.Usage useTo2 = kafkaCluster.useTo();
        Objects.requireNonNull(async3);
        useTo2.produceStrings(500, async3::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async3.awaitSuccess(10000L);
        this.consumer = createConsumer(this.vertx, consumerProperties);
        KafkaReadStream<String, String> kafkaReadStream2 = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream2.exceptionHandler(testContext::fail);
        Async async4 = testContext.async();
        this.consumer.handler(consumerRecord2 -> {
            int andIncrement = atomicInteger2.getAndIncrement();
            testContext.assertEquals("key-" + andIncrement, consumerRecord2.key());
            testContext.assertEquals("value-" + andIncrement, consumerRecord2.value());
            if (andIncrement == (i * 2) - 1) {
                this.consumer.commit(testContext.asyncAssertSuccess(map -> {
                    async4.complete();
                }));
            }
        });
        this.consumer.subscribe(Collections.singleton(str));
    }

    @Test
    public void testCommitWithOffsets(TestContext testContext) throws Exception {
        String str = "testCommitWithOffsets-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        int i = 500;
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(500, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(10000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx, consumerProperties);
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        Async async2 = testContext.async(2);
        AtomicInteger atomicInteger2 = new AtomicInteger();
        this.consumer.handler(consumerRecord -> {
            switch (atomicInteger2.incrementAndGet()) {
                case 101:
                    this.consumer.commit(Collections.singletonMap(new TopicPartition(str, 0), new OffsetAndMetadata(consumerRecord.offset())), testContext.asyncAssertSuccess(map -> {
                        async2.countDown();
                    }));
                    return;
                case 500:
                    async2.countDown();
                    return;
                default:
                    return;
            }
        });
        this.consumer.assign(Collections.singleton(new TopicPartition(str, 0)));
        async2.awaitSuccess(10000L);
        Async async3 = testContext.async();
        this.consumer.close(asyncResult -> {
            async3.complete();
        });
        async3.awaitSuccess(10000L);
        this.consumer = createConsumer(this.vertx, consumerProperties);
        atomicInteger2.set(100);
        Async async4 = testContext.async();
        this.consumer.handler(consumerRecord2 -> {
            int andIncrement = atomicInteger2.getAndIncrement();
            testContext.assertEquals("key-" + andIncrement, consumerRecord2.key());
            testContext.assertEquals("value-" + andIncrement, consumerRecord2.value());
            if (andIncrement == i - 1) {
                async4.complete();
            }
        });
        this.consumer.subscribe(Collections.singleton(str));
    }

    @Test
    public void testCommitAfterPoll(TestContext testContext) throws Exception {
        String str = "testCommitAfterPoll-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(10, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(10000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx, consumerProperties);
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        Async async2 = testContext.async();
        this.consumer.subscribe(Collections.singleton(str), asyncResult -> {
            async2.complete();
        });
        async2.await();
        Async async3 = testContext.async();
        this.consumer.poll(Duration.ofSeconds(10L), asyncResult2 -> {
            if (((ConsumerRecords) asyncResult2.result()).count() == 10) {
                async3.countDown();
            }
        });
        async3.await();
        Async async4 = testContext.async();
        this.consumer.commit(Collections.singletonMap(new TopicPartition(str, 0), new OffsetAndMetadata(10L)), asyncResult3 -> {
            async4.complete();
        });
        async4.await();
    }

    @Test
    public void testRebalance(TestContext testContext) throws Exception {
        String str = "testRebalance-" + getClass().getName();
        kafkaCluster.createTopic(str, 2, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        Context orCreateContext = this.vertx.getOrCreateContext();
        this.consumer = createConsumer(orCreateContext, consumerProperties);
        consumerProperties.setProperty("client.id", "the_consumer2");
        this.consumer2 = createConsumer(this.vertx, consumerProperties);
        this.consumer.handler(consumerRecord -> {
        });
        this.consumer2.handler(consumerRecord2 -> {
        });
        Async async = testContext.async(2);
        AtomicInteger atomicInteger = new AtomicInteger();
        this.consumer.partitionsAssignedHandler(set -> {
            testContext.assertEquals(Vertx.currentContext(), orCreateContext);
            switch (atomicInteger.getAndIncrement()) {
                case 0:
                    this.consumer2.subscribe(Collections.singleton(str));
                    testContext.assertEquals(2, Integer.valueOf(set.size()));
                    testContext.assertTrue(set.contains(new TopicPartition(str, 0)));
                    testContext.assertTrue(set.contains(new TopicPartition(str, 1)));
                    return;
                case 1:
                    testContext.fail();
                    return;
                case 2:
                    testContext.assertEquals(1, Integer.valueOf(set.size()));
                    async.countDown();
                    return;
                default:
                    return;
            }
        });
        this.consumer.partitionsRevokedHandler(set2 -> {
            testContext.assertEquals(Vertx.currentContext(), orCreateContext);
            switch (atomicInteger.getAndIncrement()) {
                case 0:
                    testContext.fail();
                    return;
                case 1:
                    testContext.assertEquals(2, Integer.valueOf(set2.size()));
                    testContext.assertTrue(set2.contains(new TopicPartition(str, 0)));
                    testContext.assertTrue(set2.contains(new TopicPartition(str, 1)));
                    return;
                case 2:
                    testContext.fail();
                    return;
                case 3:
                    testContext.assertEquals(1, Integer.valueOf(set2.size()));
                    return;
                default:
                    return;
            }
        });
        AtomicInteger atomicInteger2 = new AtomicInteger();
        this.consumer2.partitionsAssignedHandler(set3 -> {
            switch (atomicInteger2.getAndIncrement()) {
                case 0:
                    testContext.assertEquals(1, Integer.valueOf(set3.size()));
                    async.countDown();
                    return;
                default:
                    return;
            }
        });
        this.consumer.subscribe(Collections.singleton(str));
    }

    @Test
    public void testSeek(TestContext testContext) throws Exception {
        String str = "the_topic_0-" + getClass().getName();
        testSeek(str, 500, testContext, () -> {
            this.consumer.seek(new TopicPartition(str, 0), 0L);
        }, -500);
    }

    @Test
    public void testSeekToBeginning(TestContext testContext) throws Exception {
        String str = "the_topic_1-" + getClass().getName();
        testSeek(str, 500, testContext, () -> {
            this.consumer.seekToBeginning(Collections.singleton(new TopicPartition(str, 0)));
        }, -500);
    }

    @Test
    public void testSeekToEnd(TestContext testContext) throws Exception {
        String str = "the_topic_2-" + getClass().getName();
        testSeek(str, 500, testContext, () -> {
            this.consumer.seekToEnd(Collections.singleton(new TopicPartition(str, 0)));
        }, 0);
    }

    private void testSeek(String str, int i, TestContext testContext, Runnable runnable, int i2) throws Exception {
        kafkaCluster.createTopic(str, 1, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(i, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(10000L);
        AtomicInteger atomicInteger2 = new AtomicInteger(i);
        Async async2 = testContext.async();
        this.consumer.handler(consumerRecord -> {
            int decrementAndGet = atomicInteger2.decrementAndGet();
            if (decrementAndGet >= 0) {
                testContext.assertEquals("key-" + ((i - decrementAndGet) - 1), consumerRecord.key());
            } else {
                testContext.assertEquals("key-" + ((-1) - decrementAndGet), consumerRecord.key());
            }
            if (decrementAndGet == 0) {
                runnable.run();
            }
            if (decrementAndGet == i2) {
                async2.complete();
            }
        });
        this.consumer.subscribe(Collections.singleton(str));
    }

    @Test
    public void testSeekAfterConsume(TestContext testContext) throws Exception {
        String str = "testSeekAfterConsume-" + getClass().getName();
        kafkaCluster.createTopic(str, 1, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(5000, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(10000L);
        Async async2 = testContext.async();
        TopicPartition topicPartition = new TopicPartition(str, 0);
        ArrayList arrayList = new ArrayList();
        this.consumer.assign(Collections.singleton(topicPartition), asyncResult -> {
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            this.consumer.handler(consumerRecord -> {
                if (atomicBoolean.compareAndSet(false, true)) {
                    testContext.assertEquals("key-0", consumerRecord.key());
                    this.consumer.pause();
                    this.consumer.seekToBeginning(Collections.singleton(topicPartition), asyncResult -> {
                        this.consumer.position(topicPartition, testContext.asyncAssertSuccess(l -> {
                            testContext.assertEquals(0L, l, "Expecting offset 0 after seek to 0");
                            this.consumer.resume();
                        }));
                    });
                    return;
                }
                arrayList.add((String) consumerRecord.key());
                if (arrayList.size() == 5000) {
                    for (int i = 0; i < 5000; i++) {
                        testContext.assertEquals("key-" + i, arrayList.get(i));
                    }
                    async2.complete();
                }
            });
        });
    }

    @Test
    public void testSubscription(TestContext testContext) throws Exception {
        String str = "testSubscription-" + getClass().getName();
        kafkaCluster.createTopic(str, 1, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        Async async = testContext.async();
        this.consumer.handler(consumerRecord -> {
        });
        this.consumer.subscribe(Collections.singleton(str), asyncResult -> {
            if (asyncResult.succeeded()) {
                this.consumer.subscription(asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        testContext.fail();
                    } else {
                        testContext.assertTrue(((Set) asyncResult.result()).contains(str));
                        async.complete();
                    }
                });
            } else {
                testContext.fail();
            }
        });
    }

    @Test
    public void testAssign(TestContext testContext) throws Exception {
        String str = "testAssign-" + getClass().getName();
        kafkaCluster.createTopic(str, 1, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        Async async = testContext.async();
        this.consumer.handler(consumerRecord -> {
        });
        TopicPartition topicPartition = new TopicPartition(str, 0);
        this.consumer.assign(Collections.singleton(topicPartition), asyncResult -> {
            if (asyncResult.succeeded()) {
                this.consumer.assignment(asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        testContext.fail();
                    } else {
                        testContext.assertTrue(((Set) asyncResult.result()).contains(topicPartition));
                        async.complete();
                    }
                });
            } else {
                testContext.fail();
            }
        });
    }

    @Test
    public void testSetHandlerThenAssign(TestContext testContext) throws Exception {
        String str = "testSetHandlerThenAssign-" + getClass().getName();
        kafkaCluster.createTopic(str, 1, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        Async async = testContext.async(1 + 2);
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(1, async::countDown, () -> {
            return new ProducerRecord(str, 0, "key", "value");
        });
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        this.consumer.handler(consumerRecord -> {
            testContext.assertTrue(async3.isCompleted() && async2.isCompleted());
            async.countDown();
        });
        async3.complete();
        this.consumer.batchHandler(consumerRecords -> {
            testContext.assertTrue(async3.isCompleted() && async2.isCompleted());
            async.countDown();
        });
        this.consumer.assign(Collections.singleton(new TopicPartition(str, 0)), asyncResult -> {
            if (asyncResult.succeeded()) {
                async2.complete();
            } else {
                testContext.fail();
            }
        });
    }

    @Test
    public void testAssignThenSetHandler(TestContext testContext) throws Exception {
        String str = "testAssignThenSetHandler-" + getClass().getName();
        kafkaCluster.createTopic(str, 1, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        Async async = testContext.async(1 + 2);
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(1, async::countDown, () -> {
            return new ProducerRecord(str, 0, "key", "value");
        });
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        this.consumer.handler(consumerRecord -> {
            testContext.assertTrue(async3.isCompleted() && async2.isCompleted());
            async.countDown();
        });
        this.consumer.batchHandler(consumerRecords -> {
            testContext.assertTrue(async3.isCompleted() && async2.isCompleted());
            async.countDown();
        });
        async3.complete();
        this.consumer.assign(Collections.singleton(new TopicPartition(str, 0)), asyncResult -> {
            if (asyncResult.succeeded()) {
                async2.complete();
            } else {
                testContext.fail();
            }
        });
    }

    @Test
    public void testAssignAndSeek(TestContext testContext) throws Exception {
        String str = "testAssignAndSeek-" + getClass().getName();
        kafkaCluster.createTopic(str, 1, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        Async async = testContext.async(5000);
        AtomicInteger atomicInteger = new AtomicInteger();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(5000, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.await();
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        TopicPartition topicPartition = new TopicPartition(str, 0);
        this.consumer.assign(Collections.singleton(topicPartition), asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
        });
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        Async async2 = testContext.async();
        this.consumer.handler(consumerRecord -> {
            long offset = consumerRecord.offset();
            switch (atomicInteger2.get()) {
                case 0:
                case 1:
                    if (offset == 5) {
                        atomicInteger2.set(1);
                        this.consumer.seek(topicPartition, 0L).onComplete(asyncResult2 -> {
                            atomicInteger2.set(2);
                            testContext.assertTrue(asyncResult2.succeeded());
                        });
                        return;
                    }
                    return;
                default:
                    return;
            }
        });
        this.consumer.batchHandler(consumerRecords -> {
            switch (atomicInteger2.get()) {
                case 0:
                case 1:
                default:
                    return;
                case 2:
                    Iterator it = consumerRecords.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord2 = (ConsumerRecord) it.next();
                        if (consumerRecord2.offset() > 5 && !async2.isCompleted()) {
                            testContext.fail();
                        }
                        if (consumerRecord2.offset() == 0) {
                            async2.complete();
                        }
                    }
                    return;
            }
        });
    }

    @Test
    public void testReassign(TestContext testContext) throws Exception {
        String str = "testReassign1-" + getClass().getName();
        String str2 = "testReassign2-" + getClass().getName();
        String str3 = "testReassign-" + getClass().getName();
        kafkaCluster.createTopic(str, 1, 1);
        kafkaCluster.createTopic(str2, 1, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str3, str3, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        int i = 5000;
        Async async = testContext.async(2);
        AtomicInteger atomicInteger = new AtomicInteger();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(5000, async::countDown, () -> {
            return new ProducerRecord(str, 0, "1key-" + atomicInteger.get(), "1value-" + atomicInteger.getAndIncrement());
        });
        KafkaCluster.Usage useTo2 = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo2.produceStrings(5000, async::countDown, () -> {
            return new ProducerRecord(str2, 0, "2key-" + atomicInteger.get(), "2value-" + atomicInteger.getAndIncrement());
        });
        async.await();
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        TopicPartition topicPartition = new TopicPartition(str, 0);
        TopicPartition topicPartition2 = new TopicPartition(str2, 0);
        this.consumer.assign(Collections.singleton(topicPartition), asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
        });
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        Async async2 = testContext.async();
        this.consumer.handler(consumerRecord -> {
            long offset = consumerRecord.offset();
            switch (atomicInteger2.get()) {
                case 0:
                case 1:
                    if (consumerRecord.topic().equals(str2)) {
                        testContext.fail("Seen a " + str2 + " message before reassignment");
                    }
                    if (offset == 5) {
                        atomicInteger2.set(1);
                        this.consumer.assign(Collections.singleton(topicPartition2), asyncResult2 -> {
                            atomicInteger2.set(2);
                            testContext.assertTrue(asyncResult2.succeeded());
                        });
                        return;
                    }
                    return;
                default:
                    return;
            }
        });
        this.consumer.batchHandler(consumerRecords -> {
            switch (atomicInteger2.get()) {
                case 0:
                case 1:
                default:
                    return;
                case 2:
                    Iterator it = consumerRecords.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord2 = (ConsumerRecord) it.next();
                        long offset = consumerRecord2.offset();
                        if (consumerRecord2.topic().equals(str)) {
                            testContext.fail("Seen a " + str + " message after reassignment");
                        }
                        if (offset == i - 1) {
                            async2.complete();
                        }
                    }
                    return;
            }
        });
    }

    @Test
    public void testListTopics(TestContext testContext) throws Exception {
        String str = "testListTopics-" + getClass().getName();
        kafkaCluster.createTopic(str, 1, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        Async async = testContext.async();
        this.consumer.handler(consumerRecord -> {
        });
        this.consumer.subscribe(Collections.singleton(str), asyncResult -> {
            if (asyncResult.succeeded()) {
                this.consumer.listTopics(asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        testContext.fail();
                    } else {
                        testContext.assertTrue(((Map) asyncResult.result()).containsKey(str));
                        async.complete();
                    }
                });
            } else {
                testContext.fail();
            }
        });
    }

    @Test
    public void testPartitionsFor(TestContext testContext) throws Exception {
        String str = "testPartitionsFor-" + getClass().getName();
        kafkaCluster.createTopic(str, 2, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        Async async = testContext.async();
        this.consumer.partitionsFor(str, asyncResult -> {
            if (asyncResult.succeeded()) {
                testContext.assertEquals(2, Integer.valueOf(((List) asyncResult.result()).size()));
            } else {
                testContext.fail();
            }
            async.complete();
        });
    }

    @Test
    public void testPositionEmptyTopic(TestContext testContext) throws Exception {
        String str = "testPositionEmptyTopic-" + getClass().getName();
        kafkaCluster.createTopic(str, 1, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        Async async = testContext.async();
        this.consumer.handler(consumerRecord -> {
        });
        this.consumer.subscribe(Collections.singleton(str), asyncResult -> {
            if (asyncResult.succeeded()) {
                this.consumer.partitionsFor(str, asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        testContext.fail();
                        return;
                    }
                    Iterator it = ((List) asyncResult.result()).iterator();
                    while (it.hasNext()) {
                        this.consumer.position(new TopicPartition(str, ((PartitionInfo) it.next()).partition()), asyncResult -> {
                            if (!asyncResult.succeeded()) {
                                testContext.fail();
                            } else {
                                testContext.assertTrue(((Long) asyncResult.result()).longValue() == 0);
                                async.complete();
                            }
                        });
                    }
                });
            } else {
                testContext.fail();
            }
        });
    }

    @Test
    public void testPositionNonEmptyTopic(TestContext testContext) throws Exception {
        String str = "testPositionNonEmptyTopic-" + getClass().getName();
        kafkaCluster.createTopic(str, 1, 1);
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        int i = 1000;
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(1000, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(20000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        Async async2 = testContext.async();
        AtomicInteger atomicInteger2 = new AtomicInteger(1000);
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        this.consumer.handler(consumerRecord -> {
            if (atomicInteger2.decrementAndGet() == 0) {
                this.consumer.partitionsFor(str, asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        testContext.fail();
                        return;
                    }
                    Iterator it = ((List) asyncResult.result()).iterator();
                    while (it.hasNext()) {
                        this.consumer.position(new TopicPartition(str, ((PartitionInfo) it.next()).partition()), asyncResult -> {
                            if (!asyncResult.succeeded()) {
                                testContext.fail();
                            } else {
                                testContext.assertTrue(((Long) asyncResult.result()).longValue() == ((long) i));
                                async2.complete();
                            }
                        });
                    }
                });
            }
        });
        this.consumer.subscribe(Collections.singleton(str));
    }

    @Test
    public void testBeginningOffset(TestContext testContext) throws Exception {
        testBeginningEndOffset(testContext, true);
    }

    @Test
    public void testEndOffset(TestContext testContext) throws Exception {
        testBeginningEndOffset(testContext, false);
    }

    public void testBeginningEndOffset(TestContext testContext, boolean z) throws Exception {
        String str = "testBeginningEndOffset_" + (z ? "beginning" : "end") + "-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        int i = 1000;
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(1000, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(20000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        HashSet hashSet = new HashSet();
        TopicPartition topicPartition = new TopicPartition(str, 0);
        hashSet.add(topicPartition);
        Async async2 = testContext.async(2);
        this.consumer.handler(consumerRecord -> {
        });
        this.consumer.subscribe(Collections.singleton(str), testContext.asyncAssertSuccess(r14 -> {
            if (z) {
                this.consumer.beginningOffsets(hashSet, asyncResult -> {
                    testContext.assertTrue(asyncResult.succeeded());
                    testContext.assertEquals(1, Integer.valueOf(((Map) asyncResult.result()).size()));
                    testContext.assertEquals(0L, ((Map) asyncResult.result()).get(topicPartition));
                    async2.countDown();
                });
                this.consumer.beginningOffsets(topicPartition, asyncResult2 -> {
                    testContext.assertTrue(asyncResult2.succeeded());
                    testContext.assertEquals(0L, asyncResult2.result());
                    async2.countDown();
                });
            } else {
                this.consumer.endOffsets(hashSet, asyncResult3 -> {
                    testContext.assertTrue(asyncResult3.succeeded());
                    testContext.assertEquals(1, Integer.valueOf(((Map) asyncResult3.result()).size()));
                    testContext.assertEquals(Long.valueOf(i), ((Map) asyncResult3.result()).get(topicPartition));
                    async2.countDown();
                });
                this.consumer.endOffsets(topicPartition, asyncResult4 -> {
                    testContext.assertTrue(asyncResult4.succeeded());
                    testContext.assertEquals(Long.valueOf(i), asyncResult4.result());
                    async2.countDown();
                });
            }
        }));
    }

    @Test
    public void testOffsetsForTimes(TestContext testContext) throws Exception {
        String str = "testOffsetsForTimes-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        int i = 1000;
        long currentTimeMillis = System.currentTimeMillis();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(1000, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(20000L);
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        TopicPartition topicPartition = new TopicPartition(str, 0);
        Async async2 = testContext.async(2);
        this.consumer.handler(consumerRecord -> {
        });
        this.consumer.subscribe(Collections.singleton(str), testContext.asyncAssertSuccess(r20 -> {
            long j = currentTimeMillis + (currentTimeMillis2 / 2);
            this.consumer.offsetsForTimes(Collections.singletonMap(topicPartition, Long.valueOf(j)), testContext.asyncAssertSuccess(map -> {
                OffsetAndTimestamp offsetAndTimestamp = (OffsetAndTimestamp) map.get(topicPartition);
                testContext.assertEquals(1, Integer.valueOf(map.size()));
                testContext.assertTrue(offsetAndTimestamp.offset() >= 0 && offsetAndTimestamp.offset() <= ((long) i), "Invalid offset 0 <= " + offsetAndTimestamp.offset() + " <= " + i);
                testContext.assertTrue(offsetAndTimestamp.timestamp() >= j);
                async2.countDown();
            }));
            this.consumer.offsetsForTimes(topicPartition, j, testContext.asyncAssertSuccess(offsetAndTimestamp -> {
                testContext.assertTrue(offsetAndTimestamp.offset() >= 0 && offsetAndTimestamp.offset() <= ((long) i), "Invalid offset 0 <= " + offsetAndTimestamp.offset() + " <= " + i);
                testContext.assertTrue(offsetAndTimestamp.timestamp() >= j);
                async2.countDown();
            }));
        }));
    }

    @Test
    public void testOffsetsForTimesWithTimestampInFuture(TestContext testContext) throws Exception {
        String str = "testOffsetsForTimesWithTimestampInFuture-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(10, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(20000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        KafkaConsumer create = KafkaConsumer.create(this.vertx, consumerProperties);
        Objects.requireNonNull(testContext);
        create.exceptionHandler(testContext::fail);
        io.vertx.kafka.client.common.TopicPartition topicPartition = new io.vertx.kafka.client.common.TopicPartition(str, 0);
        Async async2 = testContext.async(2);
        async2.handler(asyncResult -> {
            create.close();
        });
        create.handler(kafkaConsumerRecord -> {
        });
        long currentTimeMillis = System.currentTimeMillis();
        create.offsetsForTimes(topicPartition, Long.valueOf(currentTimeMillis), testContext.asyncAssertSuccess(offsetAndTimestamp -> {
            testContext.assertEquals((Object) null, offsetAndTimestamp, "Must return null because no offset for a timestamp in the future can exist");
            async2.countDown();
        }));
        create.offsetsForTimes(Collections.singletonMap(topicPartition, Long.valueOf(currentTimeMillis)), testContext.asyncAssertSuccess(map -> {
            io.vertx.kafka.client.consumer.OffsetAndTimestamp offsetAndTimestamp2 = (io.vertx.kafka.client.consumer.OffsetAndTimestamp) map.get(topicPartition);
            testContext.assertEquals(0, Integer.valueOf(map.size()), "Must not return a result, because no Offset is found");
            testContext.assertEquals((Object) null, offsetAndTimestamp2, "Must return null because no offset for a timestamp in the future can exist");
            async2.countDown();
        }));
    }

    @Test
    public void testBatchHandler(TestContext testContext) throws Exception {
        String str = "testBatchHandler-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        int i = 500;
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(500, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(10000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx.getOrCreateContext(), consumerProperties);
        Async async2 = testContext.async();
        this.consumer.batchHandler(consumerRecords -> {
            testContext.assertEquals(Integer.valueOf(i), Integer.valueOf(consumerRecords.count()));
            async2.complete();
        });
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        Objects.requireNonNull(testContext);
        kafkaReadStream.exceptionHandler(testContext::fail);
        this.consumer.handler(consumerRecord -> {
        });
        this.consumer.subscribe(Collections.singleton(str));
    }

    @Test
    public void testConsumerBatchHandler(TestContext testContext) throws Exception {
        String str = "testConsumerBatchHandler-" + getClass().getName();
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        int i = 500;
        int i2 = 1000;
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(1000, async::complete, () -> {
            return new ProducerRecord(str, 0, "key-" + atomicInteger.get(), "value-" + atomicInteger.getAndIncrement());
        });
        async.awaitSuccess(10000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        KafkaConsumer create = KafkaConsumer.create(this.vertx, consumerProperties);
        Objects.requireNonNull(testContext);
        create.exceptionHandler(testContext::fail);
        AtomicInteger atomicInteger2 = new AtomicInteger(1000);
        Async async2 = testContext.async();
        async2.handler(asyncResult -> {
            create.close();
        });
        create.batchHandler(kafkaConsumerRecords -> {
            testContext.assertEquals(Integer.valueOf(i), Integer.valueOf(kafkaConsumerRecords.size()));
            for (int i3 = 0; i3 < kafkaConsumerRecords.size(); i3++) {
                KafkaConsumerRecord recordAt = kafkaConsumerRecords.recordAt(i3);
                int decrementAndGet = atomicInteger2.decrementAndGet();
                if (decrementAndGet >= 0) {
                    testContext.assertEquals("key-" + ((i2 - decrementAndGet) - 1), recordAt.key());
                } else {
                    testContext.assertEquals("key-" + ((-1) - decrementAndGet), recordAt.key());
                }
                if (decrementAndGet == 0) {
                    async2.complete();
                }
            }
        });
        create.subscribe(Collections.singleton(str));
    }

    @Test
    public void testPollExceptionHandler(TestContext testContext) throws Exception {
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties("someRandomGroup", "someRandomClientID", OffsetResetStrategy.EARLIEST);
        consumerProperties.remove("group.id");
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        this.consumer = createConsumer(this.vertx, consumerProperties);
        Async async = testContext.async();
        this.consumer.exceptionHandler(th -> {
            testContext.assertTrue(th instanceof InvalidGroupIdException);
            async.complete();
        });
        KafkaReadStream<String, String> kafkaReadStream = this.consumer;
        PrintStream printStream = System.out;
        Objects.requireNonNull(printStream);
        kafkaReadStream.handler((v1) -> {
            r1.println(v1);
        }).subscribe(Collections.singleton("someTopic"));
    }

    @Test
    public void testPollTimeout(TestContext testContext) throws Exception {
        Async async = testContext.async();
        String str = "testPollTimeout-" + getClass().getName();
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        io.vertx.kafka.client.common.TopicPartition topicPartition = new io.vertx.kafka.client.common.TopicPartition(str, 0);
        KafkaConsumer create = KafkaConsumer.create(this.vertx, consumerProperties);
        int i = 1500;
        create.pollTimeout(Duration.ofMillis(1500));
        create.subscribe(str, asyncResult -> {
            create.handler(kafkaConsumerRecord -> {
            });
            long currentTimeMillis = System.currentTimeMillis();
            create.seekToBeginning(topicPartition, asyncResult -> {
                testContext.assertTrue(System.currentTimeMillis() - currentTimeMillis >= ((long) i), "Operation must take at least as long as the polling timeout");
                create.close();
                async.countDown();
            });
        });
    }

    @Test
    public void testNotCommitted(TestContext testContext) throws Exception {
        String str = "testNotCommitted-" + getClass().getName();
        kafkaCluster.createTopic(str, 1, 1);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        Async async = testContext.async();
        KafkaConsumer create = KafkaConsumer.create(this.vertx, consumerProperties);
        create.handler(kafkaConsumerRecord -> {
        });
        create.partitionsAssignedHandler(set -> {
            Iterator it = set.iterator();
            while (it.hasNext()) {
                create.committed((io.vertx.kafka.client.common.TopicPartition) it.next(), asyncResult -> {
                    if (asyncResult.succeeded()) {
                        testContext.assertNull(asyncResult.result());
                    } else {
                        testContext.fail(asyncResult.cause());
                    }
                });
            }
            async.complete();
        });
        create.subscribe(Collections.singleton(str));
    }

    @Test
    public void testConsumeWithPoll(TestContext testContext) {
        String str = "testConsumeWithPoll-" + getClass().getName();
        Async async = testContext.async();
        KafkaCluster.Usage useTo = kafkaCluster.useTo();
        Objects.requireNonNull(async);
        useTo.produceStrings(1000, async::complete, () -> {
            return new ProducerRecord(str, "value");
        });
        async.awaitSuccess(20000L);
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        KafkaConsumer create = KafkaConsumer.create(this.vertx, consumerProperties);
        Async async2 = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger(1000);
        create.subscribe(Collections.singleton(str), asyncResult -> {
            if (asyncResult.succeeded()) {
                this.vertx.setPeriodic(1000L, l -> {
                    create.poll(Duration.ofMillis(100L), asyncResult -> {
                        if (!asyncResult.succeeded()) {
                            testContext.fail();
                        } else if (atomicInteger.updateAndGet(i -> {
                            return atomicInteger.get() - ((KafkaConsumerRecords) asyncResult.result()).size();
                        }) == 0) {
                            this.vertx.cancelTimer(l.longValue());
                            async2.complete();
                        }
                    });
                });
            } else {
                testContext.fail();
            }
        });
    }

    @Test
    public void testConsumeWithPollNoMessages(TestContext testContext) {
        String str = "testConsumeWithPollNoMessages-" + getClass().getName();
        Properties consumerProperties = kafkaCluster.useTo().getConsumerProperties(str, str, OffsetResetStrategy.EARLIEST);
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        KafkaConsumer create = KafkaConsumer.create(this.vertx, consumerProperties);
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger(5);
        create.subscribe(Collections.singleton(str), asyncResult -> {
            if (asyncResult.succeeded()) {
                this.vertx.setPeriodic(1000L, l -> {
                    create.poll(Duration.ofMillis(100L), asyncResult -> {
                        if (!asyncResult.succeeded()) {
                            testContext.fail();
                            return;
                        }
                        if (((KafkaConsumerRecords) asyncResult.result()).size() > 0) {
                            testContext.fail();
                        } else if (atomicInteger.decrementAndGet() == 0) {
                            this.vertx.cancelTimer(l.longValue());
                            async.complete();
                        }
                    });
                });
            } else {
                testContext.fail();
            }
        });
    }

    <K, V> KafkaReadStream<K, V> createConsumer(Context context, Properties properties) throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        context.runOnContext(r9 -> {
            try {
                completableFuture.complete(createConsumer(context.owner(), properties));
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
            }
        });
        return (KafkaReadStream) completableFuture.get(10L, TimeUnit.SECONDS);
    }

    abstract <K, V> KafkaReadStream<K, V> createConsumer(Vertx vertx, Properties properties);
}
