package org.axonframework.eventstreaming;

import jakarta.annotation.Nonnull;
import java.util.Optional;
import org.assertj.core.api.Assertions;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventTestUtils;
import org.axonframework.eventhandling.GenericTrackedEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.utils.InMemoryStreamableEventSource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;

/**
 * Test class validating the {@link LegacyStreamableEventSource}, using an {@link InMemoryStreamableEventSource} as the
 * wrapped legacy source.
 */
class LegacyStreamableEventSourceTest {
    private InMemoryStreamableEventSource legacyEventSource;
    private LegacyStreamableEventSource<TrackedEventMessage<?>> testSubject;

    /**
     * Verifies the argument validation performed when constructing a {@link LegacyStreamableEventSource} and when
     * opening a stream on it.
     */
    @Nested
    class ConstructorTest {

        /**
         * Opening a stream with tag-based criteria must be rejected with an {@link IllegalArgumentException} whose
         * message names the offending criteria.
         */
        @Test
        void doNotSupportEventCriteriaOtherThanAny() {
            // Build the condition up front so that only the open(...) call is expected to throw.
            // The expected message below shows the two strings end up as a single key/value tag.
            var tagCriteria = EventCriteria.havingTags(new String[]{"tag1", "tag2"});
            var condition = StreamingCondition.conditionFor(firstToken(), tagCriteria);

            Assertions.assertThatThrownBy(() -> testSubject.open(condition))
                      .isInstanceOf(IllegalArgumentException.class)
                      .hasMessageContaining("Only AnyEvent criteria is supported in this legacy adapter, but received: TagFilteredEventCriteria[tags=[Tag[key=tag1, value=tag2]]]");
        }

        /**
         * Constructing the adapter around a {@code null} delegate must fail with a {@link NullPointerException}.
         */
        @Test
        void shouldThrowExceptionWithNullDelegate() {
            // A typed null keeps the constructor call free of raw types.
            StreamableMessageSource<TrackedEventMessage<?>> nullDelegate = null;

            Assertions.assertThatThrownBy(() -> new LegacyStreamableEventSource<>(nullDelegate))
                      .isInstanceOf(NullPointerException.class);
        }
    }

    @Nested
    /* loaded from: input_file:org/axonframework/eventstreaming/LegacyStreamableEventSourceTest$MessageStreamTest.class */
    class MessageStreamTest {
        private MessageStream<TrackedEventMessage<?>> messageStream;

        @Nested
        /* loaded from: input_file:org/axonframework/eventstreaming/LegacyStreamableEventSourceTest$MessageStreamTest$CompletedTest.class */
        class CompletedTest {
            CompletedTest() {
            }

            @Test
            void shouldNotConsiderCompletedIfMessagesAvailable() {
                LegacyStreamableEventSourceTest.this.legacyEventSource.publishMessage(LegacyStreamableEventSourceTest.this.trackedEventMessage("event-1", LegacyStreamableEventSourceTest.tokenAt(1L)));
                Assertions.assertThat(MessageStreamTest.this.messageStream.isCompleted()).isFalse();
            }

            @Test
            void shouldConsiderCompletedIfNoMessagesAvailable() {
                Assertions.assertThat(MessageStreamTest.this.messageStream.isCompleted()).isTrue();
            }
        }

        @Nested
        /* loaded from: input_file:org/axonframework/eventstreaming/LegacyStreamableEventSourceTest$MessageStreamTest$NextTest.class */
        class NextTest {
            NextTest() {
            }

            @Test
            void shouldReturnEmptyIfNoMessagesAvailable() {
                Assertions.assertThat(MessageStreamTest.this.messageStream.next()).isEmpty();
            }

            @Test
            void shouldReturnEventIfMessageAvailable() {
                TrackingToken trackingToken = LegacyStreamableEventSourceTest.tokenAt(1L);
                EventMessage<?> trackedEventMessage = LegacyStreamableEventSourceTest.this.trackedEventMessage("event-1", trackingToken);
                LegacyStreamableEventSourceTest.this.legacyEventSource.publishMessage(trackedEventMessage);
                Optional next = MessageStreamTest.this.messageStream.next();
                Assertions.assertThat(next).isPresent();
                MessageStreamTest.assertEvent(((MessageStream.Entry) next.get()).message(), trackedEventMessage);
                Assertions.assertThat(TrackingToken.fromContext((Context) next.get())).hasValue(trackingToken);
            }
        }

        @Nested
        /* loaded from: input_file:org/axonframework/eventstreaming/LegacyStreamableEventSourceTest$MessageStreamTest$PeekTest.class */
        class PeekTest {
            PeekTest() {
            }

            @Test
            void shouldReturnEmptyIfNoMessagesAvailable() {
                Assertions.assertThat(MessageStreamTest.this.messageStream.peek()).isEmpty();
            }

            @Test
            void shouldReturnEventIfMessageAvailable() {
                TrackingToken trackingToken = LegacyStreamableEventSourceTest.tokenAt(1L);
                EventMessage<?> trackedEventMessage = LegacyStreamableEventSourceTest.this.trackedEventMessage("event-1", trackingToken);
                LegacyStreamableEventSourceTest.this.legacyEventSource.publishMessage(trackedEventMessage);
                Optional peek = MessageStreamTest.this.messageStream.peek();
                Assertions.assertThat(peek).isPresent();
                MessageStreamTest.assertEvent(((MessageStream.Entry) peek.get()).message(), trackedEventMessage);
                Assertions.assertThat(TrackingToken.fromContext((Context) peek.get())).hasValue(trackingToken);
            }

            @Test
            void shouldNotAdvanceTeStream() {
                TrackingToken trackingToken = LegacyStreamableEventSourceTest.tokenAt(1L);
                TrackingToken trackingToken2 = LegacyStreamableEventSourceTest.tokenAt(1L);
                EventMessage<?> trackedEventMessage = LegacyStreamableEventSourceTest.this.trackedEventMessage("event-1", trackingToken);
                EventMessage<?> trackedEventMessage2 = LegacyStreamableEventSourceTest.this.trackedEventMessage("event-2", trackingToken2);
                LegacyStreamableEventSourceTest.this.legacyEventSource.publishMessage(trackedEventMessage);
                LegacyStreamableEventSourceTest.this.legacyEventSource.publishMessage(trackedEventMessage2);
                Optional next = MessageStreamTest.this.messageStream.next();
                Assertions.assertThat(next).isPresent();
                MessageStreamTest.assertEvent(((MessageStream.Entry) next.get()).message(), trackedEventMessage);
                Optional peek = MessageStreamTest.this.messageStream.peek();
                Optional peek2 = MessageStreamTest.this.messageStream.peek();
                Optional peek3 = MessageStreamTest.this.messageStream.peek();
                Assertions.assertThat(peek).isEqualTo(peek2);
                Assertions.assertThat(peek).isEqualTo(peek3);
            }
        }

        MessageStreamTest() {
        }

        @BeforeEach
        void beforeEach() {
            this.messageStream = LegacyStreamableEventSourceTest.this.testSubject.open(StreamingCondition.startingFrom(LegacyStreamableEventSourceTest.firstToken()));
        }

        @AfterEach
        void afterEach() {
            this.messageStream.close();
        }

        private static void assertEvent(EventMessage<?> eventMessage, EventMessage<?> eventMessage2) {
            org.junit.jupiter.api.Assertions.assertEquals(eventMessage2.getIdentifier(), eventMessage.getIdentifier());
            org.junit.jupiter.api.Assertions.assertEquals(eventMessage2.getPayload(), eventMessage.getPayload());
            org.junit.jupiter.api.Assertions.assertEquals(eventMessage2.getTimestamp(), eventMessage.getTimestamp());
            org.junit.jupiter.api.Assertions.assertEquals(eventMessage2.getMetaData(), eventMessage.getMetaData());
        }
    }

    /**
     * Compares the tokens handed out by the test subject with those created by the wrapped legacy source.
     */
    @Nested
    class TrackingTokenTest {

        // NOTE(review): firstToken() is compared with the delegate's createHeadToken() and latestToken() with
        // createTailToken(). In Axon Framework 4 naming the tail token marks the start of a stream and the head
        // token its end, so this pairing looks inverted -- confirm against LegacyStreamableEventSource.

        @Test
        void firstTokenShouldBeSameAsInDelegate() {
            TrackingToken actual = testSubject.firstToken().join();
            TrackingToken expected = legacyEventSource.createHeadToken();

            Assertions.assertThat(actual).isEqualTo(expected);
        }

        @Test
        void latestTokenShouldBeSameAsInDelegate() {
            TrackingToken actual = testSubject.latestToken().join();
            TrackingToken expected = legacyEventSource.createTailToken();

            Assertions.assertThat(actual).isEqualTo(expected);
        }
    }

    /**
     * Package-private no-arg constructor. It performs no initialisation; both fixture fields are assigned in
     * {@link #beforeEach()}.
     */
    LegacyStreamableEventSourceTest() {
    }

    /**
     * Creates a fresh in-memory legacy source and wraps it in the adapter under test before every test.
     */
    @BeforeEach
    void beforeEach() {
        InMemoryStreamableEventSource delegate = new InMemoryStreamableEventSource();
        legacyEventSource = delegate;
        testSubject = new LegacyStreamableEventSource<>(delegate);
    }

    /**
     * Creates the token the tests use as the starting point of a stream.
     *
     * @return a {@link GlobalSequenceTrackingToken} constructed with {@code 0}
     */
    private static GlobalSequenceTrackingToken firstToken() {
        return new GlobalSequenceTrackingToken(0L);
    }

    /**
     * Creates a {@link GlobalSequenceTrackingToken} from the given value.
     *
     * @param j the value passed to the {@link GlobalSequenceTrackingToken} constructor
     * @return the newly created token, never {@code null}
     */
    @Nonnull
    private static GlobalSequenceTrackingToken tokenAt(final long j) {
        final GlobalSequenceTrackingToken token = new GlobalSequenceTrackingToken(j);
        return token;
    }

    /**
     * Builds a tracked event message for use as a test fixture.
     *
     * @param str           the object handed to {@link EventTestUtils#asEventMessage} to build the wrapped event
     * @param trackingToken the token to attach to the resulting message
     * @return a {@link GenericTrackedEventMessage} combining the given token with the created event message
     */
    private TrackedEventMessage<?> trackedEventMessage(String str, TrackingToken trackingToken) {
        // Diamond instead of the raw type, so the compiler checks the constructor arguments.
        return new GenericTrackedEventMessage<>(trackingToken, EventTestUtils.asEventMessage(str));
    }
}
