package io.atleon.kafka;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;

/* loaded from: input_file:io/atleon/kafka/KafkaBoundedReceiver.class */
public class KafkaBoundedReceiver<K, V> {
    public static final String CONFIG_PREFIX = "kafka.bounded.receiver";
    public static final String POLL_TIMEOUT_CONFIG = "kafka.bounded.receiverpoll.timeout";
    public static final String CLOSE_TIMEOUT_CONFIG = "kafka.bounded.receiverclose.timeout";
    private static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(100);
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30);
    private final KafkaConfigSource configSource;

    private KafkaBoundedReceiver(KafkaConfigSource kafkaConfigSource) {
        this.configSource = kafkaConfigSource;
    }

    public static <K, V> KafkaBoundedReceiver<K, V> create(KafkaConfigSource kafkaConfigSource) {
        return new KafkaBoundedReceiver<>(kafkaConfigSource);
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(String str, OffsetRange offsetRange) {
        return receiveRecords(Collections.singletonList(str), offsetRange);
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(Collection<String> collection, OffsetRange offsetRange) {
        return receiveRecords(collection, OffsetRangeProvider.inOffsetRangeFromAllTopicPartitions(offsetRange));
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(String str, OffsetRangeProvider offsetRangeProvider) {
        return receiveRecords(Collections.singletonList(str), offsetRangeProvider);
    }

    public Flux<ConsumerRecord<K, V>> receiveRecords(Collection<String> collection, OffsetRangeProvider offsetRangeProvider) {
        return ((Mono) this.configSource.create()).flatMapMany(kafkaConfig -> {
            return listRecordRanges(kafkaConfig, collection, offsetRangeProvider);
        }).filter((v0) -> {
            return v0.hasNonNegativeLength();
        }).concatMap(this::receiveRecordsInRange);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<ConsumerRecord<K, V>> receiveRecordsInRange(RecordRange recordRange) {
        return ((Mono) this.configSource.create()).flatMapMany(kafkaConfig -> {
            return receiveRecordsInRange(kafkaConfig, recordRange);
        });
    }

    private Flux<ConsumerRecord<K, V>> receiveRecordsInRange(KafkaConfig kafkaConfig, RecordRange recordRange) {
        return KafkaReceiver.create(ReceiverOptions.create(kafkaConfig.nativeProperties()).assignment(Collections.singletonList(recordRange.topicPartition())).pollTimeout(kafkaConfig.loadDuration(POLL_TIMEOUT_CONFIG).orElse(DEFAULT_POLL_TIMEOUT)).closeTimeout(kafkaConfig.loadDuration(CLOSE_TIMEOUT_CONFIG).orElse(DEFAULT_CLOSE_TIMEOUT)).addAssignListener(collection -> {
            collection.forEach(receiverPartition -> {
                receiverPartition.seek(recordRange.minInclusive());
            });
        })).receive().map(Function.identity()).takeWhile(consumerRecord -> {
            return consumerRecord.offset() <= recordRange.maxInclusive();
        }).takeUntil(consumerRecord2 -> {
            return consumerRecord2.offset() == recordRange.maxInclusive();
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Flux<RecordRange> listRecordRanges(KafkaConfig kafkaConfig, Collection<String> collection, OffsetRangeProvider offsetRangeProvider) {
        return Flux.using(() -> {
            return ReactiveAdmin.create(kafkaConfig.nativeProperties());
        }, reactiveAdmin -> {
            return RecordRange.list(reactiveAdmin, collection, offsetRangeProvider);
        }, (v0) -> {
            v0.close();
        });
    }
}
