package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * A {@link MessageStream} whose actual content is supplied by another stream that only becomes available once a
 * {@link CompletableFuture} completes.
 * <p>
 * Until the future has completed, this stream reports no entries, no error and "not completed". Once the future
 * has completed normally, every operation is forwarded to the stream it produced. When the future completed
 * exceptionally, this stream reports itself as completed and exposes the failure through {@link #error()}.
 *
 * @param <M> the type of {@link Message} carried by the entries of this stream
 */
public class DelayedMessageStream<M extends Message<?>> implements MessageStream<M> {

    /** Future producing the stream to which all operations are forwarded. */
    private final CompletableFuture<? extends MessageStream<M>> delegate;

    /**
     * Variant of the {@link DelayedMessageStream} that additionally carries the {@link MessageStream.Empty} marker
     * type.
     *
     * @param <M> the type of {@link Message} declared by this stream
     */
    static class Empty<M extends Message<?>> extends DelayedMessageStream<M> implements MessageStream.Empty<M> {

        /**
         * @param completableFuture the future supplying the empty stream to delegate to
         */
        Empty(@Nonnull CompletableFuture<MessageStream.Empty<M>> completableFuture) {
            super(completableFuture);
        }
    }

    /**
     * Variant of the {@link DelayedMessageStream} that additionally carries the {@link MessageStream.Single} marker
     * type.
     *
     * @param <M> the type of {@link Message} carried by the entry of this stream
     */
    static class Single<M extends Message<?>> extends DelayedMessageStream<M> implements MessageStream.Single<M> {

        /**
         * @param completableFuture the future supplying the single-entry stream to delegate to
         */
        Single(@Nonnull CompletableFuture<MessageStream.Single<M>> completableFuture) {
            super(completableFuture);
        }
    }

    /**
     * Instances are created through {@link #create(CompletableFuture)} and
     * {@link #createSingle(CompletableFuture)} (or by the nested subclasses).
     *
     * @param completableFuture the future supplying the stream to delegate to
     */
    private DelayedMessageStream(@Nonnull CompletableFuture<? extends MessageStream<M>> completableFuture) {
        this.delegate = completableFuture;
    }

    /**
     * Creates a stream that delegates to the stream produced by the given future.
     * <p>
     * A future that completes with {@code null} is treated as producing an empty stream. When the future is already
     * done at the time of this call, no wrapper is created: the resulting stream is returned directly, or a failed
     * stream is returned when the future completed exceptionally (including cancellation).
     *
     * @param completableFuture the future supplying the stream to delegate to
     * @param <M>               the type of {@link Message} carried by the entries of the stream
     * @return the stream produced by the future if it is already available, a failed stream if the future already
     * failed, or otherwise a {@code DelayedMessageStream} wrapping the future
     */
    public static <M extends Message<?>> MessageStream<M> create(@Nonnull CompletableFuture<? extends MessageStream<M>> completableFuture) {
        CompletableFuture<MessageStream<M>> safeDelegate = completableFuture
                .exceptionallyCompose(CompletableFuture::failedFuture)
                .thenApply(messageStream -> Objects.requireNonNullElse(messageStream, MessageStream.empty().cast()));
        if (!safeDelegate.isDone()) {
            return new DelayedMessageStream<>(safeDelegate);
        }
        try {
            // Read from the null-safe future rather than the raw input: a null result must surface as an empty
            // stream, and a cancelled input must surface as an ExecutionException instead of an unchecked
            // CancellationException escaping this factory.
            return safeDelegate.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fall back to the lazy wrapper.
            Thread.currentThread().interrupt();
            return new DelayedMessageStream<>(safeDelegate);
        } catch (java.util.concurrent.ExecutionException e2) {
            return MessageStream.failed(e2.getCause());
        }
    }

    /**
     * Creates a single-entry stream that delegates to the stream produced by the given future.
     * <p>
     * A future that completes with {@code null} is treated as producing an empty stream. When the future is already
     * done at the time of this call, no wrapper is created: the resulting stream is returned directly, or a failed
     * stream is returned when the future completed exceptionally (including cancellation).
     *
     * @param completableFuture the future supplying the single-entry stream to delegate to
     * @param <M>               the type of {@link Message} carried by the entry of the stream
     * @return the stream produced by the future if it is already available, a failed stream if the future already
     * failed, or otherwise a delayed stream wrapping the future
     */
    public static <M extends Message<?>> MessageStream.Single<M> createSingle(@Nonnull CompletableFuture<MessageStream.Single<M>> completableFuture) {
        CompletableFuture<MessageStream.Single<M>> safeDelegate = completableFuture
                .exceptionallyCompose(CompletableFuture::failedFuture)
                .thenApply(single -> Objects.requireNonNullElse(single, MessageStream.empty().cast()));
        if (!safeDelegate.isDone()) {
            return new Single<>(safeDelegate);
        }
        try {
            // Same reasoning as in create(...): use the null-safe future so that null results and cancellation
            // are handled instead of leaking to the caller.
            return safeDelegate.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fall back to the lazy wrapper.
            Thread.currentThread().interrupt();
            return new Single<>(safeDelegate);
        } catch (java.util.concurrent.ExecutionException e2) {
            return MessageStream.failed(e2.getCause());
        }
    }

    /**
     * Exposes the delegate's entries as a {@link Flux}, built from the delegate future so that it does not require
     * the future to be completed at the time of this call.
     *
     * @return a {@link Flux} of the entries of the delegate stream
     */
    @Override // org.axonframework.messaging.MessageStream
    public Flux<MessageStream.Entry<M>> asFlux() {
        return Mono.fromFuture(this.delegate).flatMapMany(MessageStream::asFlux);
    }

    /**
     * Returns the next entry of the delegate stream.
     *
     * @return the delegate's next entry, or an empty {@link Optional} while the delegate future is not done or when
     * it completed exceptionally
     */
    @Override // org.axonframework.messaging.MessageStream
    public Optional<MessageStream.Entry<M>> next() {
        if (!this.delegate.isDone() || this.delegate.isCompletedExceptionally()) {
            return Optional.empty();
        }
        return this.delegate.getNow(null).next();
    }

    /**
     * Registers the given callback with the delegate stream once it is available. When the delegate future
     * completes without a stream (i.e. exceptionally), the callback is run directly.
     *
     * @param runnable the callback to register
     */
    @Override // org.axonframework.messaging.MessageStream
    public void onAvailable(@Nonnull Runnable runnable) {
        this.delegate.whenComplete((messageStream, th) -> {
            if (messageStream != null) {
                messageStream.onAvailable(runnable);
            } else {
                runnable.run();
            }
        });
    }

    /**
     * Returns the error this stream completed with, if any.
     *
     * @return empty while the delegate future is not done; the failure of the future when it completed
     * exceptionally; otherwise whatever the delegate stream reports
     */
    @Override // org.axonframework.messaging.MessageStream
    public Optional<Throwable> error() {
        if (!this.delegate.isDone()) {
            return Optional.empty();
        }
        // exceptionNow() is only used for non-cancelled failures; a cancelled future is handled by the catch below.
        if (this.delegate.isCompletedExceptionally() && !this.delegate.isCancelled()) {
            return Optional.of(this.delegate.exceptionNow());
        }
        try {
            return this.delegate.getNow(null).error();
        } catch (CancellationException | CompletionException e) {
            return Optional.of(e);
        }
    }

    /**
     * Indicates whether this stream has completed.
     *
     * @return {@code true} when the delegate future completed exceptionally, or completed normally with a stream
     * that itself reports being completed; {@code false} otherwise
     */
    @Override // org.axonframework.messaging.MessageStream
    public boolean isCompleted() {
        if (!this.delegate.isDone()) {
            return false;
        }
        return this.delegate.isCompletedExceptionally() || this.delegate.getNow(null).isCompleted();
    }

    /**
     * Indicates whether an entry is available for immediate consumption.
     *
     * @return {@code true} only when the delegate future completed normally and the delegate stream reports an
     * available entry
     */
    @Override // org.axonframework.messaging.MessageStream
    public boolean hasNextAvailable() {
        return this.delegate.isDone()
                && !this.delegate.isCompletedExceptionally()
                && this.delegate.getNow(null).hasNextAvailable();
    }

    /**
     * Closes this stream. A delegate future that is not yet done is cancelled; an available delegate stream is
     * closed; an exceptionally completed future requires no action.
     */
    @Override // org.axonframework.messaging.MessageStream
    public void close() {
        if (!this.delegate.isDone()) {
            this.delegate.cancel(false);
            return;
        }
        if (!this.delegate.isCompletedExceptionally()) {
            this.delegate.getNow(null).close();
        }
    }

    /**
     * Reduces the entries of the delegate stream once it is available.
     *
     * @param r          the initial value of the reduction
     * @param biFunction the function combining the accumulated value with each entry
     * @param <R>        the type of the reduction result
     * @return a future completing with the result of the delegate's reduction
     */
    @Override // org.axonframework.messaging.MessageStream
    public <R> CompletableFuture<R> reduce(@Nonnull R r, @Nonnull BiFunction<R, MessageStream.Entry<M>, R> biFunction) {
        return this.delegate.thenCompose(messageStream -> messageStream.reduce(r, biFunction));
    }

    /**
     * Returns the next entry of the delegate stream without consuming it.
     *
     * @return the delegate's peeked entry, or an empty {@link Optional} while the delegate future is not done or
     * when it completed exceptionally
     */
    @Override // org.axonframework.messaging.MessageStream
    public Optional<MessageStream.Entry<M>> peek() {
        if (!this.delegate.isDone() || this.delegate.isCompletedExceptionally()) {
            return Optional.empty();
        }
        return this.delegate.getNow(null).peek();
    }
}
