package org.axonframework.integrationtests.eventhandling;

import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.axonframework.common.stream.BlockingStream;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericDomainEventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.GenericTrackedEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.MultiSourceTrackingToken;
import org.axonframework.eventhandling.MultiStreamableMessageSource;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.messaging.StreamableMessageSource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/axonframework/integrationtests/eventhandling/MultiStreamableMessageSourceTest.class */
class MultiStreamableMessageSourceTest {
    private MultiStreamableMessageSource testSubject;
    private EmbeddedEventStore eventStoreA;
    private EmbeddedEventStore eventStoreB;

    /* loaded from: input_file:org/axonframework/integrationtests/eventhandling/MultiStreamableMessageSourceTest$CallbackSupportingBlockingStream.class */
    private static class CallbackSupportingBlockingStream implements BlockingStream<TrackedEventMessage<?>> {
        private Runnable callback;

        private CallbackSupportingBlockingStream() {
        }

        public Optional<TrackedEventMessage<?>> peek() {
            throw new UnsupportedOperationException();
        }

        public boolean hasNextAvailable(int i, TimeUnit timeUnit) {
            throw new UnsupportedOperationException();
        }

        /* renamed from: nextAvailable, reason: merged with bridge method [inline-methods] */
        public TrackedEventMessage<?> m3nextAvailable() throws InterruptedException {
            throw new UnsupportedOperationException();
        }

        public void close() {
            throw new UnsupportedOperationException();
        }

        public boolean setOnAvailableCallback(Runnable runnable) {
            this.callback = runnable;
            return true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void invokeCallback() {
            this.callback.run();
        }
    }

    MultiStreamableMessageSourceTest() {
    }

    @BeforeEach
    void setUp() {
        this.eventStoreA = EmbeddedEventStore.builder().storageEngine(new InMemoryEventStorageEngine()).build();
        this.eventStoreB = EmbeddedEventStore.builder().storageEngine(new InMemoryEventStorageEngine()).build();
        this.testSubject = MultiStreamableMessageSource.builder().addMessageSource("eventStoreA", this.eventStoreA).addMessageSource("eventStoreB", this.eventStoreB).longPollingSource("eventStoreA").build();
    }

    @Test
    void simplePublishAndConsume() throws InterruptedException {
        EventMessage asEventMessage = GenericEventMessage.asEventMessage("Event1");
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        MultiStreamableMessageSource.MultiSourceBlockingStream openStream = this.testSubject.openStream(this.testSubject.createTailToken());
        Assertions.assertTrue(openStream.hasNextAvailable());
        Assertions.assertEquals(asEventMessage.getPayload(), ((TrackedEventMessage) openStream.nextAvailable()).getPayload());
        openStream.close();
    }

    @Test
    void connectionsAreClosedWhenOpeningFails() {
        StreamableMessageSource streamableMessageSource = (StreamableMessageSource) Mockito.mock(StreamableMessageSource.class);
        StreamableMessageSource streamableMessageSource2 = (StreamableMessageSource) Mockito.mock(StreamableMessageSource.class);
        this.testSubject = MultiStreamableMessageSource.builder().addMessageSource("source1", streamableMessageSource).addMessageSource("source2", streamableMessageSource2).build();
        BlockingStream blockingStream = (BlockingStream) Mockito.mock(BlockingStream.class);
        Mockito.when(streamableMessageSource.openStream((TrackingToken) Mockito.any())).thenReturn(blockingStream);
        Mockito.when(streamableMessageSource2.openStream((TrackingToken) Mockito.any())).thenThrow(new Throwable[]{new RuntimeException()});
        Assertions.assertThrows(RuntimeException.class, () -> {
            this.testSubject.openStream((TrackingToken) null);
        });
        ((BlockingStream) Mockito.verify(blockingStream)).close();
        ((StreamableMessageSource) Mockito.verify(streamableMessageSource)).openStream((TrackingToken) null);
        ((StreamableMessageSource) Mockito.verify(streamableMessageSource2)).openStream((TrackingToken) null);
    }

    @Test
    void simplePublishAndConsumeDomainEventMessage() throws InterruptedException {
        EventMessage genericDomainEventMessage = new GenericDomainEventMessage("Aggregate", "id", 0L, "Event1");
        this.eventStoreA.publish(new EventMessage[]{genericDomainEventMessage});
        MultiStreamableMessageSource.MultiSourceBlockingStream openStream = this.testSubject.openStream(this.testSubject.createTailToken());
        Assertions.assertTrue(openStream.hasNextAvailable());
        TrackedEventMessage trackedEventMessage = (TrackedEventMessage) openStream.nextAvailable();
        Assertions.assertEquals(genericDomainEventMessage.getPayload(), trackedEventMessage.getPayload());
        Assertions.assertTrue(trackedEventMessage instanceof DomainEventMessage);
        openStream.close();
    }

    @Test
    void peekingLastMessageKeepsItAvailable() throws InterruptedException {
        this.eventStoreA.publish(new EventMessage[]{GenericEventMessage.asEventMessage("Event1")});
        MultiStreamableMessageSource.MultiSourceBlockingStream openStream = this.testSubject.openStream((TrackingToken) null);
        Assertions.assertEquals("Event1", openStream.peek().map((v0) -> {
            return v0.getPayload();
        }).map((v0) -> {
            return v0.toString();
        }).orElse("None"));
        Assertions.assertTrue(openStream.hasNextAvailable());
        Assertions.assertTrue(openStream.hasNextAvailable(10, TimeUnit.SECONDS));
    }

    @Test
    void openStreamWithWrongToken() {
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.testSubject.openStream(new GlobalSequenceTrackingToken(0L));
        });
    }

    @Test
    void openStreamWithNullTokenReturnsFirstEvent() throws InterruptedException {
        EventMessage asEventMessage = GenericEventMessage.asEventMessage("Event1");
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        MultiStreamableMessageSource.MultiSourceBlockingStream openStream = this.testSubject.openStream((TrackingToken) null);
        Assertions.assertNotNull(openStream);
        TrackedEventMessage trackedEventMessage = (TrackedEventMessage) openStream.nextAvailable();
        Assertions.assertEquals(asEventMessage.getIdentifier(), trackedEventMessage.getIdentifier());
        Assertions.assertEquals(asEventMessage.getPayload(), trackedEventMessage.getPayload());
    }

    @Test
    void longPoll() throws InterruptedException {
        MultiStreamableMessageSource.MultiSourceBlockingStream openStream = this.testSubject.openStream(this.testSubject.createTokenAt(Instant.now()));
        long currentTimeMillis = System.currentTimeMillis();
        Assertions.assertFalse(openStream.hasNextAvailable(100, TimeUnit.MILLISECONDS));
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        Assertions.assertTrue(currentTimeMillis2 > 80, "Poll time too short: " + currentTimeMillis2 + "ms");
        Assertions.assertTrue(currentTimeMillis2 < 120, "Poll time too long: " + currentTimeMillis2 + "ms");
        openStream.close();
    }

    @Test
    void longPollMessageImmediatelyAvailable() throws InterruptedException {
        MultiStreamableMessageSource.MultiSourceBlockingStream openStream = this.testSubject.openStream(this.testSubject.createTokenAt(Instant.now()));
        this.eventStoreB.publish(new EventMessage[]{GenericEventMessage.asEventMessage("Event1")});
        long currentTimeMillis = System.currentTimeMillis();
        boolean hasNextAvailable = openStream.hasNextAvailable(100, TimeUnit.MILLISECONDS);
        long currentTimeMillis2 = System.currentTimeMillis();
        Assertions.assertTrue(hasNextAvailable);
        Assertions.assertTrue(currentTimeMillis2 - currentTimeMillis < 10);
        openStream.close();
    }

    @Test
    void multiPublishAndConsume() throws InterruptedException {
        EventMessage asEventMessage = GenericEventMessage.asEventMessage("Event1");
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        Thread.sleep(20L);
        EventMessage asEventMessage2 = GenericEventMessage.asEventMessage("Event2");
        this.eventStoreB.publish(new EventMessage[]{asEventMessage2});
        MultiStreamableMessageSource.MultiSourceBlockingStream openStream = this.testSubject.openStream(this.testSubject.createTokenAt(recentTimeStamp()));
        Assertions.assertTrue(openStream.hasNextAvailable());
        Assertions.assertEquals(asEventMessage.getPayload(), ((TrackedEventMessage) openStream.nextAvailable()).getPayload());
        Assertions.assertEquals(asEventMessage2.getPayload(), ((TrackedEventMessage) openStream.nextAvailable()).getPayload());
        Assertions.assertFalse(openStream.hasNextAvailable());
        openStream.close();
    }

    @Test
    void peek() throws InterruptedException {
        EventMessage asEventMessage = GenericEventMessage.asEventMessage("Event1");
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        MultiStreamableMessageSource.MultiSourceBlockingStream openStream = this.testSubject.openStream(this.testSubject.createTokenAt(recentTimeStamp()));
        Assertions.assertTrue(openStream.peek().isPresent());
        Assertions.assertEquals(asEventMessage.getPayload(), ((TrackedEventMessage) openStream.peek().get()).getPayload());
        Assertions.assertEquals(asEventMessage.getPayload(), ((TrackedEventMessage) openStream.nextAvailable()).getPayload());
        openStream.close();
    }

    @Test
    void peekWithMultipleStreams() throws InterruptedException {
        EventMessage asEventMessage = GenericEventMessage.asEventMessage("Event1");
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        Thread.sleep(20L);
        EventMessage asEventMessage2 = GenericEventMessage.asEventMessage("Event2");
        this.eventStoreB.publish(new EventMessage[]{asEventMessage2});
        MultiStreamableMessageSource.MultiSourceBlockingStream openStream = this.testSubject.openStream(this.testSubject.createTokenAt(recentTimeStamp()));
        Assertions.assertTrue(openStream.peek().isPresent());
        TrackedEventMessage trackedEventMessage = (TrackedEventMessage) openStream.peek().get();
        MultiSourceTrackingToken trackingToken = trackedEventMessage.trackingToken();
        Assertions.assertEquals(asEventMessage.getPayload(), trackedEventMessage.getPayload());
        Assertions.assertEquals(trackedEventMessage.getPayload(), ((TrackedEventMessage) openStream.nextAvailable()).getPayload());
        Assertions.assertTrue(openStream.peek().isPresent());
        TrackedEventMessage trackedEventMessage2 = (TrackedEventMessage) openStream.peek().get();
        MultiSourceTrackingToken trackingToken2 = trackedEventMessage2.trackingToken();
        Assertions.assertEquals(asEventMessage2.getPayload(), trackedEventMessage2.getPayload());
        Assertions.assertEquals(trackedEventMessage2.getPayload(), ((TrackedEventMessage) openStream.nextAvailable()).getPayload());
        Assertions.assertEquals(trackingToken.getTokenForStream("eventStoreA"), trackingToken2.getTokenForStream("eventStoreA"));
        openStream.close();
    }

    private static Instant recentTimeStamp() {
        return Instant.now().minusMillis(1000L);
    }

    @Test
    void createTailToken() {
        this.eventStoreA.publish(new EventMessage[]{GenericEventMessage.asEventMessage("Event1")});
        this.eventStoreB.publish(new EventMessage[]{GenericEventMessage.asEventMessage("Event2")});
        MultiSourceTrackingToken createTailToken = this.testSubject.createTailToken();
        OptionalLong position = createTailToken.getTokenForStream("eventStoreA").position();
        Assertions.assertTrue(position.isPresent());
        Assertions.assertEquals(-1L, position.getAsLong());
        OptionalLong position2 = createTailToken.getTokenForStream("eventStoreB").position();
        Assertions.assertTrue(position2.isPresent());
        Assertions.assertEquals(-1L, position2.getAsLong());
    }

    @Test
    void createHeadToken() {
        this.eventStoreA.publish(new EventMessage[]{GenericEventMessage.asEventMessage("Event1")});
        EventMessage asEventMessage = GenericEventMessage.asEventMessage("Event2");
        this.eventStoreB.publish(new EventMessage[]{asEventMessage});
        this.eventStoreB.publish(new EventMessage[]{asEventMessage});
        MultiSourceTrackingToken createHeadToken = this.testSubject.createHeadToken();
        OptionalLong position = createHeadToken.getTokenForStream("eventStoreA").position();
        Assertions.assertTrue(position.isPresent());
        Assertions.assertEquals(0L, position.getAsLong());
        OptionalLong position2 = createHeadToken.getTokenForStream("eventStoreB").position();
        Assertions.assertTrue(position2.isPresent());
        Assertions.assertEquals(1L, position2.getAsLong());
    }

    @Test
    void createTokenAt() throws InterruptedException {
        EventMessage asEventMessage = GenericEventMessage.asEventMessage("Event1");
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        Thread.sleep(20L);
        this.eventStoreB.publish(new EventMessage[]{GenericEventMessage.asEventMessage("Event2")});
        MultiSourceTrackingToken createTokenAt = this.testSubject.createTokenAt(Instant.now().minusMillis(10L));
        Assertions.assertEquals(this.eventStoreA.createHeadToken(), createTokenAt.getTokenForStream("eventStoreA"));
        OptionalLong position = createTokenAt.getTokenForStream("eventStoreB").position();
        Assertions.assertTrue(position.isPresent());
        Assertions.assertEquals(-1L, position.getAsLong());
    }

    @Test
    void createTokenSince() throws InterruptedException {
        EventMessage asEventMessage = GenericEventMessage.asEventMessage("Event1");
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        Thread.sleep(20L);
        this.eventStoreB.publish(new EventMessage[]{GenericEventMessage.asEventMessage("Event2")});
        MultiSourceTrackingToken createTokenSince = this.testSubject.createTokenSince(Duration.ofMillis(10L));
        Assertions.assertEquals(this.eventStoreA.createHeadToken(), createTokenSince.getTokenForStream("eventStoreA"));
        OptionalLong position = createTokenSince.getTokenForStream("eventStoreB").position();
        Assertions.assertTrue(position.isPresent());
        Assertions.assertEquals(-1L, position.getAsLong());
    }

    @Test
    void configuredDifferentComparator() throws InterruptedException {
        Comparator thenComparing = Comparator.comparing(entry -> {
            return Boolean.valueOf(!((String) entry.getKey()).equals("eventStoreA"));
        }).thenComparing(entry2 -> {
            return ((TrackedEventMessage) entry2.getValue()).getTimestamp();
        });
        EmbeddedEventStore build = EmbeddedEventStore.builder().storageEngine(new InMemoryEventStorageEngine()).build();
        MultiStreamableMessageSource build2 = MultiStreamableMessageSource.builder().addMessageSource("eventStoreA", this.eventStoreA).addMessageSource("eventStoreB", this.eventStoreB).addMessageSource("eventStoreC", build).trackedEventComparator(thenComparing).build();
        EventMessage asEventMessage = GenericEventMessage.asEventMessage("Event1");
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        this.eventStoreA.publish(new EventMessage[]{asEventMessage});
        EventMessage asEventMessage2 = GenericEventMessage.asEventMessage("Event2");
        build.publish(new EventMessage[]{asEventMessage2});
        Thread.sleep(5L);
        EventMessage asEventMessage3 = GenericEventMessage.asEventMessage("Event3");
        this.eventStoreB.publish(new EventMessage[]{asEventMessage3});
        MultiStreamableMessageSource.MultiSourceBlockingStream openStream = build2.openStream(build2.createTailToken());
        openStream.nextAvailable();
        openStream.nextAvailable();
        openStream.nextAvailable();
        Assertions.assertEquals(asEventMessage2.getPayload(), ((TrackedEventMessage) openStream.nextAvailable()).getPayload());
        Assertions.assertEquals(asEventMessage3.getPayload(), ((TrackedEventMessage) openStream.nextAvailable()).getPayload());
    }

    @Test
    void skipMessagesWithPayloadTypeOfInvokesAllConfiguredStreams() {
        GenericTrackedEventMessage genericTrackedEventMessage = new GenericTrackedEventMessage(new GlobalSequenceTrackingToken(1L), GenericEventMessage.asEventMessage("some-payload"));
        StreamableMessageSource streamableMessageSource = (StreamableMessageSource) Mockito.mock(StreamableMessageSource.class);
        BlockingStream blockingStream = (BlockingStream) Mockito.mock(BlockingStream.class);
        Mockito.when(streamableMessageSource.openStream((TrackingToken) Mockito.any())).thenReturn(blockingStream);
        StreamableMessageSource streamableMessageSource2 = (StreamableMessageSource) Mockito.mock(StreamableMessageSource.class);
        BlockingStream blockingStream2 = (BlockingStream) Mockito.mock(BlockingStream.class);
        Mockito.when(streamableMessageSource2.openStream((TrackingToken) Mockito.any())).thenReturn(blockingStream2);
        StreamableMessageSource streamableMessageSource3 = (StreamableMessageSource) Mockito.mock(StreamableMessageSource.class);
        BlockingStream blockingStream3 = (BlockingStream) Mockito.mock(BlockingStream.class);
        Mockito.when(streamableMessageSource3.openStream((TrackingToken) Mockito.any())).thenReturn(blockingStream3);
        MultiStreamableMessageSource.builder().addMessageSource("one", streamableMessageSource).addMessageSource("two", streamableMessageSource2).addMessageSource("three", streamableMessageSource3).build().openStream((TrackingToken) null).skipMessagesWithPayloadTypeOf(genericTrackedEventMessage);
        ((BlockingStream) Mockito.verify(blockingStream)).skipMessagesWithPayloadTypeOf(genericTrackedEventMessage);
        ((BlockingStream) Mockito.verify(blockingStream2)).skipMessagesWithPayloadTypeOf(genericTrackedEventMessage);
        ((BlockingStream) Mockito.verify(blockingStream3)).skipMessagesWithPayloadTypeOf(genericTrackedEventMessage);
    }

    @Test
    void setOnAvailableCallbackReturnsTrueIfAllStreamsReturnTrue() {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Runnable runnable = () -> {
            atomicBoolean.set(true);
        };
        CallbackSupportingBlockingStream callbackSupportingBlockingStream = (CallbackSupportingBlockingStream) Mockito.spy(new CallbackSupportingBlockingStream());
        StreamableMessageSource streamableMessageSource = (StreamableMessageSource) Mockito.mock(StreamableMessageSource.class);
        Mockito.when(streamableMessageSource.openStream((TrackingToken) Mockito.any())).thenReturn(callbackSupportingBlockingStream);
        BlockingStream blockingStream = (BlockingStream) Mockito.mock(BlockingStream.class);
        Mockito.when(Boolean.valueOf(blockingStream.setOnAvailableCallback((Runnable) Mockito.any()))).thenReturn(true);
        StreamableMessageSource streamableMessageSource2 = (StreamableMessageSource) Mockito.mock(StreamableMessageSource.class);
        Mockito.when(streamableMessageSource2.openStream((TrackingToken) Mockito.any())).thenReturn(blockingStream);
        BlockingStream blockingStream2 = (BlockingStream) Mockito.mock(BlockingStream.class);
        Mockito.when(Boolean.valueOf(blockingStream2.setOnAvailableCallback((Runnable) Mockito.any()))).thenReturn(true);
        StreamableMessageSource streamableMessageSource3 = (StreamableMessageSource) Mockito.mock(StreamableMessageSource.class);
        Mockito.when(streamableMessageSource3.openStream((TrackingToken) Mockito.any())).thenReturn(blockingStream2);
        Assertions.assertTrue(MultiStreamableMessageSource.builder().addMessageSource("one", streamableMessageSource).addMessageSource("two", streamableMessageSource2).addMessageSource("three", streamableMessageSource3).build().openStream((TrackingToken) null).setOnAvailableCallback(runnable));
        ((CallbackSupportingBlockingStream) Mockito.verify(callbackSupportingBlockingStream)).setOnAvailableCallback(runnable);
        ((BlockingStream) Mockito.verify(blockingStream)).setOnAvailableCallback(runnable);
        ((BlockingStream) Mockito.verify(blockingStream2)).setOnAvailableCallback(runnable);
        callbackSupportingBlockingStream.invokeCallback();
        Assertions.assertTrue(atomicBoolean.get());
    }

    @Test
    void setOnAvailableCallbackReturnsFalseIfOneStreamsReturnsFalse() {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Runnable runnable = () -> {
            atomicBoolean.set(true);
        };
        CallbackSupportingBlockingStream callbackSupportingBlockingStream = (CallbackSupportingBlockingStream) Mockito.spy(new CallbackSupportingBlockingStream());
        StreamableMessageSource streamableMessageSource = (StreamableMessageSource) Mockito.mock(StreamableMessageSource.class);
        Mockito.when(streamableMessageSource.openStream((TrackingToken) Mockito.any())).thenReturn(callbackSupportingBlockingStream);
        BlockingStream blockingStream = (BlockingStream) Mockito.mock(BlockingStream.class);
        Mockito.when(Boolean.valueOf(blockingStream.setOnAvailableCallback((Runnable) Mockito.any()))).thenReturn(false);
        StreamableMessageSource streamableMessageSource2 = (StreamableMessageSource) Mockito.mock(StreamableMessageSource.class);
        Mockito.when(streamableMessageSource2.openStream((TrackingToken) Mockito.any())).thenReturn(blockingStream);
        BlockingStream blockingStream2 = (BlockingStream) Mockito.mock(BlockingStream.class);
        Mockito.when(Boolean.valueOf(blockingStream2.setOnAvailableCallback((Runnable) Mockito.any()))).thenReturn(true);
        StreamableMessageSource streamableMessageSource3 = (StreamableMessageSource) Mockito.mock(StreamableMessageSource.class);
        Mockito.when(streamableMessageSource3.openStream((TrackingToken) Mockito.any())).thenReturn(blockingStream2);
        Assertions.assertFalse(MultiStreamableMessageSource.builder().addMessageSource("one", streamableMessageSource).addMessageSource("two", streamableMessageSource2).addMessageSource("three", streamableMessageSource3).build().openStream((TrackingToken) null).setOnAvailableCallback(runnable));
        ((CallbackSupportingBlockingStream) Mockito.verify(callbackSupportingBlockingStream)).setOnAvailableCallback(runnable);
        ((BlockingStream) Mockito.verify(blockingStream)).setOnAvailableCallback(runnable);
        ((BlockingStream) Mockito.verify(blockingStream2)).setOnAvailableCallback(runnable);
        callbackSupportingBlockingStream.invokeCallback();
        Assertions.assertTrue(atomicBoolean.get());
    }
}
