package org.axonframework.utils;

import jakarta.annotation.Nonnull;
import java.time.Instant;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventstreaming.StreamableEventSource;
import org.axonframework.eventstreaming.StreamingCondition;
import org.axonframework.eventstreaming.Tag;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;

/* loaded from: input_file:org/axonframework/utils/AsyncInMemoryStreamableEventSource.class */
/**
 * In-memory {@link StreamableEventSource} that keeps published events under consecutive {@code long} indices and
 * serves them through {@link MessageStream}s which can be notified when new events are published.
 * <p>
 * An event whose payload equals {@code "FAIL"} (see {@link #FAIL_EVENT}) makes a stream throw an
 * {@link IllegalStateException} as soon as the stream reaches it.
 */
public class AsyncInMemoryStreamableEventSource implements StreamableEventSource<EventMessage<?>> {
    /** Payload that marks an event which streams refuse to deliver. */
    private static final String FAIL_PAYLOAD = "FAIL";
    /** Ready-made event carrying {@link #FAIL_PAYLOAD}; publish it to provoke a failure in a reading stream. */
    public static final EventMessage<String> FAIL_EVENT = org.axonframework.eventhandling.EventTestUtils.asEventMessage(FAIL_PAYLOAD);
    /** Published events keyed by their index; volatile because clearing the source swaps in a brand-new map. */
    private volatile NavigableMap<Long, EventMessage<?>> eventStorage;
    /** Index that will be assigned to the next published event. */
    private final AtomicLong nextIndex;
    /** When {@code true}, a stream runs a newly registered callback right away if an event is already available. */
    private final boolean streamCallbackSupported;
    /** Events that a stream consumed but skipped because they did not match its condition. */
    private final List<EventMessage<?>> ignoredEvents;
    /** Streams handed out by {@code open} that have not been closed yet. */
    private final Set<AsyncMessageStream> openStreams;

    /* loaded from: input_file:org/axonframework/utils/AsyncInMemoryStreamableEventSource$AsyncMessageStream.class */
    /**
     * {@link MessageStream} view over the enclosing source's event storage.
     * <p>
     * Each stream tracks its own read position, skips events that do not match its {@link StreamingCondition} and
     * remembers a single callback that the enclosing source triggers when events are published. Closing a stream
     * unregisters it and also clears the events stored in the enclosing source.
     */
    private class AsyncMessageStream implements MessageStream<EventMessage<?>> {
        /** Index in the event storage of the next event this stream will look at. */
        private final AtomicLong currentPosition;
        /** Filter applied to every event; {@code null} lets every event through. */
        private final StreamingCondition condition;
        /** Most recently registered availability callback; starts out as a no-op. */
        private final AtomicReference<Runnable> callback = new AtomicReference<>(() -> {
        });
        /** Set by {@link #close()}; a closed stream yields nothing and reports itself as completed. */
        private volatile boolean closed = false;

        /**
         * Creates a stream that starts at index 0 when there is no condition or no start token, and otherwise at
         * the token's position plus one (a token without a position also results in index 0).
         * <p>
         * NOTE(review): {@link #next()} and {@link #peek()} tag the event at index {@code i} with a token at
         * {@code i + 1}, while this constructor resumes at {@code position + 1} - confirm the intended convention.
         *
         * @param streamingCondition the condition supplying the start token and the event filter, may be
         *                           {@code null}
         */
        public AsyncMessageStream(StreamingCondition streamingCondition) {
            this.condition = streamingCondition;
            TrackingToken startToken = streamingCondition == null ? null : streamingCondition.position();
            long startIndex = startToken == null ? 0L : startToken.position().orElse(-1L) + 1;
            this.currentPosition = new AtomicLong(startIndex);
        }

        /**
         * Consumes events from the current position until one matches the condition.
         * <p>
         * Every inspected event advances the position; events that do not match are recorded as ignored on the
         * enclosing source.
         *
         * @return the next matching entry, or empty when the stream is closed or no further event is stored
         * @throws IllegalStateException when the inspected event carries the failure payload; the position has
         *                               already moved past that event at that point
         */
        public Optional<MessageStream.Entry<EventMessage<?>>> next() {
            if (closed) {
                return Optional.empty();
            }
            for (;;) {
                long index = currentPosition.get();
                EventMessage<?> event = eventStorage.get(index);
                if (event == null) {
                    return Optional.empty();
                }
                // Advance first, so that a failing or ignored event is never read twice.
                currentPosition.incrementAndGet();
                Context context = tokenContextFor(index);
                if (AsyncInMemoryStreamableEventSource.FAIL_PAYLOAD.equals(event.getPayload())) {
                    throw new IllegalStateException("Cannot retrieve event at position [" + index + "].");
                }
                if (AsyncInMemoryStreamableEventSource.matches(event, condition)) {
                    return Optional.of(new SimpleEntry<>(event, context));
                }
                ignoredEvents.add(event);
            }
        }

        /**
         * Looks ahead for the first matching event without moving the position and without recording skipped
         * events as ignored.
         *
         * @return the entry a subsequent {@link #next()} would yield, or empty when the stream is closed or no
         *         matching event is stored yet
         * @throws IllegalStateException when an event carrying the failure payload is met before a match
         */
        public Optional<MessageStream.Entry<EventMessage<?>>> peek() {
            if (closed) {
                return Optional.empty();
            }
            for (long index = currentPosition.get(); ; index++) {
                EventMessage<?> candidate = eventStorage.get(index);
                if (candidate == null) {
                    return Optional.empty();
                }
                if (AsyncInMemoryStreamableEventSource.FAIL_PAYLOAD.equals(candidate.getPayload())) {
                    throw new IllegalStateException("Cannot retrieve event at position [" + index + "].");
                }
                if (AsyncInMemoryStreamableEventSource.matches(candidate, condition)) {
                    return Optional.of(new SimpleEntry<>(candidate, tokenContextFor(index)));
                }
            }
        }

        /**
         * Registers the callback to trigger when events become available, replacing any earlier one. If the
         * enclosing source supports stream callbacks and an event is already waiting, the callback runs
         * immediately on the calling thread.
         *
         * @param runnable the callback to register
         */
        public void onAvailable(@Nonnull Runnable runnable) {
            callback.set(runnable);
            boolean runImmediately = streamCallbackSupported && hasNextAvailable();
            if (runImmediately) {
                runnable.run();
            }
        }

        /**
         * @return always empty; this stream never reports an error through this method
         */
        public Optional<Throwable> error() {
            return Optional.empty();
        }

        /**
         * @return {@code true} once the stream has been closed
         */
        public boolean isCompleted() {
            return closed;
        }

        /**
         * @return {@code true} when the stream is open and an event is stored at the current position, regardless
         *         of whether that event matches the condition
         */
        public boolean hasNextAvailable() {
            return !closed && eventStorage.containsKey(currentPosition.get());
        }

        /**
         * Marks the stream as closed, unregisters it from the enclosing source and clears all events stored in
         * that source.
         */
        public void close() {
            closed = true;
            openStreams.remove(this);
            AsyncInMemoryStreamableEventSource.this.clearAllMessages();
        }

        /**
         * Runs the registered callback, unless the stream is closed. Invoked by the enclosing source after an
         * event has been published.
         */
        public void notifyEventAvailable() {
            invokeCallbackUnlessClosed();
        }

        /**
         * Runs the registered callback, unless the stream is closed. Invoked when the callback is triggered
         * manually through the enclosing source.
         */
        public void runCallback() {
            invokeCallbackUnlessClosed();
        }

        /** Runs the currently registered callback when the stream is still open and a callback is present. */
        private void invokeCallbackUnlessClosed() {
            if (closed) {
                return;
            }
            Runnable registered = callback.get();
            if (registered != null) {
                registered.run();
            }
        }

        /** Builds the context for the event stored at {@code index}: a token at {@code index + 1} and no tags. */
        private Context tokenContextFor(long index) {
            Context withToken = TrackingToken.addToContext(Context.empty(), new GlobalSequenceTrackingToken(index + 1));
            return Tag.addToContext(withToken, Collections.emptySet());
        }
    }

    /**
     * Creates a source without stream callback support; equivalent to passing {@code false} to
     * {@link #AsyncInMemoryStreamableEventSource(boolean)}.
     */
    public AsyncInMemoryStreamableEventSource() {
        this(false);
    }

    /**
     * Creates an empty source with no stored events, no ignored events and no open streams.
     *
     * @param z whether streams may run a newly registered availability callback straight away when an event is
     *          already waiting
     */
    public AsyncInMemoryStreamableEventSource(boolean z) {
        this.streamCallbackSupported = z;
        this.openStreams = new CopyOnWriteArraySet<>();
        this.ignoredEvents = new CopyOnWriteArrayList<>();
        this.nextIndex = new AtomicLong();
        this.eventStorage = new ConcurrentSkipListMap<>();
    }

    /**
     * Opens a new stream over the stored events and registers it, so that it is notified about events published
     * from now on.
     *
     * @param streamingCondition the condition providing the start position and the filter of the stream
     * @return the newly opened stream
     */
    public MessageStream<EventMessage<?>> open(@Nonnull StreamingCondition streamingCondition) {
        AsyncMessageStream stream = new AsyncMessageStream(streamingCondition);
        openStreams.add(stream);
        return stream;
    }

    /**
     * Returns the token positioned after the last stored event, so that a consumer using it only sees events
     * published later.
     * <p>
     * The value uses the same numbering the streams of this source attach to their entries: the event stored at
     * index {@code i} carries the token {@code i + 1}.
     *
     * @return a completed future holding the token after the last stored event, or {@code null} when no event is
     *         stored
     */
    public CompletableFuture<TrackingToken> latestToken() {
        // Read the volatile field once: a concurrent clear swaps the map, and lastKey() on an empty map throws.
        NavigableMap<Long, EventMessage<?>> events = this.eventStorage;
        if (events.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.completedFuture(new GlobalSequenceTrackingToken(events.lastKey() + 1));
    }

    /**
     * Returns the token for the very start of the stream.
     *
     * @return a completed future holding {@code null}; a stream opened without a start token begins at the first
     *         stored event
     */
    public CompletableFuture<TrackingToken> firstToken() {
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Returns a token from which the first event delivered is the earliest stored event whose timestamp is equal
     * to or later than the given instant.
     *
     * @param instant the point in time to position the token at
     * @return a completed future holding a token one position before the first such event, or the result of
     *         {@link #latestToken()} when every stored event is older than {@code instant}
     */
    public CompletableFuture<TrackingToken> tokenAt(@Nonnull Instant instant) {
        NavigableMap<Long, EventMessage<?>> events = this.eventStorage;
        // The map iterates in ascending key order, so the first hit is the event with the lowest index.
        return events.entrySet()
                     .stream()
                     .filter(entry -> !entry.getValue().getTimestamp().isBefore(instant))
                     .findFirst()
                     .map(entry -> CompletableFuture.<TrackingToken>completedFuture(
                             new GlobalSequenceTrackingToken(entry.getKey() - 1)))
                     .orElseGet(this::latestToken);
    }

    /**
     * Stores the given event under the next free index and then notifies every open stream that an event is
     * available. The stream callbacks run on the calling thread while this source's monitor is held.
     *
     * @param eventMessage the event to publish
     */
    public synchronized void publishMessage(EventMessage<?> eventMessage) {
        long index = nextIndex.getAndIncrement();
        eventStorage.put(index, eventMessage);
        for (AsyncMessageStream stream : openStreams) {
            stream.notifyEventAvailable();
        }
    }

    /**
     * Returns the events that streams consumed but skipped because they did not match the stream's condition.
     *
     * @return a read-only view of the ignored events; it reflects events ignored after this call as well
     */
    public List<EventMessage<?>> getIgnoredEvents() {
        List<EventMessage<?>> readOnlyView = Collections.unmodifiableList(ignoredEvents);
        return readOnlyView;
    }

    /**
     * Manually triggers the registered availability callback of every open stream, on the calling thread.
     */
    public void runOnAvailableCallback() {
        for (AsyncMessageStream stream : openStreams) {
            stream.runCallback();
        }
    }

    /**
     * Discards all stored events by swapping in a fresh, empty map and restarts index assignment at 0.
     * Read positions of streams that are still open are left untouched.
     */
    private synchronized void clearAllMessages() {
        NavigableMap<Long, EventMessage<?>> emptyStorage = new ConcurrentSkipListMap<>();
        this.eventStorage = emptyStorage;
        this.nextIndex.set(0L);
    }

    /**
     * Decides whether a stream with the given condition should deliver the given event.
     *
     * @param eventMessage       the event to test
     * @param streamingCondition the condition of the stream, may be {@code null}
     * @return {@code true} when there is no condition, when the event carries the failure payload, or when the
     *         condition accepts the event's qualified type name together with an empty tag set
     */
    private static boolean matches(EventMessage<?> eventMessage, StreamingCondition streamingCondition) {
        if (streamingCondition == null) {
            return true;
        }
        // Failure events always pass the filter, whatever the condition says.
        if (FAIL_PAYLOAD.equals(eventMessage.getPayload())) {
            return true;
        }
        String qualifiedTypeName = eventMessage.type().qualifiedName();
        return streamingCondition.matches(qualifiedTypeName, Collections.emptySet());
    }
}
