package io.github.embeddedkafka.ops;

import io.github.embeddedkafka.EmbeddedKafkaConfig;
import io.github.embeddedkafka.KafkaUnavailableException;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Growable;
import scala.collection.mutable.ListBuffer$;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.ScalaRunTime$;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: ConsumerOps.scala */
/* loaded from: input_file:io/github/embeddedkafka/ops/ConsumerOps.class */
public interface ConsumerOps<C extends EmbeddedKafkaConfig> {
    /**
     * Returns the timeout that the consuming loop in this interface hands to each
     * {@code KafkaConsumer.poll} call.
     */
    FiniteDuration consumerPollingTimeout();

    /**
     * Compiler-generated setter for the {@code consumerPollingTimeout} value.
     * NOTE(review): the mangled name suggests implementing classes call this while being
     * initialised; no call site is visible in this file, so confirm against the Scala source.
     */
    void io$github$embeddedkafka$ops$ConsumerOps$_setter_$consumerPollingTimeout_$eq(FiniteDuration finiteDuration);

    /**
     * Returns the consumer properties for the given configuration. Within this interface the
     * result is converted to a Java map and handed to the {@code KafkaConsumer} constructor.
     *
     * @param c the embedded Kafka configuration
     * @return property name to value map used to build consumers
     */
    Map<String, Object> baseConsumerConfig(C c);

    /**
     * Builds the default consumer properties for an embedded broker.
     *
     * <p>The map contains {@code group.id} ("embedded-kafka-spec"), {@code bootstrap.servers}
     * ("localhost:" followed by {@code c.kafkaPort()}), {@code auto.offset.reset} (the lower-cased
     * name of {@link OffsetResetStrategy#EARLIEST}) and {@code enable.auto.commit} ("false").
     *
     * @param c configuration supplying the broker port
     * @return an immutable Scala map with the four properties above
     */
    default Map<String, Object> defaultConsumerConfig(C c) {
        String bootstrapServers = new StringBuilder(10).append("localhost:").append(c.kafkaPort()).toString();
        // Lower-case with Locale.ROOT: under a Turkish/Azeri default locale the plain
        // toLowerCase() maps 'I' to a dotless 'ı', turning "EARLIEST" into an invalid setting.
        String offsetReset = OffsetResetStrategy.EARLIEST.toString().toLowerCase(java.util.Locale.ROOT);
        String autoCommit = BoxesRunTime.boxToBoolean(false).toString();
        return (Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("group.id"), "embedded-kafka-spec"),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("bootstrap.servers"), bootstrapServers),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("auto.offset.reset"), offsetReset),
            Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("enable.auto.commit"), autoCommit)
        }));
    }

    /**
     * Consumes a single message from the given topic and returns its value as a string.
     *
     * <p>Delegates to {@code consumeNumberStringMessagesFrom} with a count of 1 and returns the
     * head of the resulting list.
     *
     * @param str topic name
     * @param z flag forwarded unchanged to {@code consumeNumberStringMessagesFrom}
     * @param duration timeout forwarded unchanged to {@code consumeNumberStringMessagesFrom}
     * @param c embedded Kafka configuration
     * @return the first message read
     */
    default String consumeFirstStringMessageFrom(String str, boolean z, Duration duration, C c) {
        List<String> singleMessage = consumeNumberStringMessagesFrom(str, 1, z, duration, c);
        return (String) singleMessage.head();
    }

    /**
     * Scala-generated accessor for the default value of the second parameter of
     * {@code consumeFirstStringMessageFrom}.
     *
     * @return {@code false}
     */
    default boolean consumeFirstStringMessageFrom$default$2() {
        final boolean defaultFlag = false;
        return defaultFlag;
    }

    /**
     * Scala-generated accessor for the default value of the third parameter (the timeout) of
     * {@code consumeFirstStringMessageFrom}.
     *
     * @return a duration of five seconds
     */
    default Duration consumeFirstStringMessageFrom$default$3() {
        final FiniteDuration fiveSeconds = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
        return fiveSeconds;
    }

    /**
     * Consumes {@code i} messages from the given topic, decoding the values with a
     * {@link StringDeserializer}.
     *
     * @param str topic name
     * @param i number of messages to read
     * @param z flag forwarded unchanged to {@code consumeNumberMessagesFrom}
     * @param duration timeout forwarded unchanged to {@code consumeNumberMessagesFrom}
     * @param c embedded Kafka configuration
     * @return the message values read
     */
    default List<String> consumeNumberStringMessagesFrom(String str, int i, boolean z, Duration duration, C c) {
        StringDeserializer stringValues = new StringDeserializer();
        return consumeNumberMessagesFrom(str, i, z, duration, c, stringValues);
    }

    /**
     * Scala-generated accessor for the default value of the third parameter of
     * {@code consumeNumberStringMessagesFrom}.
     *
     * @return {@code false}
     */
    default boolean consumeNumberStringMessagesFrom$default$3() {
        final boolean defaultFlag = false;
        return defaultFlag;
    }

    /**
     * Scala-generated accessor for the default value of the fourth parameter (the timeout) of
     * {@code consumeNumberStringMessagesFrom}.
     *
     * @return a duration of five seconds
     */
    default Duration consumeNumberStringMessagesFrom$default$4() {
        final FiniteDuration fiveSeconds = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
        return fiveSeconds;
    }

    /**
     * Consumes a single message from the given topic and returns its deserialized value.
     *
     * <p>Delegates to {@code consumeNumberMessagesFrom} with a count of 1 and returns the head of
     * the resulting list.
     *
     * @param str topic name
     * @param z flag forwarded unchanged to {@code consumeNumberMessagesFrom}
     * @param duration timeout forwarded unchanged to {@code consumeNumberMessagesFrom}
     * @param c embedded Kafka configuration
     * @param deserializer value deserializer
     * @return the first message value read
     * @throws KafkaUnavailableException declared by the original signature
     * @throws TimeoutException declared by the original signature
     */
    default <V> V consumeFirstMessageFrom(String str, boolean z, Duration duration, C c, Deserializer<V> deserializer) throws KafkaUnavailableException, TimeoutException {
        List<V> singleMessage = consumeNumberMessagesFrom(str, 1, z, duration, c, deserializer);
        return (V) singleMessage.head();
    }

    /**
     * Scala-generated accessor for the default value of the second parameter of
     * {@code consumeFirstMessageFrom}.
     *
     * @return {@code false}
     */
    default boolean consumeFirstMessageFrom$default$2() {
        final boolean defaultFlag = false;
        return defaultFlag;
    }

    /**
     * Scala-generated accessor for the default value of the third parameter (the timeout) of
     * {@code consumeFirstMessageFrom}.
     *
     * @return a duration of five seconds
     */
    default <V> Duration consumeFirstMessageFrom$default$3() {
        final FiniteDuration fiveSeconds = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
        return fiveSeconds;
    }

    /**
     * Consumes a single message from the given topic and returns it as a key/value pair.
     *
     * <p>Delegates to {@code consumeNumberKeyedMessagesFrom} with a count of 1 and returns the
     * head of the resulting list.
     *
     * @param str topic name
     * @param z flag forwarded unchanged to {@code consumeNumberKeyedMessagesFrom}
     * @param duration timeout forwarded unchanged to {@code consumeNumberKeyedMessagesFrom}
     * @param c embedded Kafka configuration
     * @param deserializer key deserializer
     * @param deserializer2 value deserializer
     * @return the first (key, value) pair read
     * @throws KafkaUnavailableException declared by the original signature
     * @throws TimeoutException declared by the original signature
     */
    default <K, V> Tuple2<K, V> consumeFirstKeyedMessageFrom(String str, boolean z, Duration duration, C c, Deserializer<K> deserializer, Deserializer<V> deserializer2) throws KafkaUnavailableException, TimeoutException {
        List<Tuple2<K, V>> singleMessage = consumeNumberKeyedMessagesFrom(str, 1, z, duration, c, deserializer, deserializer2);
        return (Tuple2) singleMessage.head();
    }

    /**
     * Scala-generated accessor for the default value of the second parameter of
     * {@code consumeFirstKeyedMessageFrom}.
     *
     * @return {@code false}
     */
    default boolean consumeFirstKeyedMessageFrom$default$2() {
        final boolean defaultFlag = false;
        return defaultFlag;
    }

    /**
     * Scala-generated accessor for the default value of the third parameter (the timeout) of
     * {@code consumeFirstKeyedMessageFrom}.
     *
     * @return a duration of five seconds
     */
    default <K, V> Duration consumeFirstKeyedMessageFrom$default$3() {
        final FiniteDuration fiveSeconds = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
        return fiveSeconds;
    }

    /**
     * Consumes {@code i} message values from a single topic.
     *
     * <p>Wraps the topic in a one-element set, calls {@code consumeNumberMessagesFromTopics} with
     * that method's default fifth argument, and returns the list stored under the topic name.
     *
     * @param str topic name
     * @param i number of messages to read
     * @param z flag forwarded unchanged to {@code consumeNumberMessagesFromTopics}
     * @param duration timeout forwarded unchanged to {@code consumeNumberMessagesFromTopics}
     * @param c embedded Kafka configuration
     * @param deserializer value deserializer
     * @return the message values read from the topic
     */
    default <V> List<V> consumeNumberMessagesFrom(String str, int i, boolean z, Duration duration, C c, Deserializer<V> deserializer) {
        Set<String> singleTopic = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{str}));
        boolean fifthArgDefault = consumeNumberMessagesFromTopics$default$5();
        Map<String, List<V>> valuesByTopic = consumeNumberMessagesFromTopics(singleTopic, i, z, duration, fifthArgDefault, c, deserializer);
        return (List) valuesByTopic.apply(str);
    }

    /**
     * Scala-generated accessor for the default value of the third parameter of
     * {@code consumeNumberMessagesFrom}.
     *
     * @return {@code false}
     */
    default boolean consumeNumberMessagesFrom$default$3() {
        final boolean defaultFlag = false;
        return defaultFlag;
    }

    /**
     * Scala-generated accessor for the default value of the fourth parameter (the timeout) of
     * {@code consumeNumberMessagesFrom}.
     *
     * @return a duration of five seconds
     */
    default <V> Duration consumeNumberMessagesFrom$default$4() {
        final FiniteDuration fiveSeconds = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
        return fiveSeconds;
    }

    /**
     * Consumes {@code i} key/value pairs from a single topic.
     *
     * <p>Wraps the topic in a one-element set, calls {@code consumeNumberKeyedMessagesFromTopics}
     * with that method's default fifth argument, and returns the list stored under the topic name.
     *
     * @param str topic name
     * @param i number of messages to read
     * @param z flag forwarded unchanged to {@code consumeNumberKeyedMessagesFromTopics}
     * @param duration timeout forwarded unchanged to {@code consumeNumberKeyedMessagesFromTopics}
     * @param c embedded Kafka configuration
     * @param deserializer key deserializer
     * @param deserializer2 value deserializer
     * @return the (key, value) pairs read from the topic
     */
    default <K, V> List<Tuple2<K, V>> consumeNumberKeyedMessagesFrom(String str, int i, boolean z, Duration duration, C c, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        Set<String> singleTopic = (Set) Predef$.MODULE$.Set().apply(ScalaRunTime$.MODULE$.wrapRefArray(new String[]{str}));
        boolean fifthArgDefault = consumeNumberKeyedMessagesFromTopics$default$5();
        Map<String, List<Tuple2<K, V>>> pairsByTopic = consumeNumberKeyedMessagesFromTopics(singleTopic, i, z, duration, fifthArgDefault, c, deserializer, deserializer2);
        return (List) pairsByTopic.apply(str);
    }

    /**
     * Scala-generated accessor for the default value of the third parameter of
     * {@code consumeNumberKeyedMessagesFrom}.
     *
     * @return {@code false}
     */
    default boolean consumeNumberKeyedMessagesFrom$default$3() {
        final boolean defaultFlag = false;
        return defaultFlag;
    }

    /**
     * Scala-generated accessor for the default value of the fourth parameter (the timeout) of
     * {@code consumeNumberKeyedMessagesFrom}.
     *
     * @return a duration of five seconds
     */
    default <K, V> Duration consumeNumberKeyedMessagesFrom$default$4() {
        final FiniteDuration fiveSeconds = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
        return fiveSeconds;
    }

    /**
     * Consumes {@code i} messages across the given topics and returns only the values, grouped by
     * topic.
     *
     * <p>Calls {@code consumeNumberKeyedMessagesFromTopics} with a {@link StringDeserializer} for
     * the keys, then drops the key from every (key, value) pair.
     *
     * @param set topic names
     * @param i number of messages to read
     * @param z flag forwarded unchanged to {@code consumeNumberKeyedMessagesFromTopics}
     * @param duration timeout forwarded unchanged to {@code consumeNumberKeyedMessagesFromTopics}
     * @param z2 flag forwarded unchanged to {@code consumeNumberKeyedMessagesFromTopics}
     * @param c embedded Kafka configuration
     * @param deserializer value deserializer
     * @return map from topic name to the values read from that topic
     */
    default <V> Map<String, List<V>> consumeNumberMessagesFromTopics(Set<String> set, int i, boolean z, Duration duration, boolean z2, C c, Deserializer<V> deserializer) {
        Map<String, List<Tuple2<String, V>>> pairsByTopic = consumeNumberKeyedMessagesFromTopics(set, i, z, duration, z2, c, new StringDeserializer(), deserializer);
        return pairsByTopic.view().mapValues(pairs -> pairs.map(pair -> {
            // Mirrors the Scala pattern match: a null pair has no (key, value) shape.
            if (pair == null) {
                throw new MatchError(pair);
            }
            return pair._2();
        })).toMap($less$colon$less$.MODULE$.refl());
    }

    /**
     * Scala-generated accessor for the default value of the third parameter of
     * {@code consumeNumberMessagesFromTopics}.
     *
     * @return {@code false}
     */
    default boolean consumeNumberMessagesFromTopics$default$3() {
        final boolean defaultFlag = false;
        return defaultFlag;
    }

    /**
     * Scala-generated accessor for the default value of the fourth parameter (the timeout) of
     * {@code consumeNumberMessagesFromTopics}.
     *
     * @return a duration of five seconds
     */
    default <V> Duration consumeNumberMessagesFromTopics$default$4() {
        final FiniteDuration fiveSeconds = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
        return fiveSeconds;
    }

    /**
     * Scala-generated accessor for the default value of the fifth parameter of
     * {@code consumeNumberMessagesFromTopics}; {@code consumeNumberMessagesFrom} uses it when it
     * delegates.
     *
     * @return {@code true}
     */
    default boolean consumeNumberMessagesFromTopics$default$5() {
        final boolean defaultFlag = true;
        return defaultFlag;
    }

    /**
     * Creates a {@code KafkaConsumer}, runs the consuming routine ({@code $anonfun$1}) inside a
     * {@code Try}, closes the consumer, and returns the routine's result.
     *
     * @param set topic names; presumably forwarded to the consuming routine (the lambda arguments
     *            were lost by the decompiler, so verify against the Scala source)
     * @param i number of messages to read; presumably forwarded to the consuming routine
     * @param z value written, as a string, into the {@code enable.auto.commit} consumer property
     * @param duration timeout; added to {@code System.nanoTime()} to form the initial deadline
     * @param z2 presumably the flag that lets the consuming routine push the deadline back after a
     *           non-empty poll (verify against the Scala source)
     * @param c embedded Kafka configuration used to obtain the base consumer properties
     * @param deserializer key deserializer given to the consumer
     * @param deserializer2 value deserializer given to the consumer
     * @return map from topic name to the (key, value) pairs read from that topic
     */
    default <K, V> Map<String, List<Tuple2<K, V>>> consumeNumberKeyedMessagesFromTopics(Set<String> set, int i, boolean z, Duration duration, boolean z2, C c, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        // Base properties with "enable.auto.commit" overridden by the caller's flag.
        Map $plus$plus = baseConsumerConfig(c).$plus$plus((IterableOnce) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc("enable.auto.commit"), BoxesRunTime.boxToBoolean(z).toString())})));
        // Mutable deadline in nanoseconds; held in a LongRef so the lambda below can update it.
        LongRef create = LongRef.create(System.nanoTime() + duration.toNanos());
        KafkaConsumer kafkaConsumer = new KafkaConsumer(CollectionConverters$.MODULE$.MapHasAsJava($plus$plus).asJava(), deserializer, deserializer2);
        // NOTE(review): r1..r7 are decompiler placeholders. The original presumably passes
        // (this, set, kafkaConsumer, i, create, z2, duration) -- confirm against ConsumerOps.scala.
        Try apply = Try$.MODULE$.apply(() -> {
            return r1.$anonfun$1(r2, r3, r4, r5, r6, r7);
        });
        // The outcome is held in the Try, so the consumer is closed before any failure is rethrown.
        kafkaConsumer.close();
        // ConsumerOps$$anon$1 is defined outside this file; get() rethrows whatever it leaves unhandled.
        return (Map) apply.recover(new ConsumerOps$$anon$1()).get();
    }

    /**
     * Scala-generated accessor for the default value of the third parameter of
     * {@code consumeNumberKeyedMessagesFromTopics}, the flag written into the
     * {@code enable.auto.commit} consumer property.
     *
     * @return {@code false}
     */
    default boolean consumeNumberKeyedMessagesFromTopics$default$3() {
        final boolean defaultFlag = false;
        return defaultFlag;
    }

    /**
     * Scala-generated accessor for the default value of the fourth parameter (the timeout) of
     * {@code consumeNumberKeyedMessagesFromTopics}.
     *
     * @return a duration of five seconds
     */
    default <K, V> Duration consumeNumberKeyedMessagesFromTopics$default$4() {
        final FiniteDuration fiveSeconds = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds();
        return fiveSeconds;
    }

    /**
     * Scala-generated accessor for the default value of the fifth parameter of
     * {@code consumeNumberKeyedMessagesFromTopics}; {@code consumeNumberKeyedMessagesFrom} uses it
     * when it delegates.
     *
     * @return {@code true}
     */
    default boolean consumeNumberKeyedMessagesFromTopics$default$5() {
        final boolean defaultFlag = true;
        return defaultFlag;
    }

    /**
     * Builds a {@code KafkaConsumer} from the base consumer properties and passes it, together
     * with the supplied function, to the package-level {@code loanAndClose} helper.
     *
     * @param function1 function to run against the consumer
     * @param c embedded Kafka configuration used to obtain the base consumer properties
     * @param deserializer key deserializer given to the consumer
     * @param deserializer2 value deserializer given to the consumer
     * @return whatever {@code loanAndClose} returns for the given function
     */
    default <K, V, T> T withConsumer(Function1<KafkaConsumer<K, V>, T> function1, C c, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        Map<String, Object> consumerProperties = baseConsumerConfig(c);
        KafkaConsumer loanedConsumer = new KafkaConsumer(CollectionConverters$.MODULE$.MapHasAsJava(consumerProperties).asJava(), deserializer, deserializer2);
        return (T) io.github.embeddedkafka.package$.MODULE$.loanAndClose(loanedConsumer, function1);
    }

    /**
     * Compiler-generated body of the closure that performs the actual consumption: subscribes the
     * consumer to the topics and polls until {@code i} records have been collected or the deadline
     * in {@code longRef} has passed.
     *
     * <p>Every collected record is appended to its topic's buffer and its offset (plus one) is
     * committed synchronously; this method commits for every record, independent of {@code z}.
     *
     * <p>NOTE(review): {@code TimeoutException} is a checked exception in Java but is not declared
     * here; the Scala original has no checked exceptions.
     *
     * @param set topic names to subscribe to; one buffer is created per topic
     * @param kafkaConsumer consumer to subscribe, poll and commit with
     * @param i number of records to collect
     * @param longRef mutable deadline, compared against {@code System.nanoTime()}
     * @param z when {@code true}, a poll that returned at least one record resets the deadline to
     *          now plus {@code duration}
     * @param duration timeout used when resetting the deadline and in the timeout message
     * @return immutable map from topic name to the list of (key, value) pairs read from it
     * @throws TimeoutException if fewer than {@code i} records were read before the deadline
     */
    private default Map $anonfun$1(Set set, KafkaConsumer kafkaConsumer, int i, LongRef longRef, boolean z, Duration duration) {
        // One empty mutable buffer per requested topic.
        Map map = ((IterableOnceOps) set.map(str -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((String) Predef$.MODULE$.ArrowAssoc(str), ListBuffer$.MODULE$.empty());
        })).toMap($less$colon$less$.MODULE$.refl());
        // Count of records collected so far.
        int i2 = 0;
        kafkaConsumer.subscribe(CollectionConverters$.MODULE$.SetHasAsJava(set).asJava());
        // partitionsFor is called for each topic and its result discarded.
        // NOTE(review): presumably done to force a metadata fetch before polling -- confirm.
        set.foreach(str2 -> {
            return kafkaConsumer.partitionsFor(str2);
        });
        while (i2 < i && System.nanoTime() < longRef.elem) {
            Iterator it = kafkaConsumer.poll(io.github.embeddedkafka.package$.MODULE$.duration2JavaDuration(consumerPollingTimeout())).iterator();
            // Only a non-empty poll pushes the deadline back, and only when the flag is set.
            if (z && it.hasNext()) {
                longRef.elem = System.nanoTime() + duration.toNanos();
            }
            // Stop taking records from this batch as soon as the requested count is reached.
            while (it.hasNext() && i2 < i) {
                ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                ((Growable) map.apply(consumerRecord.topic())).$plus$eq(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(consumerRecord.key()), consumerRecord.value()));
                // Commit offset + 1 for this record's partition so the record just read counts as consumed.
                kafkaConsumer.commitSync(CollectionConverters$.MODULE$.MapHasAsJava((scala.collection.Map) Map$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((TopicPartition) Predef$.MODULE$.ArrowAssoc(new TopicPartition(consumerRecord.topic(), consumerRecord.partition())), new OffsetAndMetadata(consumerRecord.offset() + 1))}))).asJava());
                i2++;
            }
        }
        // The loop ended on the deadline rather than on the record count.
        if (i2 < i) {
            throw new TimeoutException(new StringBuilder(45).append("Unable to retrieve ").append(i).append(" message(s) from Kafka in ").append(duration).toString());
        }
        // Freeze each mutable buffer into an immutable list.
        return map.view().mapValues(listBuffer -> {
            return listBuffer.toList();
        }).toMap($less$colon$less$.MODULE$.refl());
    }
}
