package io.vertx.kafka.client.tests;

import io.vertx.core.Vertx;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/kafka/client/tests/ConsumerMockTestBase.class */
public abstract class ConsumerMockTestBase {
    private Vertx vertx;

    @Before
    public void beforeTest() {
        this.vertx = Vertx.vertx();
    }

    @After
    public void afterTest(TestContext testContext) {
        this.vertx.close(testContext.asyncAssertSuccess());
    }

    @Test
    public void testConsume(TestContext testContext) throws Exception {
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        KafkaReadStream createConsumer = createConsumer(this.vertx, mockConsumer);
        Async async = testContext.async();
        createConsumer.handler(consumerRecord -> {
            testContext.assertEquals("the_topic", consumerRecord.topic());
            testContext.assertEquals(0, Integer.valueOf(consumerRecord.partition()));
            testContext.assertEquals("abc", consumerRecord.key());
            testContext.assertEquals("def", consumerRecord.value());
            createConsumer.close(asyncResult -> {
                async.complete();
            });
        });
        createConsumer.subscribe(Collections.singleton("the_topic"), asyncResult -> {
            mockConsumer.schedulePollTask(() -> {
                mockConsumer.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
                mockConsumer.addRecord(new ConsumerRecord("the_topic", 0, 0L, "abc", "def"));
                mockConsumer.seek(new TopicPartition("the_topic", 0), 0L);
            });
        });
    }

    @Test
    public void testConsumeWithHeader(TestContext testContext) {
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        KafkaReadStream createConsumer = createConsumer(this.vertx, mockConsumer);
        Async async = testContext.async();
        createConsumer.handler(consumerRecord -> {
            testContext.assertEquals("the_topic", consumerRecord.topic());
            testContext.assertEquals(0, Integer.valueOf(consumerRecord.partition()));
            testContext.assertEquals("abc", consumerRecord.key());
            testContext.assertEquals("def", consumerRecord.value());
            Header[] array = consumerRecord.headers().toArray();
            testContext.assertEquals(1, Integer.valueOf(array.length));
            Header header = array[0];
            testContext.assertEquals("header_key", header.key());
            testContext.assertEquals("header_value", new String(header.value()));
            createConsumer.close(asyncResult -> {
                async.complete();
            });
        });
        createConsumer.subscribe(Collections.singleton("the_topic"), asyncResult -> {
            mockConsumer.schedulePollTask(() -> {
                mockConsumer.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
                mockConsumer.addRecord(new ConsumerRecord("the_topic", 0, 0L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0L, 0, 0, "abc", "def", new RecordHeaders(Collections.singletonList(new RecordHeader("header_key", "header_value".getBytes())))));
                mockConsumer.seek(new TopicPartition("the_topic", 0), 0L);
            });
        });
    }

    @Test
    public void testBatch(TestContext testContext) throws Exception {
        int i = 50;
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        KafkaReadStream createConsumer = createConsumer(this.vertx, mockConsumer);
        Async async = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger();
        createConsumer.handler(consumerRecord -> {
            int andIncrement = atomicInteger.getAndIncrement();
            if (andIncrement < i) {
                testContext.assertEquals("the_topic", consumerRecord.topic());
                testContext.assertEquals(0, Integer.valueOf(consumerRecord.partition()));
                testContext.assertEquals("key-" + andIncrement, consumerRecord.key());
                testContext.assertEquals("value-" + andIncrement, consumerRecord.value());
                if (andIncrement == i - 1) {
                    createConsumer.close(asyncResult -> {
                        async.complete();
                    });
                }
            }
        });
        createConsumer.subscribe(Collections.singleton("the_topic"), asyncResult -> {
            mockConsumer.schedulePollTask(() -> {
                mockConsumer.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
                mockConsumer.seek(new TopicPartition("the_topic", 0), 0L);
                for (int i2 = 0; i2 < i; i2++) {
                    mockConsumer.addRecord(new ConsumerRecord("the_topic", 0, i2, "key-" + i2, "value-" + i2));
                }
            });
        });
    }

    @Test
    public void testConsumedMessagesHandledOnUniqueContexts(TestContext testContext) {
        MockConsumer mockConsumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
        KafkaReadStream createConsumer = createConsumer(this.vertx, mockConsumer);
        int i = 2;
        Async async = testContext.async(2);
        ArrayList arrayList = new ArrayList();
        createConsumer.handler(consumerRecord -> {
            arrayList.add(Vertx.currentContext());
            async.countDown();
        });
        createConsumer.subscribe(Collections.singleton("the_topic"), asyncResult -> {
            mockConsumer.schedulePollTask(() -> {
                mockConsumer.rebalance(Collections.singletonList(new TopicPartition("the_topic", 0)));
                mockConsumer.seek(new TopicPartition("the_topic", 0), 0L);
                for (int i2 = 0; i2 < i; i2++) {
                    mockConsumer.addRecord(new ConsumerRecord("the_topic", 0, i2, "key-" + i2, "value-" + i2));
                }
            });
        });
        async.handler(asyncResult2 -> {
            createConsumer.close(asyncResult2 -> {
                testContext.assertEquals(Integer.valueOf(i), Integer.valueOf(arrayList.size()));
                testContext.assertNotEquals(arrayList.get(0), arrayList.get(1));
            });
        });
    }

    abstract <K, V> KafkaReadStream<K, V> createConsumer(Vertx vertx, Consumer<K, V> consumer);
}
