package org.axonframework.messaging;

import jakarta.annotation.Nonnull;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;

/* loaded from: input_file:org/axonframework/messaging/QueueMessageStream.class */
/**
 * A {@link MessageStream} whose entries are buffered in a {@link BlockingQueue}.
 * <p>
 * Producers add entries through {@link #offer(Message, Context)} and signal the end of the stream with
 * {@link #complete()} or {@link #completeExceptionally(Throwable)}. Consumers read entries through
 * {@link #next()} / {@link #peek()} and may register a callback via {@link #onAvailable(Runnable)} that is
 * run whenever an entry is offered or the stream is completed.
 * <p>
 * All mutable state is held in the queue and in atomic variables. NOTE(review): callbacks are executed on
 * whichever thread triggers them (the offering, completing, consuming or closing thread) - confirm that
 * registered callbacks tolerate this.
 *
 * @param <M> the type of {@link Message} carried by the entries of this stream
 */
public class QueueMessageStream<M extends Message<?>> implements MessageStream<M> {

    /** Placeholder used until a real callback is registered, so invocations never need a null check. */
    private static final Runnable NO_OP_CALLBACK = () -> {
    };

    private final BlockingQueue<MessageStream.Entry<M>> queue;
    private final AtomicReference<Runnable> onAvailableCallbackRef = new AtomicReference<>(NO_OP_CALLBACK);
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();
    private final AtomicReference<Runnable> onConsumeCallback = new AtomicReference<>(NO_OP_CALLBACK);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates a stream backed by a new {@link LinkedBlockingQueue} constructed with its default capacity.
     */
    public QueueMessageStream() {
        this(new LinkedBlockingQueue<>());
    }

    /**
     * Creates a stream backed by the given queue.
     *
     * @param blockingQueue the queue used to buffer entries between producer and consumer
     */
    public QueueMessageStream(@Nonnull BlockingQueue<MessageStream.Entry<M>> blockingQueue) {
        this.queue = blockingQueue;
    }

    /**
     * Attempts to add the given message, paired with the given context, to the queue without blocking.
     * When the entry is accepted, the callback registered through {@link #onAvailable(Runnable)} is run.
     *
     * @param m       the message to enqueue
     * @param context the context to pair with the message in the queued entry
     * @return {@code true} if the queue accepted the entry, {@code false} if it was rejected
     */
    public boolean offer(@Nonnull M m, @Nonnull Context context) {
        if (!this.queue.offer(new SimpleEntry<>(m, context))) {
            return false;
        }
        this.onAvailableCallbackRef.get().run();
        return true;
    }

    /**
     * Marks this stream as completed and runs the callback registered through
     * {@link #onAvailable(Runnable)}. Entries still in the queue remain available for consumption.
     */
    public void complete() {
        this.completed.set(true);
        this.onAvailableCallbackRef.get().run();
    }

    /**
     * Marks this stream as completed with the given error and runs the callback registered through
     * {@link #onAvailable(Runnable)}.
     *
     * @param th the error to report through {@link #error()}
     */
    public void completeExceptionally(@Nonnull Throwable th) {
        // Store the error before raising the flag, so that anyone observing completion also sees the error.
        this.errorRef.set(th);
        this.completed.set(true);
        this.onAvailableCallbackRef.get().run();
    }

    /**
     * Registers the callback to run each time {@link #next()} removes an entry from the queue and when
     * this stream is {@link #close() closed}. Replaces any previously registered callback.
     *
     * @param runnable the callback to run on consumption and on close
     */
    public void onConsumeCallback(@Nonnull Runnable runnable) {
        this.onConsumeCallback.set(runnable);
    }

    /**
     * Indicates whether {@link #close()} has been called on this stream.
     *
     * @return {@code true} once this stream has been closed, {@code false} otherwise
     */
    public boolean isClosed() {
        return this.closed.get();
    }

    /**
     * Removes and returns the entry at the head of the queue, if there is one. The callback registered
     * through {@link #onConsumeCallback(Runnable)} is run only when an entry was actually removed.
     *
     * @return the removed entry, or an empty {@code Optional} if the queue held no entries
     */
    @Override
    public Optional<MessageStream.Entry<M>> next() {
        MessageStream.Entry<M> poll = this.queue.poll();
        if (poll != null) {
            this.onConsumeCallback.get().run();
        }
        return Optional.ofNullable(poll);
    }

    /**
     * Registers the callback to run when entries are offered or the stream is completed, replacing any
     * previously registered callback. The callback is run immediately when entries are already queued or
     * the stream is already completed.
     *
     * @param runnable the callback to run when this stream has something to report
     */
    @Override
    public void onAvailable(@Nonnull Runnable runnable) {
        // Register first, then check: a signal arriving in between reaches the new callback instead of
        // being lost (at the cost of a possible duplicate invocation).
        this.onAvailableCallbackRef.set(runnable);
        if (!this.queue.isEmpty() || isCompleted()) {
            runnable.run();
        }
    }

    /**
     * Returns the error this stream was completed with, if any.
     *
     * @return the error passed to {@link #completeExceptionally(Throwable)}, or an empty {@code Optional}
     * if no error has been recorded
     */
    @Override
    public Optional<Throwable> error() {
        return Optional.ofNullable(this.errorRef.get());
    }

    /**
     * Indicates whether this stream has been completed and all of its entries have been consumed.
     *
     * @return {@code true} if completion was signalled and the queue is empty, {@code false} otherwise
     */
    @Override
    public boolean isCompleted() {
        // The completion flag must be read BEFORE the queue. Checking the queue first leaves a window in
        // which a producer can offer an entry and complete the stream between the two reads, making this
        // method report completion while an entry is still waiting. With the flag read first, every entry
        // offered before complete() is visible to the emptiness check (provided producers stop offering
        // once they have completed the stream).
        return this.completed.get() && this.queue.isEmpty();
    }

    /**
     * Indicates whether at least one entry is currently queued.
     *
     * @return {@code true} if the queue is not empty, {@code false} otherwise
     */
    @Override
    public boolean hasNextAvailable() {
        return !this.queue.isEmpty();
    }

    /**
     * Marks this stream as closed and runs the callback registered through
     * {@link #onConsumeCallback(Runnable)}. Queued entries are not removed by this method.
     * NOTE(review): the callback is presumably run so the producing side can observe
     * {@link #isClosed()} - confirm against the callers.
     */
    @Override
    public void close() {
        this.closed.set(true);
        this.onConsumeCallback.get().run();
    }

    /**
     * Returns the entry at the head of the queue without removing it.
     *
     * @return the head entry, or an empty {@code Optional} if the queue holds no entries
     */
    @Override
    public Optional<MessageStream.Entry<M>> peek() {
        return Optional.ofNullable(this.queue.peek());
    }
}
