package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.axonframework.messaging.MessageStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:org/axonframework/messaging/MessageStreamUtils.class */
public abstract class MessageStreamUtils {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/messaging/MessageStreamUtils$FirstResult.class */
    /**
     * Captures the first entry of a {@link MessageStream} in a {@link CompletableFuture}.
     * <p>
     * {@link #process()} is meant to be invoked repeatedly (for example as the stream's
     * availability callback). The future returned by {@link #result()} completes with the first
     * entry read, completes exceptionally with the stream's error, or completes with {@code null}
     * when the stream is completed without an entry having been read.
     *
     * @param <M> the type of message carried by the stream's entries
     */
    public static class FirstResult<M extends Message<?>> {
        private final MessageStream<M> source;
        // Guards process() so that only one invocation at a time inspects the stream.
        private final AtomicBoolean processingGate = new AtomicBoolean(false);
        private final CompletableFuture<MessageStream.Entry<M>> result = new CompletableFuture<>();

        /**
         * @param messageStream the stream whose first entry should be captured
         */
        public FirstResult(MessageStream<M> messageStream) {
            this.source = messageStream;
        }

        /**
         * Tries to complete the result from the current state of the stream.
         * <p>
         * Returns immediately when another invocation currently holds the gate.
         * NOTE(review): a signal arriving while the gate is held is not retried here; confirm the
         * stream re-invokes the callback in that situation.
         */
        public void process() {
            if (this.processingGate.getAndSet(true)) {
                return;
            }
            try {
                if (!this.result.isDone() && this.source.hasNextAvailable()) {
                    this.source.next().ifPresent(this.result::complete);
                }
                // Only reached with an incomplete result when no entry was read above.
                if (this.source.isCompleted() && !this.result.isDone()) {
                    this.source.error().ifPresentOrElse(
                            this.result::completeExceptionally,
                            () -> this.result.complete(null));
                }
            } finally {
                this.processingGate.set(false);
            }
        }

        /**
         * @return the future that is completed by {@link #process()}
         */
        public CompletableFuture<MessageStream.Entry<M>> result() {
            return this.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/messaging/MessageStreamUtils$FluxStreamAdapter.class */
    /**
     * Pushes entries of a {@link MessageStream} into a {@link FluxSink}, honouring the demand
     * reported by {@link FluxSink#requestedFromDownstream()}.
     * <p>
     * {@link #process()} is meant to be invoked both when downstream requests more elements and
     * when the stream signals that entries are available.
     *
     * @param <M> the type of message carried by the stream's entries
     */
    public static class FluxStreamAdapter<M extends Message<?>> {
        // Guards process() so that only one invocation at a time drains the stream.
        private final AtomicBoolean processingGate = new AtomicBoolean(false);
        private final MessageStream<M> source;
        private final FluxSink<MessageStream.Entry<M>> emitter;

        /**
         * @param messageStream the stream to read entries from
         * @param fluxSink      the sink that receives the entries and the terminal signal
         */
        public FluxStreamAdapter(MessageStream<M> messageStream, FluxSink<MessageStream.Entry<M>> fluxSink) {
            this.source = messageStream;
            this.emitter = fluxSink;
        }

        /**
         * Drains available entries into the sink while there is downstream demand, and forwards
         * the stream's completion or error once the stream reports it is completed.
         * <p>
         * The gate is held for the whole drain pass and is always released afterwards, also when
         * nothing was available. After releasing, the conditions are checked once more so that
         * demand or entries that showed up while the gate was held are not left unprocessed.
         */
        public void process() {
            boolean moreWork = true;
            while (moreWork && !this.processingGate.getAndSet(true)) {
                try {
                    while (this.emitter.requestedFromDownstream() > 0 && this.source.hasNextAvailable()) {
                        this.source.next().ifPresent(this.emitter::next);
                    }
                    if (this.source.isCompleted()) {
                        this.source.error().ifPresentOrElse(this.emitter::error, this.emitter::complete);
                    }
                } catch (Exception e) {
                    this.emitter.error(e);
                    this.source.close();
                    // The sink has been sent a terminal error; stop draining.
                    return;
                } finally {
                    this.processingGate.set(false);
                }
                moreWork = this.emitter.requestedFromDownstream() > 0 && this.source.hasNextAvailable();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/messaging/MessageStreamUtils$Reducer.class */
    /**
     * Folds all entries of a {@link MessageStream} into a single value using an accumulator
     * function, exposing the outcome as a {@link CompletableFuture}.
     * <p>
     * {@link #process()} is meant to be invoked repeatedly (for example as the stream's
     * availability callback). The future completes with the accumulated value once the stream
     * is completed without error, or exceptionally with the stream's error or with an exception
     * thrown while reading or accumulating.
     *
     * @param <M> the type of message carried by the stream's entries
     * @param <R> the type of the accumulated result
     */
    public static class Reducer<M extends Message<?>, R> {
        private final MessageStream<M> source;
        private final BiFunction<R, MessageStream.Entry<M>, R> accumulator;
        private final AtomicReference<R> intermediateResult;
        // Guards process() so that only one invocation at a time drains the stream.
        private final AtomicBoolean processingGate = new AtomicBoolean(false);
        private final CompletableFuture<R> result = new CompletableFuture<>();

        /**
         * @param messageStream the stream to reduce
         * @param r             the initial accumulated value
         * @param biFunction    combines the value accumulated so far with the next entry
         */
        public Reducer(MessageStream<M> messageStream, R r, BiFunction<R, MessageStream.Entry<M>, R> biFunction) {
            this.source = messageStream;
            this.intermediateResult = new AtomicReference<>(r);
            this.accumulator = biFunction;
        }

        /**
         * @return the future that is completed by {@link #process()}
         */
        public CompletableFuture<R> result() {
            return this.result;
        }

        /**
         * Accumulates every currently available entry and completes the result once the stream
         * reports it is completed.
         * <p>
         * The gate is held for the whole drain pass and is always released afterwards, also when
         * nothing was available. After releasing, the stream is checked once more so that entries
         * or a completion that showed up while the gate was held are not left unprocessed.
         */
        public void process() {
            boolean moreWork = true;
            while (moreWork && !this.processingGate.getAndSet(true)) {
                try {
                    while (this.source.hasNextAvailable()) {
                        this.source.next().ifPresent(entry ->
                                this.intermediateResult.updateAndGet(
                                        current -> this.accumulator.apply(current, entry)));
                    }
                    if (this.source.isCompleted()) {
                        this.source.error().ifPresentOrElse(
                                this.result::completeExceptionally,
                                () -> this.result.complete(this.intermediateResult.get()));
                    }
                } catch (Exception e) {
                    this.result.completeExceptionally(e);
                    this.source.close();
                } finally {
                    this.processingGate.set(false);
                }
                // A completed result ends the loop, which also covers the failure path above.
                moreWork = !this.result.isDone()
                        && (this.source.hasNextAvailable() || this.source.isCompleted());
            }
        }
    }

    /**
     * Private constructor: this class only hosts static helpers and nested types.
     */
    private MessageStreamUtils() {
        // intentionally empty
    }

    /**
     * Exposes the given stream as a {@link Flux} of its entries.
     * <p>
     * For every subscription created through {@link Flux#create}, a {@link FluxStreamAdapter} is
     * bound to the sink. The adapter's {@code process()} is run on each downstream request and is
     * registered as the stream's {@code onAvailable} callback.
     *
     * @param messageStream the stream to expose
     * @param <M>           the type of message carried by the stream's entries
     * @return a {@link Flux} emitting the stream's entries
     */
    public static <M extends Message<?>> Flux<MessageStream.Entry<M>> asFlux(@Nonnull MessageStream<M> messageStream) {
        return Flux.create(fluxSink -> {
            FluxStreamAdapter<M> adapter = new FluxStreamAdapter<>(messageStream, fluxSink);
            fluxSink.onRequest(requested -> adapter.process());
            messageStream.onAvailable(adapter::process);
        });
    }

    /**
     * Reduces the entries of the given stream to a single value.
     * <p>
     * A {@link Reducer} is created with the given initial value and accumulator, and its
     * {@code process()} is registered as the stream's {@code onAvailable} callback.
     *
     * @param messageStream the stream to reduce
     * @param r             the initial accumulated value
     * @param biFunction    combines the value accumulated so far with the next entry
     * @param <M>           the type of message carried by the stream's entries
     * @param <R>           the type of the accumulated result
     * @return the future exposed by the {@link Reducer}
     */
    public static <M extends Message<?>, R> CompletableFuture<R> reduce(@Nonnull MessageStream<M> messageStream, @Nonnull R r, @Nonnull BiFunction<R, MessageStream.Entry<M>, R> biFunction) {
        Reducer<M, R> reducer = new Reducer<>(messageStream, r, biFunction);
        messageStream.onAvailable(reducer::process);
        return reducer.result();
    }

    /**
     * Exposes the first entry of the given stream as a {@link CompletableFuture}.
     * <p>
     * A {@link FirstResult} is created for the stream and its {@code process()} is registered
     * as the stream's {@code onAvailable} callback.
     *
     * @param messageStream the stream to take the first entry from
     * @param <M>           the type of message carried by the stream's entries
     * @return the future exposed by the {@link FirstResult}
     */
    public static <M extends Message<?>> CompletableFuture<MessageStream.Entry<M>> asCompletableFuture(@Nonnull MessageStream<M> messageStream) {
        FirstResult<M> firstResult = new FirstResult<>(messageStream);
        messageStream.onAvailable(firstResult::process);
        return firstResult.result();
    }
}
