package io.atleon.kafka;

import io.atleon.core.AcknowledgementQueueMode;
import io.atleon.core.Alo;
import io.atleon.core.AloComponentExtractor;
import io.atleon.core.AloFactory;
import io.atleon.core.AloFactoryConfig;
import io.atleon.core.AloFlux;
import io.atleon.core.AloQueueListener;
import io.atleon.core.AloQueueListenerConfig;
import io.atleon.core.AloQueueingTransformer;
import io.atleon.core.AloSignalListenerFactory;
import io.atleon.core.AloSignalListenerFactoryConfig;
import io.atleon.core.ErrorEmitter;
import io.atleon.kafka.ConsumerMutexEnforcer;
import io.atleon.kafka.NacknowledgerFactory;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

/* loaded from: input_file:io/atleon/kafka/AloKafkaReceiver.class */
public class AloKafkaReceiver<K, V> {
    public static final String CONFIG_PREFIX = "kafka.receiver.";
    public static final String ACKNOWLEDGEMENT_QUEUE_MODE_CONFIG = "kafka.receiver.acknowledgement.queue.mode";
    public static final String NACKNOWLEDGER_TYPE_CONFIG = "kafka.receiver.nacknowledger.type";
    public static final String NACKNOWLEDGER_TYPE_EMIT = "emit";
    public static final String ERROR_EMISSION_TIMEOUT_CONFIG = "kafka.receiver.error.emission.timeout";
    public static final String MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG = "kafka.receiver.max.in.flight.per.subscription";
    public static final String AUTO_INCREMENT_CLIENT_ID_CONFIG = "kafka.receiver.auto.increment.client.id";
    public static final String POLL_TIMEOUT_CONFIG = "kafka.receiver.poll.timeout";
    public static final String COMMIT_INTERVAL_CONFIG = "kafka.receiver.commit.interval";
    public static final String MAX_COMMIT_ATTEMPTS_CONFIG = "kafka.receiver.max.commit.attempts";
    public static final String CLOSE_TIMEOUT_CONFIG = "kafka.receiver.close.timeout";
    public static final String MAX_DELAY_REBALANCE_CONFIG = "kafka.receiver.max.delay.rebalance";
    private static final long DEFAULT_MAX_IN_FLIGHT_PER_SUBSCRIPTION = 4096;
    private static final boolean DEFAULT_AUTO_INCREMENT_CLIENT_ID = false;
    private final KafkaConfigSource configSource;
    private static final AcknowledgementQueueMode DEFAULT_ACKNOWLEDGEMENT_QUEUE_MODE = AcknowledgementQueueMode.STRICT;
    private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.ofSeconds(30);
    private static final Map<String, AtomicLong> COUNTS_BY_ID = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atleon/kafka/AloKafkaReceiver$ReceiveResources.class */
    public static final class ReceiveResources<K, V> {
        private final KafkaConfig config;
        private final NacknowledgerFactory<K, V> nacknowledgerFactory;
        private final AcknowledgementQueueMode acknowledgementQueueMode;

        public ReceiveResources(KafkaConfig kafkaConfig) {
            this.config = kafkaConfig;
            this.nacknowledgerFactory = createNacknowledgerFactory(kafkaConfig);
            this.acknowledgementQueueMode = loadAcknowledgementQueueMode(kafkaConfig);
        }

        public Flux<Alo<ConsumerRecord<K, V>>> receive(ReceiverOptionsInitializer<K, V> receiverOptionsInitializer, ConsumerMutexEnforcer consumerMutexEnforcer) {
            ErrorEmitter<Alo<ConsumerRecord<K, V>>> newErrorEmitter = newErrorEmitter();
            ReceiverOptions<K, V> newReceiverOptions = newReceiverOptions(receiverOptionsInitializer);
            ConsumerMutexEnforcer.ProhibitableConsumerFactory newConsumerFactory = consumerMutexEnforcer.newConsumerFactory();
            Flux receive = KafkaReceiver.create(newConsumerFactory, newReceiverOptions).receive();
            Objects.requireNonNull(newErrorEmitter);
            Flux transform = receive.transform(newAloQueueingTransformer(newErrorEmitter::safelyEmit));
            Objects.requireNonNull(newErrorEmitter);
            return transform.transform((v1) -> {
                return r1.applyTo(v1);
            }).transform(this::applySignalListenerFactories).doFinally(signalType -> {
                newConsumerFactory.prohibitFurtherConsumption(newReceiverOptions.closeTimeout().multipliedBy(2L));
            });
        }

        private ErrorEmitter<Alo<ConsumerRecord<K, V>>> newErrorEmitter() {
            return ErrorEmitter.create(this.config.loadDuration(AloKafkaReceiver.ERROR_EMISSION_TIMEOUT_CONFIG).orElse(ErrorEmitter.DEFAULT_TIMEOUT));
        }

        private ReceiverOptions<K, V> newReceiverOptions(ReceiverOptionsInitializer<K, V> receiverOptionsInitializer) {
            ReceiverOptions<K, V> initialize = receiverOptionsInitializer.initialize(newConsumerConfig());
            return initialize.pollTimeout(this.config.loadDuration(AloKafkaReceiver.POLL_TIMEOUT_CONFIG).orElse(initialize.pollTimeout())).commitInterval(this.config.loadDuration(AloKafkaReceiver.COMMIT_INTERVAL_CONFIG).orElse(initialize.commitInterval())).maxCommitAttempts(this.config.loadInt(AloKafkaReceiver.MAX_COMMIT_ATTEMPTS_CONFIG).orElse(Integer.valueOf(initialize.maxCommitAttempts())).intValue()).closeTimeout(this.config.loadDuration(AloKafkaReceiver.CLOSE_TIMEOUT_CONFIG).orElse(AloKafkaReceiver.DEFAULT_CLOSE_TIMEOUT)).maxDelayRebalance(loadMaxDelayRebalance().orElse(initialize.maxDelayRebalance()));
        }

        private Map<String, Object> newConsumerConfig() {
            return this.config.modifyAndGetProperties(map -> {
                map.keySet().removeIf(str -> {
                    return str.startsWith(AloKafkaReceiver.CONFIG_PREFIX);
                });
                if (this.config.loadBoolean(AloKafkaReceiver.AUTO_INCREMENT_CLIENT_ID_CONFIG).orElse(false).booleanValue()) {
                    map.computeIfPresent("client.id", (str2, obj) -> {
                        return incrementId(obj.toString());
                    });
                }
            });
        }

        private Optional<Duration> loadMaxDelayRebalance() {
            Optional<Duration> loadDuration = this.config.loadDuration(AloKafkaReceiver.MAX_DELAY_REBALANCE_CONFIG);
            return (loadDuration.isPresent() || this.acknowledgementQueueMode == AcknowledgementQueueMode.STRICT) ? loadDuration : Optional.of(Duration.ZERO);
        }

        private AloQueueingTransformer<ReceiverRecord<K, V>, ConsumerRecord<K, V>> newAloQueueingTransformer(Consumer<Throwable> consumer) {
            return AloQueueingTransformer.create(newComponentExtractor(consumer)).withGroupExtractor(receiverRecord -> {
                return receiverRecord.receiverOffset().topicPartition();
            }).withQueueMode(this.acknowledgementQueueMode).withListener(loadQueueListener()).withFactory(loadAloFactory()).withMaxInFlight(loadMaxInFlightPerSubscription());
        }

        private AloComponentExtractor<ReceiverRecord<K, V>, ConsumerRecord<K, V>> newComponentExtractor(Consumer<Throwable> consumer) {
            return AloComponentExtractor.composed(receiverRecord -> {
                ReceiverOffset receiverOffset = receiverRecord.receiverOffset();
                Objects.requireNonNull(receiverOffset);
                return receiverOffset::acknowledge;
            }, receiverRecord2 -> {
                return this.nacknowledgerFactory.create(receiverRecord2, consumer);
            }, Function.identity());
        }

        private AloQueueListener loadQueueListener() {
            return (AloQueueListener) AloQueueListenerConfig.load(this.config.modifyAndGetProperties(map -> {
            }), AloKafkaQueueListener.class).orElseGet(AloQueueListener::noOp);
        }

        private AloFactory<ConsumerRecord<K, V>> loadAloFactory() {
            return AloFactoryConfig.loadDecorated(this.config.modifyAndGetProperties(map -> {
            }), AloKafkaConsumerRecordDecorator.class);
        }

        private long loadMaxInFlightPerSubscription() {
            return this.config.loadLong(AloKafkaReceiver.MAX_IN_FLIGHT_PER_SUBSCRIPTION_CONFIG).orElse(Long.valueOf(AloKafkaReceiver.DEFAULT_MAX_IN_FLIGHT_PER_SUBSCRIPTION)).longValue();
        }

        private Flux<Alo<ConsumerRecord<K, V>>> applySignalListenerFactories(Flux<Alo<ConsumerRecord<K, V>>> flux) {
            Iterator it = AloSignalListenerFactoryConfig.loadList(this.config.modifyAndGetProperties(map -> {
            }), AloKafkaConsumerRecordSignalListenerFactory.class).iterator();
            while (it.hasNext()) {
                flux = flux.tap((AloSignalListenerFactory) it.next());
            }
            return flux;
        }

        private static <K, V> NacknowledgerFactory<K, V> createNacknowledgerFactory(KafkaConfig kafkaConfig) {
            return (NacknowledgerFactory) loadNacknowledgerFactory(kafkaConfig, AloKafkaReceiver.NACKNOWLEDGER_TYPE_CONFIG, NacknowledgerFactory.class).orElseGet(NacknowledgerFactory.Emit::new);
        }

        private static <K, V, N extends NacknowledgerFactory<K, V>> Optional<NacknowledgerFactory<K, V>> loadNacknowledgerFactory(KafkaConfig kafkaConfig, String str, Class<N> cls) {
            return kafkaConfig.loadConfiguredWithPredefinedTypes(str, cls, ReceiveResources::newPredefinedNacknowledgerFactory);
        }

        private static <K, V> Optional<NacknowledgerFactory<K, V>> newPredefinedNacknowledgerFactory(String str) {
            return str.equalsIgnoreCase(AloKafkaReceiver.NACKNOWLEDGER_TYPE_EMIT) ? Optional.of(new NacknowledgerFactory.Emit()) : Optional.empty();
        }

        private static AcknowledgementQueueMode loadAcknowledgementQueueMode(KafkaConfig kafkaConfig) {
            return (AcknowledgementQueueMode) kafkaConfig.loadEnum(AloKafkaReceiver.ACKNOWLEDGEMENT_QUEUE_MODE_CONFIG, AcknowledgementQueueMode.class).orElse(AloKafkaReceiver.DEFAULT_ACKNOWLEDGEMENT_QUEUE_MODE);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static String incrementId(String str) {
            return str + "-" + ((AtomicLong) AloKafkaReceiver.COUNTS_BY_ID.computeIfAbsent(str, str2 -> {
                return new AtomicLong();
            })).incrementAndGet();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atleon/kafka/AloKafkaReceiver$ReceiverOptionsInitializer.class */
    public interface ReceiverOptionsInitializer<K, V> {
        ReceiverOptions<K, V> initialize(Map<String, Object> map);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atleon/kafka/AloKafkaReceiver$ReceptionFactory.class */
    public static final class ReceptionFactory<K, V> {
        private final ReceiverOptionsInitializer<K, V> optionsInitializer;
        private final Map<Object, ConsumerMutexEnforcer> consumerMutexEnforcers = new ConcurrentHashMap();

        private ReceptionFactory(ReceiverOptionsInitializer<K, V> receiverOptionsInitializer) {
            this.optionsInitializer = receiverOptionsInitializer;
        }

        public static <K, V> ReceptionFactory<K, V> topics(Collection<String> collection) {
            return new ReceptionFactory<>(map -> {
                return ReceiverOptions.create(map).subscription(collection);
            });
        }

        public static <K, V> ReceptionFactory<K, V> topicsPattern(Pattern pattern) {
            return new ReceptionFactory<>(map -> {
                return ReceiverOptions.create(map).subscription(pattern);
            });
        }

        public Flux<Alo<ConsumerRecord<K, V>>> receive(KafkaConfig kafkaConfig) {
            return receive(-1, kafkaConfig);
        }

        public Flux<Alo<ConsumerRecord<K, V>>>[] receive(List<KafkaConfig> list) {
            return (Flux[]) IntStream.range(AloKafkaReceiver.DEFAULT_AUTO_INCREMENT_CLIENT_ID, list.size()).mapToObj(i -> {
                return receive(Integer.valueOf(i), (KafkaConfig) list.get(i));
            }).toArray(i2 -> {
                return new Flux[i2];
            });
        }

        private Flux<Alo<ConsumerRecord<K, V>>> receive(Object obj, KafkaConfig kafkaConfig) {
            return new ReceiveResources(kafkaConfig).receive(this.optionsInitializer, this.consumerMutexEnforcers.computeIfAbsent(obj, obj2 -> {
                return new ConsumerMutexEnforcer();
            }));
        }
    }

    private AloKafkaReceiver(KafkaConfigSource kafkaConfigSource) {
        this.configSource = kafkaConfigSource;
    }

    public static <K, V> AloKafkaReceiver<K, V> from(KafkaConfigSource kafkaConfigSource) {
        return create(kafkaConfigSource);
    }

    public static <K, V> AloKafkaReceiver<K, V> create(KafkaConfigSource kafkaConfigSource) {
        return new AloKafkaReceiver<>(kafkaConfigSource);
    }

    @Deprecated
    public static <V> AloKafkaReceiver<Object, V> forValues(KafkaConfigSource kafkaConfigSource) {
        return new AloKafkaReceiver<>(kafkaConfigSource);
    }

    public AloFlux<V> receiveAloValues(String str) {
        return receiveAloValues(Collections.singletonList(str));
    }

    public AloFlux<V> receiveAloValues(Collection<String> collection) {
        return receiveAloRecords(collection).mapNotNull((v0) -> {
            return v0.value();
        });
    }

    public AloFlux<V> receiveAloValues(Pattern pattern) {
        return receiveAloRecords(pattern).mapNotNull((v0) -> {
            return v0.value();
        });
    }

    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(String str) {
        return receiveAloRecords(Collections.singletonList(str));
    }

    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(Collection<String> collection) {
        ReceptionFactory receptionFactory = ReceptionFactory.topics(collection);
        Mono mono = (Mono) this.configSource.create();
        Objects.requireNonNull(receptionFactory);
        return (AloFlux) mono.flatMapMany(receptionFactory::receive).as((v0) -> {
            return AloFlux.wrap(v0);
        });
    }

    public AloFlux<ConsumerRecord<K, V>> receiveAloRecords(Pattern pattern) {
        ReceptionFactory receptionFactory = ReceptionFactory.topicsPattern(pattern);
        Mono mono = (Mono) this.configSource.create();
        Objects.requireNonNull(receptionFactory);
        return (AloFlux) mono.flatMapMany(receptionFactory::receive).as((v0) -> {
            return AloFlux.wrap(v0);
        });
    }

    public AloFlux<ConsumerRecord<K, V>> receivePrioritizedAloRecords(String str, ReceptionPrioritization receptionPrioritization) {
        return receivePrioritizedAloRecords(Collections.singleton(str), receptionPrioritization, 256);
    }

    public AloFlux<ConsumerRecord<K, V>> receivePrioritizedAloRecords(Collection<String> collection, ReceptionPrioritization receptionPrioritization, int i) {
        ReceptionFactory receptionFactory = ReceptionFactory.topics(collection);
        Mono flatMap = ((Mono) this.configSource.create()).flatMap(kafkaConfig -> {
            return createOrderedConfigs(kafkaConfig, collection, receptionPrioritization);
        });
        Objects.requireNonNull(receptionFactory);
        return (AloFlux) flatMap.map(receptionFactory::receive).flatMapMany(fluxArr -> {
            return Flux.mergePriority(i, toComparator(receptionPrioritization), fluxArr);
        }).as((v0) -> {
            return AloFlux.wrap(v0);
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Mono<List<KafkaConfig>> createOrderedConfigs(KafkaConfig kafkaConfig, Collection<String> collection, ReceptionPrioritization receptionPrioritization) {
        return listTopicPartitions(kafkaConfig, collection).map(topicPartition -> {
            return Integer.valueOf(prioritize(receptionPrioritization, topicPartition));
        }).distinct().sort(Comparator.naturalOrder()).map(num -> {
            return kafkaConfig.withClientIdSuffix("-", "p" + num);
        }).collectList();
    }

    private static Flux<TopicPartition> listTopicPartitions(KafkaConfig kafkaConfig, Collection<String> collection) {
        return Flux.using(() -> {
            return ReactiveAdmin.create(kafkaConfig.nativeProperties());
        }, reactiveAdmin -> {
            return reactiveAdmin.listTopicPartitions((Collection<String>) collection);
        }, (v0) -> {
            v0.close();
        });
    }

    private static int prioritize(ReceptionPrioritization receptionPrioritization, TopicPartition topicPartition) {
        int prioritize = receptionPrioritization.prioritize(topicPartition);
        if (prioritize < 0) {
            throw new IllegalStateException("Priority must be non-negative for topicPartition=" + topicPartition);
        }
        return prioritize;
    }

    private static <K, V> Comparator<Alo<ConsumerRecord<K, V>>> toComparator(ReceptionPrioritization receptionPrioritization) {
        return Comparator.comparing(alo -> {
            return Integer.valueOf(receptionPrioritization.prioritize(ConsumerRecordExtraction.topicPartition((ConsumerRecord) alo.get())));
        });
    }
}
