package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/axonframework/messaging/OnErrorContinueMessageStream.class */
/**
 * A {@link MessageStream} that reads from a wrapped stream and, once that stream has completed with an error,
 * transparently continues on a replacement stream obtained from an error handler function.
 * <p>
 * Every operation first resolves the "current" stream: the wrapped stream for as long as it has not completed with
 * an error, and the replacement stream afterwards. The replacement stream is created lazily, the first time an
 * operation is invoked after the wrapped stream failed, and the error handler is invoked under a lock so that it is
 * applied at most once per successful creation.
 *
 * @param <M> the type of {@link Message} carried by the entries of this stream
 */
public class OnErrorContinueMessageStream<M extends Message<?>> implements MessageStream<M> {

    /** The stream that is read from until it completes with an error. */
    private final MessageStream<M> delegate;
    /** Produces the replacement stream from the error the wrapped stream completed with. */
    private final Function<Throwable, MessageStream<M>> onError;
    /** Holds the replacement stream once created; {@code null} until then. */
    private final AtomicReference<MessageStream<M>> onErrorStream = new AtomicReference<>();
    /** The most recently registered availability callback; a no-op until one is registered. */
    private final AtomicReference<Runnable> callback = new AtomicReference<>(() -> {
    });

    /**
     * Creates a stream that reads from {@code messageStream} and switches to the stream returned by
     * {@code function} once {@code messageStream} has completed with an error.
     *
     * @param messageStream the stream to read from initially
     * @param function      invoked with the error of {@code messageStream} to obtain the stream to continue with
     */
    public OnErrorContinueMessageStream(@Nonnull MessageStream<M> messageStream,
                                        @Nonnull Function<Throwable, MessageStream<M>> function) {
        this.delegate = messageStream;
        this.onError = function;
    }

    /**
     * Returns the result of {@code next()} on the current stream.
     *
     * @return whatever the current stream's {@code next()} returns
     */
    @Override
    public Optional<MessageStream.Entry<M>> next() {
        return resolveCurrentDelegate().next();
    }

    /**
     * Registers {@code runnable} on the current stream and remembers it, so it can be handed to the replacement
     * stream when that stream is created later.
     *
     * @param runnable the callback to register
     */
    @Override
    public void onAvailable(@Nonnull Runnable runnable) {
        // Store the callback BEFORE registering it. If the wrapped stream fails while this method is running and
        // another thread creates the replacement stream, that thread reads the stored callback; storing it only
        // after registration would leave the replacement stream wired to the previous (initially no-op) callback.
        callback.set(runnable);
        // NOTE(review): when this very call creates the replacement stream, the stream receives the same callback
        // twice (once on creation, once here). Assumed harmless - confirm against MessageStream#onAvailable.
        resolveCurrentDelegate().onAvailable(runnable);
    }

    /**
     * Returns the result of {@code peek()} on the current stream.
     *
     * @return whatever the current stream's {@code peek()} returns
     */
    @Override
    public Optional<MessageStream.Entry<M>> peek() {
        return resolveCurrentDelegate().peek();
    }

    /**
     * Determines the stream that operations must currently be forwarded to.
     * <p>
     * Returns the wrapped stream unless it is completed and reports an error. Otherwise returns the replacement
     * stream, creating it through the error handler if it does not exist yet.
     *
     * @return the wrapped stream, or the replacement stream once the wrapped stream has completed with an error
     */
    private MessageStream<M> resolveCurrentDelegate() {
        if (!delegate.isCompleted() || delegate.error().isEmpty()) {
            return delegate;
        }
        MessageStream<M> existing = onErrorStream.get();
        if (existing != null) {
            return existing;
        }
        synchronized (this) {
            // Re-check under the lock: another thread may have created the replacement stream in the meantime.
            MessageStream<M> continuation = onErrorStream.get();
            if (continuation == null) {
                continuation = onError.apply(delegate.error().orElse(null));
                // Publish before registering the callback: should the callback run synchronously and call back
                // into this stream, it must find the replacement stream instead of invoking the handler again.
                onErrorStream.set(continuation);
                continuation.onAvailable(callback.get());
            }
            return continuation;
        }
    }

    /**
     * Returns the result of {@code error()} on the current stream.
     *
     * @return whatever the current stream's {@code error()} returns
     */
    @Override
    public Optional<Throwable> error() {
        return resolveCurrentDelegate().error();
    }

    /**
     * Returns the result of {@code isCompleted()} on the current stream.
     *
     * @return whatever the current stream's {@code isCompleted()} returns
     */
    @Override
    public boolean isCompleted() {
        return resolveCurrentDelegate().isCompleted();
    }

    /**
     * Returns the result of {@code hasNextAvailable()} on the current stream.
     *
     * @return whatever the current stream's {@code hasNextAvailable()} returns
     */
    @Override
    public boolean hasNextAvailable() {
        return resolveCurrentDelegate().hasNextAvailable();
    }

    /**
     * Closes the current stream. Because the current stream is resolved first, this creates the replacement stream
     * if the wrapped stream has completed with an error and no replacement exists yet.
     */
    @Override
    public void close() {
        resolveCurrentDelegate().close();
    }
}
