package org.axonframework.axonserver.connector.event;

import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.grpc.event.dcb.SequencedEvent;
import io.axoniq.axonserver.grpc.event.dcb.SourceEventsResponse;
import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.Optional;
import org.axonframework.common.annotation.Internal;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TerminalEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.ConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.GlobalIndexConsistencyMarker;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link MessageStream} of {@link EventMessage event messages} backed by an Axon Server
 * {@link ResultStream} of {@link SourceEventsResponse} items.
 * <p>
 * Each response read from the underlying stream is translated into one entry:
 * <ul>
 *     <li>a response without a consistency marker becomes an entry holding the converted event, with a
 *     {@link GlobalSequenceTrackingToken} stored in the entry's {@link Context} under
 *     {@link TrackingToken#RESOURCE_KEY};</li>
 *     <li>a response with a consistency marker becomes an entry holding {@link TerminalEventMessage#INSTANCE},
 *     with a {@link GlobalIndexConsistencyMarker} added to the entry's {@link Context}.</li>
 * </ul>
 * All other operations delegate directly to the underlying {@link ResultStream}.
 */
@Internal
public class SourcingEventMessageStream implements MessageStream<EventMessage<?>> {

    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    private final ResultStream<SourceEventsResponse> stream;
    private final EventConverter converter;

    /**
     * Constructs a stream that reads from the given {@code resultStream} and converts events with the given
     * {@code eventConverter}.
     *
     * @param resultStream   the Axon Server result stream to read {@link SourceEventsResponse} items from
     * @param eventConverter the converter used to turn the events of the responses into {@link EventMessage}s
     * @throws NullPointerException if either argument is {@code null}
     */
    public SourcingEventMessageStream(@Nonnull ResultStream<SourceEventsResponse> resultStream, @Nonnull EventConverter eventConverter) {
        this.stream = Objects.requireNonNull(resultStream, "The source result stream cannot be null.");
        this.converter = Objects.requireNonNull(eventConverter, "The converter cannot be null.");
    }

    /**
     * Takes the next response from the underlying stream, if one is available, and converts it into an entry.
     *
     * @return an entry holding the converted event, an entry holding the terminal message when the response
     * carries a consistency marker, or an empty {@link Optional} when the underlying stream returned
     * {@code null}
     */
    public Optional<MessageStream.Entry<EventMessage<?>>> next() {
        SourceEventsResponse response = stream.nextIfAvailable();
        if (response == null) {
            // NOTE(review): the log message treats null as "end of stream"; a null may also just mean that
            // nothing is available yet - confirm against ResultStream#nextIfAvailable.
            logger.debug("Reached the end of the source result stream.");
            return Optional.empty();
        }
        if (response.hasConsistencyMarker()) {
            logger.debug("Reached the consistency marker message of the source result stream.");
            return convertToMarkerEntry(response.getConsistencyMarker());
        }
        return convertToEventEntry(response.getEvent());
    }

    /**
     * Converts the given {@code sequencedEvent} into an entry whose context carries a tracking token.
     *
     * @param sequencedEvent the event, together with its sequence, as received from the underlying stream
     * @return an {@link Optional} containing the entry for the converted event
     */
    private Optional<MessageStream.Entry<EventMessage<?>>> convertToEventEntry(SequencedEvent sequencedEvent) {
        EventMessage<?> message = converter.convertEvent(sequencedEvent.getEvent());
        // The token is built from the event's sequence plus one; presumably it denotes the position
        // following this event - TODO confirm.
        GlobalSequenceTrackingToken token = new GlobalSequenceTrackingToken(sequencedEvent.getSequence() + 1);
        Context context = Context.with(TrackingToken.RESOURCE_KEY, token);
        return Optional.of(new SimpleEntry<>(message, context));
    }

    /**
     * Creates the entry that represents a consistency marker response.
     *
     * @param marker the consistency marker value taken from the response
     * @return an {@link Optional} containing an entry with {@link TerminalEventMessage#INSTANCE} as message and
     * a {@link GlobalIndexConsistencyMarker} for the given {@code marker} added to its context
     */
    private static Optional<MessageStream.Entry<EventMessage<?>>> convertToMarkerEntry(long marker) {
        return Optional.of(new SimpleEntry<>(
                TerminalEventMessage.INSTANCE,
                ConsistencyMarker.addToContext(Context.empty(), new GlobalIndexConsistencyMarker(marker))
        ));
    }

    /**
     * Registers the given {@code runnable} as the availability callback of the underlying stream.
     *
     * @param runnable the callback to hand to the underlying stream
     */
    public void onAvailable(@Nonnull Runnable runnable) {
        stream.onAvailable(runnable);
    }

    /**
     * Returns the error reported by the underlying stream, if any.
     *
     * @return the result of {@link ResultStream#getError()} on the underlying stream
     */
    public Optional<Throwable> error() {
        return stream.getError();
    }

    /**
     * Indicates whether the underlying stream reports itself as closed.
     *
     * @return the result of {@link ResultStream#isClosed()} on the underlying stream
     */
    public boolean isCompleted() {
        return stream.isClosed();
    }

    /**
     * Indicates whether the underlying stream currently has an item to peek at.
     *
     * @return {@code true} if {@link ResultStream#peek()} on the underlying stream returns a non-null item
     */
    public boolean hasNextAvailable() {
        return stream.peek() != null;
    }

    /**
     * Closes the underlying stream.
     */
    public void close() {
        stream.close();
    }
}
