package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/axonframework/messaging/CompletionCallbackMessageStream.class */
class CompletionCallbackMessageStream<M extends Message<?>> extends DelegatingMessageStream<M, M> {
    private final MessageStream<M> delegate;
    private final Runnable completeHandler;
    private final AtomicBoolean invoked;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/axonframework/messaging/CompletionCallbackMessageStream$Empty.class */
    public static class Empty<M extends Message<?>> extends CompletionCallbackMessageStream<M> implements MessageStream.Empty<M> {
        /* JADX INFO: Access modifiers changed from: package-private */
        public Empty(@Nonnull MessageStream.Empty<M> empty, @Nonnull Runnable runnable) {
            super(empty, runnable);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/axonframework/messaging/CompletionCallbackMessageStream$Single.class */
    public static class Single<M extends Message<?>> extends CompletionCallbackMessageStream<M> implements MessageStream.Single<M> {
        /* JADX INFO: Access modifiers changed from: package-private */
        public Single(@Nonnull MessageStream.Single<M> single, @Nonnull Runnable runnable) {
            super(single, runnable);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CompletionCallbackMessageStream(@Nonnull MessageStream<M> messageStream, @Nonnull Runnable runnable) {
        super(messageStream);
        this.invoked = new AtomicBoolean(false);
        this.delegate = messageStream;
        this.completeHandler = runnable;
        messageStream.onAvailable(this::invokceCompletionHandlerIfCompleted);
    }

    @Override // org.axonframework.messaging.MessageStream
    public Flux<MessageStream.Entry<M>> asFlux() {
        return this.delegate.asFlux().doOnComplete(this.completeHandler);
    }

    @Override // org.axonframework.messaging.MessageStream
    public Optional<MessageStream.Entry<M>> next() {
        Optional<MessageStream.Entry<M>> next = this.delegate.next();
        if (next.isEmpty()) {
            invokceCompletionHandlerIfCompleted();
        }
        return next;
    }

    @Override // org.axonframework.messaging.MessageStream
    public Optional<MessageStream.Entry<M>> peek() {
        Optional<MessageStream.Entry<M>> peek = this.delegate.peek();
        if (peek.isEmpty()) {
            invokceCompletionHandlerIfCompleted();
        }
        return peek;
    }

    private void invokceCompletionHandlerIfCompleted() {
        if (this.delegate.isCompleted() && this.delegate.error().isEmpty() && !this.invoked.getAndSet(true)) {
            this.completeHandler.run();
        }
    }

    @Override // org.axonframework.messaging.DelegatingMessageStream, org.axonframework.messaging.MessageStream
    public void onAvailable(@Nonnull Runnable runnable) {
        this.delegate.onAvailable(() -> {
            runnable.run();
            invokceCompletionHandlerIfCompleted();
        });
    }

    @Override // org.axonframework.messaging.DelegatingMessageStream, org.axonframework.messaging.MessageStream
    public Optional<Throwable> error() {
        invokceCompletionHandlerIfCompleted();
        return this.delegate.error();
    }

    @Override // org.axonframework.messaging.DelegatingMessageStream, org.axonframework.messaging.MessageStream
    public boolean hasNextAvailable() {
        boolean hasNextAvailable = this.delegate.hasNextAvailable();
        if (!hasNextAvailable && delegate().isCompleted()) {
            invokceCompletionHandlerIfCompleted();
        }
        return hasNextAvailable;
    }

    @Override // org.axonframework.messaging.MessageStream
    public <R> CompletableFuture<R> reduce(@Nonnull R r, @Nonnull BiFunction<R, MessageStream.Entry<M>, R> biFunction) {
        return this.delegate.reduce(r, biFunction).whenComplete((obj, th) -> {
            if (th == null) {
                this.completeHandler.run();
            }
        });
    }
}
