package org.axonframework.axonserver.connector.event;

import io.axoniq.axonserver.connector.event.EventStream;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import jakarta.annotation.Nonnull;
import java.util.Optional;
import java.util.function.Function;
import org.axonframework.common.StringUtils;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.LegacyResources;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;

/* loaded from: input_file:org/axonframework/axonserver/connector/event/AxonServerMessageStream.class */
class AxonServerMessageStream implements MessageStream<EventMessage<?>> {
    private final EventStream stream;
    private final Function<Event, EventMessage<byte[]>> messageConverter;

    public AxonServerMessageStream(@Nonnull EventStream eventStream, @Nonnull Function<Event, EventMessage<byte[]>> function) {
        this.stream = eventStream;
        this.messageConverter = function;
    }

    public Optional<MessageStream.Entry<EventMessage<?>>> next() {
        EventWithToken eventWithToken = (EventWithToken) this.stream.nextIfAvailable();
        if (eventWithToken == null) {
            return Optional.empty();
        }
        Event event = eventWithToken.getEvent();
        EventMessage<byte[]> apply = this.messageConverter.apply(event);
        Context with = Context.with(TrackingToken.RESOURCE_KEY, new GlobalSequenceTrackingToken(eventWithToken.getToken()));
        if (StringUtils.nonEmptyOrNull(event.getAggregateIdentifier())) {
            with = with.withResource(LegacyResources.AGGREGATE_SEQUENCE_NUMBER_KEY, Long.valueOf(event.getAggregateSequenceNumber())).withResource(LegacyResources.AGGREGATE_TYPE_KEY, event.getAggregateType()).withResource(LegacyResources.AGGREGATE_IDENTIFIER_KEY, event.getAggregateIdentifier());
        }
        return Optional.of(new SimpleEntry(apply, with));
    }

    public void onAvailable(@Nonnull Runnable runnable) {
        this.stream.onAvailable(runnable);
    }

    public Optional<Throwable> error() {
        return this.stream.getError();
    }

    public boolean isCompleted() {
        return this.stream.isClosed();
    }

    public boolean hasNextAvailable() {
        return this.stream.peek() != null;
    }

    public void close() {
        this.stream.close();
    }
}
