package io.atleon.kafka;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.core.ComposedAlo;
import io.atleon.core.ErrorEmitter;
import io.atleon.util.Consuming;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/atleon/kafka/AloKafkaBoundedReceiver.class */
/**
 * Receives a bounded set of Kafka records, wrapped as {@link Alo}, for a consumer group.
 *
 * <p>For each requested topic, the partitions and the group's current offsets are looked up through
 * a {@link ReactiveAdmin}. A range of records is then derived per partition, starting at the
 * group's offset and ending at the supplied {@link OffsetCriteria}. Ranges are consumed one
 * partition at a time, ordered by topic name and then by partition number. Acknowledging a range
 * alters the consumer group's offset for that partition to one past the range's inclusive maximum.
 *
 * @param <K> type of record keys
 * @param <V> type of record values
 */
public class AloKafkaBoundedReceiver<K, V> {

    private final KafkaConfigSource configSource;

    private AloKafkaBoundedReceiver(KafkaConfigSource kafkaConfigSource) {
        this.configSource = kafkaConfigSource;
    }

    /**
     * Creates a receiver backed by the provided configuration source.
     *
     * @param kafkaConfigSource source from which Kafka configuration is created on each subscription
     * @param <K> type of record keys
     * @param <V> type of record values
     * @return a new bounded receiver
     */
    public static <K, V> AloKafkaBoundedReceiver<K, V> create(KafkaConfigSource kafkaConfigSource) {
        return new AloKafkaBoundedReceiver<>(kafkaConfigSource);
    }

    /**
     * Receives records from a single topic, from the group's current offsets up to the given criteria.
     *
     * @param str name of the topic to receive from
     * @param offsetCriteria criteria describing the upper bound of the range to receive
     * @return an {@link AloFlux} of the records in range
     */
    public AloFlux<ConsumerRecord<K, V>> receiveAloRecordsUpTo(String str, OffsetCriteria offsetCriteria) {
        return receiveAloRecordsUpTo(Collections.singletonList(str), offsetCriteria);
    }

    /**
     * Receives records from several topics, from the group's current offsets up to the given criteria.
     *
     * @param collection names of the topics to receive from
     * @param offsetCriteria criteria describing the upper bound of the range to receive
     * @return an {@link AloFlux} of the records in range
     */
    public AloFlux<ConsumerRecord<K, V>> receiveAloRecordsUpTo(Collection<String> collection, OffsetCriteria offsetCriteria) {
        return configSource.create()
            .flatMapMany(kafkaConfig -> receiveAloRecordsUpTo(kafkaConfig, collection, offsetCriteria))
            .as(AloFlux::wrap);
    }

    /**
     * Scopes a {@link ReactiveAdmin} to the lifetime of the returned flux: the admin is created on
     * subscription and closed when the flux terminates or is cancelled.
     */
    private Flux<Alo<ConsumerRecord<K, V>>> receiveAloRecordsUpTo(
            KafkaConfig kafkaConfig, Collection<String> topics, OffsetCriteria offsetCriteria) {
        return Flux.using(
            () -> ReactiveAdmin.create(kafkaConfig.nativeProperties()),
            admin -> receiveAndCommitRecordsInRange(admin, kafkaConfig, topics, offsetCriteria),
            ReactiveAdmin::close);
    }

    /**
     * Resolves the record range of every partition of the given topics and consumes the ranges
     * sequentially.
     *
     * @throws IllegalArgumentException if the configuration has no {@code group.id}
     */
    private Flux<Alo<ConsumerRecord<K, V>>> receiveAndCommitRecordsInRange(
            ReactiveAdmin reactiveAdmin,
            KafkaConfig kafkaConfig,
            Collection<String> topics,
            OffsetCriteria offsetCriteria) {
        String groupId = kafkaConfig.loadString("group.id")
            .orElseThrow(() -> new IllegalArgumentException("Must provide Consumer Group ID"));
        // Falls back to LATEST when "auto.offset.reset" is not configured.
        OffsetResetStrategy resetStrategy = kafkaConfig.loadString("auto.offset.reset")
            .map(name -> OffsetResetStrategy.valueOf(name.toUpperCase(Locale.ROOT)))
            .orElse(OffsetResetStrategy.LATEST);

        return reactiveAdmin.listTopicPartitions(topics)
            .collectList()
            .flatMapMany(partitions -> reactiveAdmin.listTopicPartitionGroupOffsets(groupId, resetStrategy, partitions))
            .collectMap(
                groupOffsets -> groupOffsets.topicPartition(),
                groupOffsets -> toOffsetRange(groupOffsets.groupOffset(), offsetCriteria))
            .flatMapMany(offsetRanges -> RecordRange.list(reactiveAdmin, offsetRanges))
            .filter(RecordRange::hasNonNegativeLength)
            // Sort so partitions are consumed in a deterministic topic/partition order.
            .sort(Comparator.comparing(RecordRange::topicPartition, topicPartitionComparator()))
            .concatMap(recordRange -> receiveAndCommitRecordsInRange(reactiveAdmin, groupId, recordRange));
    }

    /**
     * Receives the records of a single range. The range is wrapped in an {@link Alo} whose
     * acknowledgement alters the group's offset for the range's partition to
     * {@code maxInclusive + 1}. Failures of that alteration, and negative acknowledgements, are
     * routed to an {@link ErrorEmitter} that is applied to the resulting flux.
     */
    private AloFlux<ConsumerRecord<K, V>> receiveAndCommitRecordsInRange(
            ReactiveAdmin reactiveAdmin, String groupId, RecordRange recordRange) {
        // The committed offset is the position of the next record to consume, hence the +1.
        Map<TopicPartition, Long> offsetToCommit =
            Collections.singletonMap(recordRange.topicPartition(), recordRange.maxInclusive() + 1L);
        ErrorEmitter<Alo<ConsumerRecord<K, V>>> errorEmitter = ErrorEmitter.create();
        Runnable acknowledger = () -> reactiveAdmin.alterRawConsumerGroupOffsets(groupId, offsetToCommit)
            .subscribe(Consuming.noOp(), errorEmitter::safelyEmit, errorEmitter::safelyComplete);

        KafkaBoundedReceiver<K, V> receiver = KafkaBoundedReceiver.create(configSource);
        return AloFlux.just(new ComposedAlo<>(recordRange, acknowledger, errorEmitter::safelyEmit))
            .concatMap(receiver::receiveRecordsInRange)
            .transform(errorEmitter::applyTo);
    }

    /**
     * Builds the offset range that starts at a raw offset and ends at the given criteria.
     *
     * @param j raw starting offset
     * @param offsetCriteria criteria describing the end of the range
     * @return the range from {@code j} to {@code offsetCriteria}
     */
    public static OffsetRange toOffsetRange(long j, OffsetCriteria offsetCriteria) {
        return OffsetCriteria.raw(j).to(offsetCriteria);
    }

    /** Orders topic-partitions by topic name, then by partition number. */
    private static Comparator<TopicPartition> topicPartitionComparator() {
        return Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition);
    }
}
