package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/axonframework/messaging/FluxMessageStream.class */
/**
 * A {@link MessageStream} backed by a Reactor {@link Flux} of {@link MessageStream.Entry entries}.
 * <p>
 * The functional operations ({@link #map(Function)}, {@link #reduce(Object, BiFunction)},
 * {@link #onErrorContinue(Function)} and {@link #asFlux()}) delegate directly to the wrapped flux. The pull-style
 * operations ({@link #next()}, {@link #hasNextAvailable()}, {@link #onAvailable(Runnable)}) lazily subscribe to
 * the flux on first use and buffer received entries in a small bounded read-ahead queue, requesting one element
 * at a time from upstream.
 *
 * @param <M> the type of {@link Message} carried by the entries of this stream
 */
public class FluxMessageStream<M extends Message<?>> implements MessageStream<M> {

    /** Capacity of the read-ahead buffer holding entries received from the source but not yet consumed. */
    private static final int READ_AHEAD_CAPACITY = 5;

    private final Flux<MessageStream.Entry<M>> source;
    private final BlockingQueue<MessageStream.Entry<M>> readAhead = new LinkedBlockingQueue<>(READ_AHEAD_CAPACITY);
    private final AtomicBoolean sourceSubscribed = new AtomicBoolean();
    private final AtomicReference<Subscription> subscription = new AtomicReference<>();
    // Starts as a no-op so the subscriber callbacks can always invoke it without a null check.
    private final AtomicReference<Runnable> availabilityCallback = new AtomicReference<>(() -> {
    });
    private final AtomicReference<Throwable> error = new AtomicReference<>();
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates a stream that reads its entries from the given {@code flux}. The flux is not subscribed to until
     * one of the pull-style operations is invoked.
     *
     * @param flux the flux providing the entries of this stream
     */
    public FluxMessageStream(@Nonnull Flux<MessageStream.Entry<M>> flux) {
        this.source = flux;
    }

    /**
     * Returns the wrapped source flux as-is.
     *
     * @return the flux this stream was constructed with
     */
    @Override
    public Flux<MessageStream.Entry<M>> asFlux() {
        return this.source;
    }

    /**
     * Returns a new stream whose entries are the result of applying the given {@code function} to each entry of
     * the source flux.
     *
     * @param function the mapping applied to every entry
     * @param <RM>     the message type of the resulting stream
     * @return a new {@code FluxMessageStream} wrapping the mapped flux
     */
    @Override
    public <RM extends Message<?>> MessageStream<RM> map(@Nonnull Function<MessageStream.Entry<M>, MessageStream.Entry<RM>> function) {
        Flux<MessageStream.Entry<RM>> mapped = this.source.map(function);
        return new FluxMessageStream<>(mapped);
    }

    /**
     * Reduces the entries of the source flux into a single value, starting from {@code r}.
     *
     * @param r          the initial accumulator value
     * @param biFunction the function combining the accumulator with each entry
     * @param <R>        the type of the reduction result
     * @return a future obtained from the reduction of the source flux
     */
    @Override
    public <R> CompletableFuture<R> reduce(@Nonnull R r, @Nonnull BiFunction<R, MessageStream.Entry<M>, R> biFunction) {
        return this.source.reduce(r, biFunction).toFuture();
    }

    /**
     * Polls the next buffered entry, subscribing to the source first if that has not happened yet.
     * <p>
     * When an entry was taken and the buffer is now empty, one more element is requested from upstream, unless
     * this stream has been closed.
     *
     * @return the next buffered entry, or an empty optional if none is buffered at this moment
     */
    @Override
    public Optional<MessageStream.Entry<M>> next() {
        subscribeToSource();
        MessageStream.Entry<M> polled = this.readAhead.poll();
        if (polled != null && this.readAhead.isEmpty() && !this.closed.get()) {
            Subscription current = this.subscription.get();
            // An entry can only have arrived after onSubscribe, so this is expected to be non-null;
            // the guard is purely defensive.
            if (current != null) {
                current.request(1L);
            }
        }
        return Optional.ofNullable(polled);
    }

    /**
     * Registers the callback that is run whenever an entry arrives or the source terminates. The callback is
     * run immediately if an entry is already buffered or the stream is already completed.
     *
     * @param runnable the callback to register; replaces any previously registered callback
     */
    @Override
    public void onAvailable(@Nonnull Runnable runnable) {
        this.availabilityCallback.set(runnable);
        if (hasNextAvailable() || isCompleted()) {
            runnable.run();
        }
        subscribeToSource();
    }

    /**
     * Returns the error the source terminated with, but only once all buffered entries have been consumed.
     *
     * @return the recorded error if the buffer is empty and an error was recorded, otherwise an empty optional
     */
    @Override
    public Optional<Throwable> error() {
        return this.readAhead.isEmpty() ? Optional.ofNullable(this.error.get()) : Optional.empty();
    }

    /**
     * Indicates whether the source has terminated (normally or with an error) and all buffered entries have
     * been consumed.
     *
     * @return {@code true} if the buffer is empty and the source signalled completion or an error
     */
    @Override
    public boolean isCompleted() {
        return this.readAhead.isEmpty() && this.completed.get();
    }

    /**
     * Indicates whether an entry is buffered and ready to be returned by {@link #next()}. Subscribes to the
     * source first if that has not happened yet.
     *
     * @return {@code true} if the read-ahead buffer is not empty
     */
    @Override
    public boolean hasNextAvailable() {
        subscribeToSource();
        return !this.readAhead.isEmpty();
    }

    /**
     * Marks this stream as closed and cancels the upstream subscription if one exists. Once closed, the source
     * is no longer subscribed to and no further elements are requested.
     */
    @Override
    public void close() {
        this.closed.set(true);
        Subscription current = this.subscription.get();
        if (current != null) {
            current.cancel();
        }
    }

    /**
     * Subscribes to the source flux at most once. Does nothing if the stream has already been closed, so that
     * closing an unused stream never triggers an upstream subscription.
     */
    private void subscribeToSource() {
        if (this.closed.get() || this.sourceSubscribed.getAndSet(true)) {
            return;
        }
        this.source.subscribe(new Subscriber<MessageStream.Entry<M>>() {
            @Override
            public void onSubscribe(Subscription s) {
                FluxMessageStream.this.subscription.set(s);
                // close() sets the flag before reading the subscription, and we publish the subscription
                // before reading the flag, so at least one side observes the other and cancels.
                if (FluxMessageStream.this.closed.get()) {
                    s.cancel();
                    return;
                }
                s.request(1L);
            }

            @Override
            public void onNext(MessageStream.Entry<M> entry) {
                FluxMessageStream.this.readAhead.add(entry);
                FluxMessageStream.this.availabilityCallback.get().run();
            }

            @Override
            public void onError(Throwable th) {
                FluxMessageStream.this.error.set(th);
                FluxMessageStream.this.completed.set(true);
                FluxMessageStream.this.availabilityCallback.get().run();
            }

            @Override
            public void onComplete() {
                FluxMessageStream.this.error.set(null);
                FluxMessageStream.this.completed.set(true);
                FluxMessageStream.this.availabilityCallback.get().run();
            }
        });
    }

    /**
     * Returns a new stream that, when the source flux fails, continues with the entries of the stream produced
     * by applying {@code function} to the error.
     *
     * @param function provides the stream to continue with, given the error of the source
     * @return a new {@code FluxMessageStream} wrapping the source flux with the error-resume behavior applied
     */
    @Override
    public MessageStream<M> onErrorContinue(@Nonnull Function<Throwable, MessageStream<M>> function) {
        Flux<MessageStream.Entry<M>> resumed = this.source.onErrorResume(th -> function.apply(th).asFlux());
        return new FluxMessageStream<>(resumed);
    }
}
