package io.atleon.core;

import io.atleon.util.Consuming;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.util.context.Context;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/atleon/core/AloErrorDelegatingOperator.class */
/**
 * Operator that re-wraps every {@link Alo} passing through it so that an error reported on the
 * re-wrapped element is first handed to a user-supplied delegator function.
 *
 * <p>The {@link Publisher} returned by the delegator is subscribed to; when it completes, the
 * original element's acknowledger is run, and when it fails, the original element's nacknowledger
 * is invoked with the original error (carrying the delegation failure as a suppressed exception).
 *
 * @param <T> the type of data item carried by the {@link Alo} elements
 */
public final class AloErrorDelegatingOperator<T> extends FluxOperator<Alo<T>, Alo<T>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AloErrorDelegatingOperator.class);

    /** Maps a failed data item and its error to the Publisher that handles the failure. */
    private final BiFunction<? super T, ? super Throwable, ? extends Publisher<?>> delegator;

    /**
     * Subscriber that sits between the source and the downstream subscriber, swapping each
     * element's error callback for one that triggers error delegation, and tracking the
     * resulting delegation subscriptions so they can be disposed on cancellation or error.
     */
    private final class AloErrorDelegatingSubscriber implements CoreSubscriber<Alo<T>> {
        private final CoreSubscriber<? super Alo<T>> actual;

        // Identity-based set of currently connected delegations. All access goes through the
        // SerialQueue (presumably so that mutations are applied one at a time - confirm against
        // SerialQueue's contract).
        private final SerialQueue<Consumer<Collection<Disposable>>> inFlight;

        // Set once the subscription has been cancelled or the source has errored; after that no
        // new delegation is connected.
        private volatile boolean unsuccessfullyDone;

        private AloErrorDelegatingSubscriber(CoreSubscriber<? super Alo<T>> coreSubscriber) {
            this.inFlight = SerialQueue.on(Collections.newSetFromMap(new IdentityHashMap<>()));
            this.unsuccessfullyDone = false;
            this.actual = coreSubscriber;
        }

        /** Exposes the downstream subscriber's context. */
        public Context currentContext() {
            return this.actual.currentContext();
        }

        /**
         * Hands downstream a Subscription that forwards demand unchanged and, on cancellation,
         * marks this subscriber as done, cancels upstream and disposes all in-flight delegations.
         */
        public void onSubscribe(final Subscription subscription) {
            this.actual.onSubscribe(new Subscription() {
                public void request(long j) {
                    subscription.request(j);
                }

                public void cancel() {
                    AloErrorDelegatingSubscriber.this.unsuccessfullyDone = true;
                    try {
                        subscription.cancel();
                    } finally {
                        // Dispose even if upstream cancellation throws.
                        AloErrorDelegatingSubscriber.this.safelyDisposeAllInFlight();
                    }
                }
            });
        }

        /**
         * Emits a copy of the element that keeps the original value and acknowledger, but whose
         * error callback runs {@link #delegateAloError} within the original element's context.
         */
        public void onNext(Alo<T> alo) {
            this.actual.onNext(alo.<T>propagator().create(alo.get(), alo.getAcknowledger(), error -> {
                alo.runInContext(() -> {
                    delegateAloError(alo, error);
                });
            }));
        }

        /** Marks this subscriber as done, disposes in-flight delegations, then forwards the error. */
        public void onError(Throwable th) {
            this.unsuccessfullyDone = true;
            safelyDisposeAllInFlight();
            this.actual.onError(th);
        }

        /** Forwards completion downstream; in-flight delegations are left running. */
        public void onComplete() {
            this.actual.onComplete();
        }

        /**
         * Starts delegation of an error for the given element and registers the resulting
         * connection as in-flight until the delegation terminates.
         *
         * @param alo the element whose processing failed
         * @param th the error reported for that element
         */
        private void delegateAloError(Alo<T> alo, Throwable th) {
            // Holds the connection handle so the termination hook can deregister the same instance.
            AtomicReference<Disposable> connection = new AtomicReference<>();
            ConnectableFlux<?> delegation = delegateError(alo.get(), th)
                .doAfterTerminate(() -> {
                    this.inFlight.addAndDrain(disposables -> {
                        disposables.remove(connection.get());
                    });
                })
                .publish();

            // Subscribing to a ConnectableFlux does not start it; completion acknowledges the
            // original element and failure nacknowledges it once connect() below has run.
            delegation.subscribe(Consuming.noOp(), alo.getNacknowledger(), alo.getAcknowledger());

            this.inFlight.addAndDrain(disposables -> {
                if (this.unsuccessfullyDone) {
                    // Already cancelled or errored: the delegation is never connected.
                    return;
                }
                delegation.connect(disposable -> {
                    connection.set(disposable);
                    disposables.add(disposable);
                });
            });
        }

        /**
         * Disposes every delegation currently registered as in-flight. Failures are logged rather
         * than propagated so that the caller's own termination handling can proceed.
         */
        private void safelyDisposeAllInFlight() {
            try {
                this.inFlight.addAndDrain(disposables -> disposables.forEach(Disposable::dispose));
            } catch (Exception e) {
                AloErrorDelegatingOperator.LOGGER.error("Failed to dispose all in-flight error delegations. This may cause a memory leak...", e);
            }
        }

        /**
         * Applies the delegator to the failed value and error.
         *
         * @param t the data item whose processing failed
         * @param th the original error
         * @return the delegator's Publisher as a Flux; if that Publisher fails, or the delegator
         *         itself throws, the resulting Flux fails with the original error carrying the
         *         delegation failure as suppressed
         */
        private Flux<?> delegateError(T t, Throwable th) {
            try {
                return Flux.from(AloErrorDelegatingOperator.this.delegator.apply(t, th))
                    .onErrorMap(delegationError -> consolidateErrors(th, delegationError));
            } catch (Throwable fatalDelegationError) {
                return Flux.error(consolidateErrors(th, fatalDelegationError));
            }
        }

        /**
         * Attaches the second error to the first as suppressed, unless they are the same
         * instance (a Throwable may not suppress itself), and returns the first.
         */
        private Throwable consolidateErrors(Throwable th, Throwable th2) {
            if (th != th2) {
                th.addSuppressed(th2);
            }
            return th;
        }
    }

    /**
     * Creates the operator.
     *
     * @param flux the source of elements
     * @param biFunction function producing, for a failed data item and its error, the Publisher
     *                   to which handling of that error is delegated
     */
    public AloErrorDelegatingOperator(Flux<Alo<T>> flux, BiFunction<? super T, ? super Throwable, ? extends Publisher<?>> biFunction) {
        super(flux);
        this.delegator = biFunction;
    }

    /** Subscribes to the source through an error-delegating subscriber wrapping the given one. */
    public void subscribe(CoreSubscriber<? super Alo<T>> coreSubscriber) {
        this.source.subscribe(new AloErrorDelegatingSubscriber(coreSubscriber));
    }
}
