package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/axonframework/messaging/SingleValueMessageStream.class */
public class SingleValueMessageStream<M extends Message<?>> implements MessageStream.Single<M> {
    private final CompletableFuture<MessageStream.Entry<M>> source;
    private final AtomicBoolean read;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SingleValueMessageStream(@Nullable MessageStream.Entry<M> entry) {
        this(CompletableFuture.completedFuture(entry));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SingleValueMessageStream(@Nonnull CompletableFuture<MessageStream.Entry<M>> completableFuture) {
        this.read = new AtomicBoolean(false);
        this.source = completableFuture;
    }

    @Override // org.axonframework.messaging.MessageStream.Single
    public CompletableFuture<MessageStream.Entry<M>> asCompletableFuture() {
        return this.source;
    }

    @Override // org.axonframework.messaging.MessageStream
    public Flux<MessageStream.Entry<M>> asFlux() {
        return Flux.from(asMono());
    }

    @Override // org.axonframework.messaging.MessageStream.Single
    public Mono<MessageStream.Entry<M>> asMono() {
        return Mono.fromFuture(this.source);
    }

    @Override // org.axonframework.messaging.MessageStream
    public Optional<MessageStream.Entry<M>> next() {
        if (this.source.isDone() && !this.source.isCompletedExceptionally()) {
            MessageStream.Entry<M> now = this.source.getNow(null);
            if (this.read.compareAndSet(false, true)) {
                return Optional.of(now);
            }
        }
        return Optional.empty();
    }

    @Override // org.axonframework.messaging.MessageStream
    public void onAvailable(@Nonnull Runnable runnable) {
        if (this.source.isDone()) {
            runnable.run();
        } else {
            this.source.whenComplete((entry, th) -> {
                runnable.run();
            });
        }
    }

    @Override // org.axonframework.messaging.MessageStream
    public Optional<Throwable> error() {
        return this.source.isCompletedExceptionally() ? Optional.of(this.source.exceptionNow()) : Optional.empty();
    }

    @Override // org.axonframework.messaging.MessageStream
    public boolean isCompleted() {
        return this.source.isCompletedExceptionally() || this.read.get();
    }

    @Override // org.axonframework.messaging.MessageStream
    public boolean hasNextAvailable() {
        return (!this.source.isDone() || this.source.isCompletedExceptionally() || this.read.get()) ? false : true;
    }

    @Override // org.axonframework.messaging.MessageStream
    public void close() {
        if (this.source.isDone()) {
            return;
        }
        this.source.cancel(false);
    }

    @Override // org.axonframework.messaging.MessageStream.Single, org.axonframework.messaging.MessageStream
    public <RM extends Message<?>> MessageStream.Single<RM> map(@Nonnull Function<MessageStream.Entry<M>, MessageStream.Entry<RM>> function) {
        return new SingleValueMessageStream(this.source.thenApply((Function<? super MessageStream.Entry<M>, ? extends U>) function));
    }

    @Override // org.axonframework.messaging.MessageStream
    public <R> CompletableFuture<R> reduce(@Nonnull R r, @Nonnull BiFunction<R, MessageStream.Entry<M>, R> biFunction) {
        return (CompletableFuture<R>) this.source.thenApply(entry -> {
            return biFunction.apply(r, entry);
        });
    }
}
