package org.axonframework.utils;

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventstreaming.StreamingCondition;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/axonframework/utils/AsyncInMemoryStreamableEventSourceTest.class */
class AsyncInMemoryStreamableEventSourceTest {

    @Nested
    /* loaded from: input_file:org/axonframework/utils/AsyncInMemoryStreamableEventSourceTest$TrackingTokenHandling.class */
    class TrackingTokenHandling {
        TrackingTokenHandling(AsyncInMemoryStreamableEventSourceTest asyncInMemoryStreamableEventSourceTest) {
        }

        @DisplayName("Starting from null token reads all events from beginning")
        @Test
        void nullTrackingTokenStartsFromBeginning() {
            AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = new AsyncInMemoryStreamableEventSource();
            EventMessage<?> asEventMessage = org.axonframework.eventhandling.EventTestUtils.asEventMessage("Event 1");
            EventMessage<?> asEventMessage2 = org.axonframework.eventhandling.EventTestUtils.asEventMessage("Event 2");
            asyncInMemoryStreamableEventSource.publishMessage(asEventMessage);
            asyncInMemoryStreamableEventSource.publishMessage(asEventMessage2);
            MessageStream<EventMessage<?>> open = asyncInMemoryStreamableEventSource.open(StreamingCondition.startingFrom((TrackingToken) null));
            Optional next = open.next();
            Assertions.assertTrue(next.isPresent());
            Assertions.assertEquals("Event 1", ((MessageStream.Entry) next.get()).message().getPayload());
            Assertions.assertEquals(1L, ((TrackingToken) TrackingToken.fromContext((Context) next.get()).orElseThrow()).position().orElse(-1L));
            Optional next2 = open.next();
            Assertions.assertTrue(next2.isPresent());
            Assertions.assertEquals("Event 2", ((MessageStream.Entry) next2.get()).message().getPayload());
            Assertions.assertEquals(2L, ((TrackingToken) TrackingToken.fromContext((Context) next2.get()).orElseThrow()).position().orElse(-1L));
        }

        @DisplayName("Starting from token beyond available events returns no events")
        @Test
        void trackingTokenBeyondAvailableEvents() {
            AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = new AsyncInMemoryStreamableEventSource();
            asyncInMemoryStreamableEventSource.publishMessage(org.axonframework.eventhandling.EventTestUtils.asEventMessage("Event 1"));
            MessageStream<EventMessage<?>> open = asyncInMemoryStreamableEventSource.open(StreamingCondition.startingFrom(new GlobalSequenceTrackingToken(5L)));
            Assertions.assertFalse(open.hasNextAvailable(), "Stream should have no events when starting beyond available events");
            Assertions.assertTrue(open.next().isEmpty(), "next() should return empty Optional");
        }
    }

    AsyncInMemoryStreamableEventSourceTest() {
    }

    @DisplayName("Both implementations handle normal event flow identically")
    @Test
    void normalEventFlowCompatibility() throws ExecutionException, InterruptedException {
        EventMessage<?> asEventMessage = org.axonframework.eventhandling.EventTestUtils.asEventMessage("Test Event 1");
        EventMessage<?> asEventMessage2 = org.axonframework.eventhandling.EventTestUtils.asEventMessage("Test Event 2");
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = new AsyncInMemoryStreamableEventSource();
        InMemoryStreamableEventSource inMemoryStreamableEventSource = new InMemoryStreamableEventSource();
        asyncInMemoryStreamableEventSource.publishMessage(asEventMessage);
        asyncInMemoryStreamableEventSource.publishMessage(asEventMessage2);
        inMemoryStreamableEventSource.publishMessage(asEventMessage);
        inMemoryStreamableEventSource.publishMessage(asEventMessage2);
        Assertions.assertEquals(inMemoryStreamableEventSource.createHeadToken().position(), asyncInMemoryStreamableEventSource.firstToken().get().position());
        MessageStream<EventMessage<?>> open = asyncInMemoryStreamableEventSource.open(StreamingCondition.startingFrom((TrackingToken) null));
        BlockingStream<TrackedEventMessage<?>> openStream = inMemoryStreamableEventSource.openStream(null);
        try {
            Assertions.assertTrue(open.hasNextAvailable());
            Assertions.assertTrue(openStream.hasNextAvailable(0, TimeUnit.MILLISECONDS));
            MessageStream.Entry entry = (MessageStream.Entry) open.next().orElseThrow();
            TrackedEventMessage trackedEventMessage = (TrackedEventMessage) openStream.nextAvailable();
            Assertions.assertEquals(trackedEventMessage.getPayload(), entry.message().getPayload());
            Assertions.assertEquals(trackedEventMessage.trackingToken().position(), ((TrackingToken) TrackingToken.fromContext(entry).orElseThrow()).position());
            MessageStream.Entry entry2 = (MessageStream.Entry) open.next().orElseThrow();
            TrackedEventMessage trackedEventMessage2 = (TrackedEventMessage) openStream.nextAvailable();
            Assertions.assertEquals(trackedEventMessage2.getPayload(), entry2.message().getPayload());
            Assertions.assertEquals(trackedEventMessage2.trackingToken().position(), ((TrackingToken) TrackingToken.fromContext(entry2).orElseThrow()).position());
            Assertions.assertFalse(open.hasNextAvailable());
            Assertions.assertFalse(openStream.hasNextAvailable(0, TimeUnit.MILLISECONDS));
            if (openStream != null) {
                openStream.close();
            }
        } catch (Throwable th) {
            if (openStream != null) {
                try {
                    openStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @DisplayName("Both implementations handle FAIL_EVENT identically")
    @Test
    void failEventCompatibility() {
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = new AsyncInMemoryStreamableEventSource();
        InMemoryStreamableEventSource inMemoryStreamableEventSource = new InMemoryStreamableEventSource();
        asyncInMemoryStreamableEventSource.publishMessage(AsyncInMemoryStreamableEventSource.FAIL_EVENT);
        inMemoryStreamableEventSource.publishMessage(InMemoryStreamableEventSource.FAIL_EVENT);
        MessageStream<EventMessage<?>> open = asyncInMemoryStreamableEventSource.open(StreamingCondition.startingFrom((TrackingToken) null));
        Objects.requireNonNull(open);
        Assertions.assertTrue(((IllegalStateException) Assertions.assertThrows(IllegalStateException.class, open::next)).getMessage().contains("Cannot retrieve event at position [0]"));
        BlockingStream<TrackedEventMessage<?>> openStream = inMemoryStreamableEventSource.openStream(null);
        try {
            Objects.requireNonNull(openStream);
            Assertions.assertTrue(((IllegalStateException) Assertions.assertThrows(IllegalStateException.class, openStream::nextAvailable)).getMessage().contains("Cannot retrieve event at position"));
            if (openStream != null) {
                openStream.close();
            }
        } catch (Throwable th) {
            if (openStream != null) {
                try {
                    openStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @DisplayName("Both implementations handle destructive close behavior identically")
    @Test
    void destructiveCloseBehaviorCompatibility() throws ExecutionException, InterruptedException {
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = new AsyncInMemoryStreamableEventSource();
        InMemoryStreamableEventSource inMemoryStreamableEventSource = new InMemoryStreamableEventSource();
        EventMessage<?> asEventMessage = org.axonframework.eventhandling.EventTestUtils.asEventMessage("Test Event");
        asyncInMemoryStreamableEventSource.publishMessage(asEventMessage);
        inMemoryStreamableEventSource.publishMessage(asEventMessage);
        Assertions.assertNotNull(asyncInMemoryStreamableEventSource.firstToken().get());
        Assertions.assertNotNull(inMemoryStreamableEventSource.createHeadToken());
        StreamingCondition startingFrom = StreamingCondition.startingFrom((TrackingToken) null);
        asyncInMemoryStreamableEventSource.open(startingFrom).close();
        BlockingStream<TrackedEventMessage<?>> openStream = inMemoryStreamableEventSource.openStream(null);
        if (openStream != null) {
            openStream.close();
        }
        Assertions.assertNull(asyncInMemoryStreamableEventSource.firstToken().get());
        Assertions.assertNull(inMemoryStreamableEventSource.createHeadToken());
        Assertions.assertFalse(asyncInMemoryStreamableEventSource.open(startingFrom).hasNextAvailable());
        BlockingStream<TrackedEventMessage<?>> openStream2 = inMemoryStreamableEventSource.openStream(null);
        try {
            Assertions.assertFalse(openStream2.hasNextAvailable(0, TimeUnit.MILLISECONDS));
            if (openStream2 != null) {
                openStream2.close();
            }
        } catch (Throwable th) {
            if (openStream2 != null) {
                try {
                    openStream2.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
