package org.axonframework.eventstreaming;

import jakarta.annotation.Nonnull;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;
import org.axonframework.messaging.StreamableMessageSource;

@Deprecated(since = "5.0.0")
/* loaded from: input_file:org/axonframework/eventstreaming/LegacyStreamableEventSource.class */
public class LegacyStreamableEventSource<E extends EventMessage<?>> implements StreamableEventSource<E> {
    private final StreamableMessageSource<E> delegate;

    /* loaded from: input_file:org/axonframework/eventstreaming/LegacyStreamableEventSource$BlockingMessageStream.class */
    private static class BlockingMessageStream<E extends EventMessage<?>> implements MessageStream<E> {
        private final BlockingStream<E> stream;

        BlockingMessageStream(BlockingStream<E> blockingStream, EventCriteria eventCriteria) {
            this.stream = blockingStream;
            if (eventCriteria != AnyEvent.INSTANCE) {
                throw new IllegalArgumentException("Only AnyEvent criteria is supported in this legacy adapter, but received: " + String.valueOf(eventCriteria));
            }
        }

        @Override // org.axonframework.messaging.MessageStream
        public Optional<MessageStream.Entry<E>> next() {
            try {
                return !this.stream.hasNextAvailable() ? Optional.empty() : Optional.ofNullable(this.stream.nextAvailable()).map(this::createEntryForMessage);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }

        @Override // org.axonframework.messaging.MessageStream
        public Optional<MessageStream.Entry<E>> peek() {
            return (Optional<MessageStream.Entry<E>>) this.stream.peek().map(this::createEntryForMessage);
        }

        @Override // org.axonframework.messaging.MessageStream
        public void onAvailable(@Nonnull Runnable runnable) {
            this.stream.setOnAvailableCallback(runnable);
        }

        @Override // org.axonframework.messaging.MessageStream
        public Optional<Throwable> error() {
            return Optional.empty();
        }

        @Override // org.axonframework.messaging.MessageStream
        public boolean isCompleted() {
            return !this.stream.hasNextAvailable() && this.stream.peek().isEmpty();
        }

        @Override // org.axonframework.messaging.MessageStream
        public boolean hasNextAvailable() {
            return this.stream.hasNextAvailable();
        }

        @Override // org.axonframework.messaging.MessageStream
        public void close() {
            this.stream.close();
        }

        private MessageStream.Entry<E> createEntryForMessage(E e) {
            Context empty = Context.empty();
            if (e instanceof TrackedEventMessage) {
                empty = TrackingToken.addToContext(empty, ((TrackedEventMessage) e).trackingToken());
            }
            return new SimpleEntry(e, empty.withResource(Message.RESOURCE_KEY, e));
        }
    }

    public LegacyStreamableEventSource(@Nonnull StreamableMessageSource<E> streamableMessageSource) {
        Objects.requireNonNull(streamableMessageSource, "Delegate is required");
        this.delegate = streamableMessageSource;
    }

    @Override // org.axonframework.eventstreaming.StreamableEventSource
    public MessageStream<E> open(@Nonnull StreamingCondition streamingCondition) {
        return new BlockingMessageStream(this.delegate.openStream2(streamingCondition.position()), streamingCondition.criteria());
    }

    @Override // org.axonframework.eventstreaming.TrackingTokenSource
    public CompletableFuture<TrackingToken> firstToken() {
        return this.delegate.firstToken();
    }

    @Override // org.axonframework.eventstreaming.TrackingTokenSource
    public CompletableFuture<TrackingToken> latestToken() {
        return this.delegate.latestToken();
    }

    @Override // org.axonframework.eventstreaming.TrackingTokenSource
    public CompletableFuture<TrackingToken> tokenAt(@Nonnull Instant instant) {
        return this.delegate.tokenAt(instant);
    }
}
