package io.atleon.core;

import java.util.List;
import java.util.Objects;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/atleon/core/DeduplicatingTransformer.class */
/**
 * Transformer that, when enabled by its {@link DeduplicationConfig}, groups the items of a
 * {@link Publisher} by a deduplication key and reduces each group to a single item.
 *
 * <p>Each group is bounded both by {@code config.getDeduplicationTimeout()} and by
 * {@code config.getMaxDeduplicationSize()}; whichever bound is reached first closes the group,
 * after which the collected items are reduced by the configured {@link Deduplicator}.
 *
 * @param <T> the type of items flowing through the transformed publisher
 */
public final class DeduplicatingTransformer<T> implements Function<Publisher<T>, Publisher<T>> {
    private final DeduplicationConfig config;
    private final Deduplicator<T, ?> deduplicator;
    private final Scheduler sourceScheduler;

    private DeduplicatingTransformer(DeduplicationConfig config, Deduplicator<T, ?> deduplicator, Scheduler sourceScheduler) {
        this.config = config;
        this.deduplicator = deduplicator;
        this.sourceScheduler = sourceScheduler;
    }

    /**
     * Creates a transformer that deduplicates plain items of type {@code T}.
     *
     * @param config        deduplication settings (enablement, timeout, sizes, concurrency)
     * @param deduplication strategy supplying key extraction and duplicate reduction
     * @param scheduler     scheduler from which a single worker is derived for deduplication
     * @param <T>           the item type
     * @return a new transformer
     * @throws NullPointerException if {@code deduplication} is null
     */
    static <T> DeduplicatingTransformer<T> identity(DeduplicationConfig config, Deduplication<T> deduplication, Scheduler scheduler) {
        return new DeduplicatingTransformer<>(config, Deduplicator.identity(deduplication), scheduler);
    }

    /**
     * Creates a transformer that deduplicates {@link Alo}-wrapped items based on their payload.
     *
     * @param config        deduplication settings (enablement, timeout, sizes, concurrency)
     * @param deduplication strategy applied to the payload obtained through {@code Alo.get()}
     * @param scheduler     scheduler from which a single worker is derived for deduplication
     * @param <T>           the payload type
     * @return a new transformer
     * @throws NullPointerException if {@code deduplication} is null
     */
    public static <T> DeduplicatingTransformer<Alo<T>> alo(DeduplicationConfig config, Deduplication<T> deduplication, Scheduler scheduler) {
        return new DeduplicatingTransformer<>(config, Deduplicator.alo(deduplication), scheduler);
    }

    /**
     * Applies deduplication to the given publisher when the configuration is enabled; otherwise
     * returns the publisher unchanged.
     *
     * @param publisher the upstream publisher
     * @return the deduplicated publisher, or {@code publisher} itself when disabled
     */
    @Override
    public Publisher<T> apply(Publisher<T> publisher) {
        if (!config.isEnabled()) {
            return publisher;
        }
        // Deduplication is wired in via switchOnFirst, i.e. only once the first signal arrives.
        // NOTE(review): presumably so the pipeline is assembled on the source's thread - confirm.
        return Flux.from(publisher)
            .switchOnFirst((firstSignal, flux) -> flux.transform(this::applyDeduplication));
    }

    /**
     * Builds the grouping pipeline: items are published on a single-worker scheduler, grouped by
     * deduplication key, and each group is reduced with bounded concurrency.
     */
    private Flux<T> applyDeduplication(Publisher<T> publisher) {
        // The same single-worker scheduler is used for publishing, for the group timeouts and
        // for subscription.
        // NOTE(review): presumably this serializes group completion with item delivery - confirm.
        Scheduler singleWorker = Schedulers.single(sourceScheduler);
        return Flux.from(publisher)
            .publishOn(singleWorker, config.getDeduplicationSourcePrefetch())
            .groupBy(deduplicator::extractKey)
            .flatMap(group -> deduplicateGroup(group, singleWorker), config.getDeduplicationConcurrency())
            .subscribeOn(singleWorker);
    }

    /**
     * Collects one key group, bounded by the configured timeout and maximum size, and reduces
     * the collected items to a single item.
     */
    private Mono<T> deduplicateGroup(GroupedFlux<Object, T> group, Scheduler scheduler) {
        return group
            .take(config.getDeduplicationTimeout(), scheduler)
            .take(config.getMaxDeduplicationSize())
            .collectList()
            .map(deduplicator::deduplicate);
    }

    /**
     * Bundles the three functions needed to deduplicate items of type {@code T}: extracting the
     * data of type {@code R} from an item, extracting the key from that data, and reducing a
     * list of items to one.
     *
     * @param <T> the item type
     * @param <R> the type of data on which the key is computed
     */
    public static final class Deduplicator<T, R> {
        private final Function<T, R> dataExtractor;
        private final Function<R, Object> keyExtractor;
        private final Function<List<T>, T> reducer;

        private Deduplicator(Function<T, R> dataExtractor, Function<R, Object> keyExtractor, Function<List<T>, T> reducer) {
            this.dataExtractor = dataExtractor;
            this.keyExtractor = keyExtractor;
            this.reducer = reducer;
        }

        /**
         * Creates a deduplicator where the item itself is the data being deduplicated.
         *
         * @param deduplication strategy supplying key extraction and duplicate reduction
         * @param <T>           the item type
         * @return a new deduplicator
         * @throws NullPointerException if {@code deduplication} is null
         */
        public static <T> Deduplicator<T, T> identity(Deduplication<T> deduplication) {
            Objects.requireNonNull(deduplication, "deduplication");
            return new Deduplicator<T, T>(
                Function.identity(),
                deduplication::extractKey,
                items -> reduceToSingle(items, deduplication::reduceDuplicates));
        }

        /**
         * Creates a deduplicator for {@link Alo} items whose key is computed on the value
         * returned by {@code Alo.get()}.
         *
         * @param deduplication strategy applied to the payloads
         * @param <T>           the payload type
         * @return a new deduplicator
         * @throws NullPointerException if {@code deduplication} is null
         */
        public static <T> Deduplicator<Alo<T>, T> alo(Deduplication<T> deduplication) {
            Objects.requireNonNull(deduplication, "deduplication");
            return new Deduplicator<Alo<T>, T>(
                Alo::get,
                deduplication::extractKey,
                alos -> reduceToSingleAlo(alos, deduplication::reduceDuplicates));
        }

        /**
         * Computes the deduplication key of an item by first extracting its data and then
         * applying the key extractor to that data.
         *
         * @param t the item
         * @return the key used to group duplicates
         */
        public Object extractKey(T t) {
            R data = dataExtractor.apply(t);
            return keyExtractor.apply(data);
        }

        /**
         * Reduces a group of items sharing a key to a single item.
         *
         * @param list the items of one group
         * @return the single resulting item
         * @throws IllegalStateException if the group is empty
         */
        public T deduplicate(List<T> list) {
            return reducer.apply(list);
        }

        /**
         * Reduces a group of {@link Alo} items to one. A single-element group is returned as-is;
         * larger groups are combined with {@code AloOps.fanIn} and the fanned-in list is then
         * reduced with {@code binaryOperator}.
         */
        private static <T> Alo<T> reduceToSingleAlo(List<Alo<T>> list, BinaryOperator<T> binaryOperator) {
            if (list.isEmpty()) {
                throw newEmptyDeduplicationGroupException();
            }
            if (list.size() == 1) {
                return list.get(0);
            }
            return AloOps.fanIn(list).map(values -> reduceToSingle(values, binaryOperator));
        }

        /**
         * Folds the list pairwise with {@code binaryOperator}.
         *
         * @throws IllegalStateException if the list is empty
         */
        public static <T> T reduceToSingle(List<T> list, BinaryOperator<T> binaryOperator) {
            return list.stream()
                .reduce(binaryOperator)
                .orElseThrow(Deduplicator::newEmptyDeduplicationGroupException);
        }

        private static IllegalStateException newEmptyDeduplicationGroupException() {
            return new IllegalStateException("Something bad has happened. Deduplication group was empty.");
        }
    }
}
