package org.axonframework.axonserver.connector.event;

import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.grpc.event.dcb.SequencedEvent;
import io.axoniq.axonserver.grpc.event.dcb.StreamEventsResponse;
import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.Optional;
import org.axonframework.common.annotation.Internal;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/axonframework/axonserver/connector/event/StreamingEventMessageStream.class */
public class StreamingEventMessageStream implements MessageStream<EventMessage<?>> {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final ResultStream<StreamEventsResponse> stream;
    private final EventConverter converter;

    public StreamingEventMessageStream(@Nonnull ResultStream<StreamEventsResponse> resultStream, @Nonnull EventConverter eventConverter) {
        this.stream = (ResultStream) Objects.requireNonNull(resultStream, "The result stream cannot be null.");
        this.converter = (EventConverter) Objects.requireNonNull(eventConverter, "The converter cannot be null.");
    }

    public Optional<MessageStream.Entry<EventMessage<?>>> next() {
        StreamEventsResponse streamEventsResponse = (StreamEventsResponse) this.stream.nextIfAvailable();
        if (streamEventsResponse != null) {
            return Optional.of(convertToEntry(streamEventsResponse.getEvent()));
        }
        logger.debug("There are no more events to stream at this moment in time.");
        return Optional.empty();
    }

    private SimpleEntry<EventMessage<?>> convertToEntry(SequencedEvent sequencedEvent) {
        return new SimpleEntry<>(this.converter.convertEvent(sequencedEvent.getEvent()), Context.with(TrackingToken.RESOURCE_KEY, new GlobalSequenceTrackingToken(sequencedEvent.getSequence() + 1)));
    }

    public void onAvailable(@Nonnull Runnable runnable) {
        this.stream.onAvailable(runnable);
    }

    public Optional<Throwable> error() {
        return this.stream.getError();
    }

    public boolean isCompleted() {
        return this.stream.isClosed();
    }

    public boolean hasNextAvailable() {
        return this.stream.peek() != null;
    }

    public void close() {
        this.stream.close();
    }
}
