package org.axonframework.eventhandling.pooled;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.DefaultEventProcessorSpanFactory;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventTestUtils;
import org.axonframework.eventhandling.EventTrackerStatus;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.pooled.PooledStreamingEventProcessor;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.eventstreaming.StreamableEventSource;
import org.axonframework.eventstreaming.StreamingCondition;
import org.axonframework.messaging.InterceptorChain;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.unitofwork.LegacyUnitOfWork;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.tracing.TestSpanFactory;
import org.axonframework.utils.AssertUtils;
import org.axonframework.utils.AsyncInMemoryStreamableEventSource;
import org.axonframework.utils.DelegateScheduledExecutorService;
import org.axonframework.utils.MockException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventhandling/pooled/PooledStreamingEventProcessorTest.class */
class PooledStreamingEventProcessorTest {
    private static final Logger logger = LoggerFactory.getLogger(PooledStreamingEventProcessorTest.class);
    private static final String PROCESSOR_NAME = "test";
    private PooledStreamingEventProcessor testSubject;
    private EventHandlerInvoker stubEventHandler;
    private AsyncInMemoryStreamableEventSource stubMessageSource;
    private InMemoryTokenStore tokenStore;
    private ScheduledExecutorService coordinatorExecutor;
    private ScheduledExecutorService workerExecutor;
    private TestSpanFactory spanFactory;

    PooledStreamingEventProcessorTest() {
    }

    @BeforeEach
    void setUp() {
        this.stubMessageSource = new AsyncInMemoryStreamableEventSource();
        this.stubEventHandler = (EventHandlerInvoker) Mockito.mock(EventHandlerInvoker.class);
        this.tokenStore = (InMemoryTokenStore) Mockito.spy(new InMemoryTokenStore());
        this.coordinatorExecutor = new DelegateScheduledExecutorService(Executors.newScheduledThreadPool(2));
        this.workerExecutor = new DelegateScheduledExecutorService(Executors.newScheduledThreadPool(8));
        this.spanFactory = new TestSpanFactory();
        setTestSubject(createTestSubject());
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandleType((Class) Mockito.any()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any()))).thenReturn(true);
    }

    private void setTestSubject(PooledStreamingEventProcessor pooledStreamingEventProcessor) {
        this.testSubject = pooledStreamingEventProcessor;
    }

    private PooledStreamingEventProcessor createTestSubject() {
        return createTestSubject(builder -> {
            return builder;
        });
    }

    private PooledStreamingEventProcessor createTestSubject(UnaryOperator<PooledStreamingEventProcessor.Builder> unaryOperator) {
        return ((PooledStreamingEventProcessor.Builder) unaryOperator.apply(PooledStreamingEventProcessor.builder().name(PROCESSOR_NAME).eventHandlerInvoker(this.stubEventHandler).errorHandler(PropagatingErrorHandler.instance()).eventSource(this.stubMessageSource).tokenStore(this.tokenStore).transactionManager(NoTransactionManager.instance()).coordinatorExecutor(this.coordinatorExecutor).workerExecutor(this.workerExecutor).initialSegmentCount(8).claimExtensionThreshold(500L).spanFactory(DefaultEventProcessorSpanFactory.builder().spanFactory(this.spanFactory).build()))).build();
    }

    @AfterEach
    void tearDown() {
        this.testSubject.shutDown();
        this.coordinatorExecutor.shutdown();
        this.workerExecutor.shutdown();
    }

    @Test
    void retriesWhenTokenInitializationInitiallyFails() {
        InMemoryTokenStore inMemoryTokenStore = (InMemoryTokenStore) Mockito.spy(this.tokenStore);
        setTestSubject(createTestSubject(builder -> {
            return builder.tokenStore(inMemoryTokenStore);
        }));
        ((InMemoryTokenStore) Mockito.doThrow(new Throwable[]{new RuntimeException("Simulated failure")}).doCallRealMethod().when(inMemoryTokenStore)).initializeTokenSegments((String) Mockito.any(), Mockito.anyInt(), (TrackingToken) Mockito.any());
        List<EventMessage<Integer>> createEvents = createEvents(100);
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        createEvents.forEach(asyncInMemoryStreamableEventSource::publishMessage);
        mockEventHandlerInvoker();
        this.testSubject.start();
        Assertions.assertTrue(this.testSubject.isRunning());
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(8, this.testSubject.processingStatus().size());
        });
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(8L, IntStream.range(0, 8).mapToObj(i -> {
                return this.tokenStore.fetchToken(PROCESSOR_NAME, i);
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).count());
        });
        Assertions.assertEquals(8, this.testSubject.processingStatus().size());
    }

    @Test
    void startShutsDownImmediatelyIfCoordinatorExecutorThrowsAnException() {
        ScheduledExecutorService scheduledExecutorService = (ScheduledExecutorService) Mockito.spy(this.coordinatorExecutor);
        ((ScheduledExecutorService) Mockito.doThrow(new Throwable[]{new IllegalArgumentException("Some exception")}).when(scheduledExecutorService)).submit((Runnable) Mockito.any(Runnable.class));
        setTestSubject(createTestSubject(builder -> {
            return builder.coordinatorExecutor(scheduledExecutorService);
        }));
        PooledStreamingEventProcessor pooledStreamingEventProcessor = this.testSubject;
        Objects.requireNonNull(pooledStreamingEventProcessor);
        Assertions.assertThrows(IllegalArgumentException.class, pooledStreamingEventProcessor::start);
        Assertions.assertFalse(this.testSubject.isRunning());
    }

    @Test
    void secondStartInvocationIsIgnored() {
        ScheduledExecutorService scheduledExecutorService = (ScheduledExecutorService) Mockito.spy(this.coordinatorExecutor);
        setTestSubject(createTestSubject(builder -> {
            return builder.coordinatorExecutor(scheduledExecutorService);
        }));
        this.testSubject.start();
        this.testSubject.start();
        ((ScheduledExecutorService) Mockito.verify(scheduledExecutorService, Mockito.times(1))).submit((Runnable) Mockito.any(Runnable.class));
    }

    @Test
    void startingProcessorClaimsAllAvailableTokens() {
        startAndAssertProcessorClaimsAllTokens();
    }

    private void startAndAssertProcessorClaimsAllTokens() {
        List<EventMessage<Integer>> createEvents = createEvents(100);
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        createEvents.forEach(asyncInMemoryStreamableEventSource::publishMessage);
        mockEventHandlerInvoker();
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(8, this.testSubject.processingStatus().size());
        });
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(8L, IntStream.range(0, 8).mapToObj(i -> {
                return this.tokenStore.fetchToken(PROCESSOR_NAME, i);
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).count());
        });
        Assertions.assertEquals(8, this.testSubject.processingStatus().size());
    }

    @Test
    void handlingEventsAreCorrectlyTraced() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(8);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        mockEventHandlerInvoker();
        ((EventHandlerInvoker) Mockito.doAnswer(invocationOnMock -> {
            Message<?> message = (EventMessage) invocationOnMock.getArgument(0, EventMessage.class);
            copyOnWriteArrayList.add(message);
            this.spanFactory.verifySpanActive("StreamingEventProcessor.batch");
            this.spanFactory.verifySpanActive("StreamingEventProcessor.process", message);
            countDownLatch.countDown();
            return null;
        }).when(this.stubEventHandler)).handle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any());
        List<EventMessage<Integer>> createEvents = createEvents(8);
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        createEvents.forEach(asyncInMemoryStreamableEventSource::publishMessage);
        this.testSubject.start();
        Assertions.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        copyOnWriteArrayList.forEach(message -> {
            AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
                this.spanFactory.verifySpanCompleted("StreamingEventProcessor.process", message);
            });
        });
        this.spanFactory.verifySpanCompleted("StreamingEventProcessor.batch");
    }

    @Test
    void handlingEventsHaveSegmentAndTokenInUnitOfWork() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(8);
        mockEventHandlerInvoker();
        ((EventHandlerInvoker) Mockito.doAnswer(invocationOnMock -> {
            ProcessingContext processingContext = (ProcessingContext) invocationOnMock.getArgument(1, ProcessingContext.class);
            boolean isPresent = Segment.fromContext(processingContext).isPresent();
            boolean isPresent2 = TrackingToken.fromContext(processingContext).isPresent();
            if (!isPresent) {
                logger.error("UoW didn't contain the segment!");
                return null;
            }
            if (isPresent2) {
                countDownLatch.countDown();
                return null;
            }
            logger.error("UoW didn't contain the token!");
            return null;
        }).when(this.stubEventHandler)).handle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any());
        List<EventMessage<Integer>> createEvents = createEvents(8);
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        createEvents.forEach(asyncInMemoryStreamableEventSource::publishMessage);
        this.testSubject.start();
        Assertions.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
    }

    @Test
    void processorOnlyTriesToClaimAvailableSegments() {
        this.tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), PROCESSOR_NAME, 0);
        this.tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), PROCESSOR_NAME, 1);
        this.tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), PROCESSOR_NAME, 2);
        this.tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), PROCESSOR_NAME, 3);
        Mockito.when(this.tokenStore.fetchAvailableSegments(this.testSubject.getName())).thenReturn(Collections.singletonList(Segment.computeSegment(2, new int[]{0, 1, 2, 3})));
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(1, this.testSubject.processingStatus().size());
        });
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertTrue(this.testSubject.processingStatus().containsKey(2));
        });
        ((InMemoryTokenStore) Mockito.verify(this.tokenStore, Mockito.never())).fetchToken((String) Mockito.eq(this.testSubject.getName()), Mockito.intThat(num -> {
            return Arrays.asList(0, 1, 3).contains(num);
        }));
    }

    @Test
    void startingAfterShutdownLetsProcessorProceed() {
        Mockito.when(Boolean.valueOf(this.stubEventHandler.supportsReset())).thenReturn(true);
        this.testSubject.start();
        this.testSubject.shutDown();
        List<EventMessage<Integer>> createEvents = createEvents(100);
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        createEvents.forEach(asyncInMemoryStreamableEventSource::publishMessage);
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(8, this.testSubject.processingStatus().size());
        });
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(8L, IntStream.range(0, 8).mapToObj(i -> {
                return this.tokenStore.fetchToken(PROCESSOR_NAME, i);
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).count());
        });
        Assertions.assertEquals(8, this.testSubject.processingStatus().size());
    }

    @Test
    void allTokensUpdatedToLatestValue() {
        List<EventMessage<Integer>> createEvents = createEvents(100);
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        createEvents.forEach(asyncInMemoryStreamableEventSource::publishMessage);
        mockEventHandlerInvoker();
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(8, this.testSubject.processingStatus().size());
        });
        AssertUtils.assertWithin(6, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(100L, IntStream.range(0, 8).mapToObj(i -> {
                return this.tokenStore.fetchToken(this.testSubject.getName(), i);
            }).mapToLong(this::tokenPosition).min().orElse(-1L));
        });
    }

    private static List<EventMessage<Integer>> createEvents(int i) {
        return (List) IntStream.range(0, i).mapToObj(i2 -> {
            return new GenericEventMessage(new MessageType("event"), Integer.valueOf(i2));
        }).collect(Collectors.toList());
    }

    private long tokenPosition(TrackingToken trackingToken) {
        if (trackingToken == null) {
            return 0L;
        }
        return trackingToken.position().orElse(0L);
    }

    @Test
    void exceptionWhileHandlingEventAbortsWorker() throws Exception {
        MessageType messageType = new MessageType("event");
        List list = (List) Stream.of((Object[]) new Integer[]{1, 2, 2, 4, 5}).map(num -> {
            return new GenericEventMessage(messageType, num);
        }).collect(Collectors.toList());
        mockEventHandlerInvoker();
        ((EventHandlerInvoker) Mockito.doThrow(new Throwable[]{new RuntimeException("Simulating worker failure")}).doNothing().when(this.stubEventHandler)).handle((EventMessage) Mockito.argThat(eventMessage -> {
            return eventMessage.getIdentifier().equals(((EventMessage) list.get(2)).getIdentifier());
        }), (ProcessingContext) Mockito.any(), (Segment) Mockito.any());
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(8, this.testSubject.processingStatus().size());
        });
        Assertions.assertEquals(8, this.tokenStore.fetchSegments(PROCESSOR_NAME).length);
        ((EventHandlerInvoker) Mockito.verify(this.stubEventHandler, Mockito.never())).canHandle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any());
        list.forEach(eventMessage2 -> {
            this.stubMessageSource.publishMessage(eventMessage2);
        });
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            try {
                ((EventHandlerInvoker) Mockito.verify(this.stubEventHandler)).handle((EventMessage) Mockito.argThat(eventMessage3 -> {
                    return eventMessage3.getIdentifier().equals(((EventMessage) list.get(2)).getIdentifier());
                }), (ProcessingContext) Mockito.any(), (Segment) Mockito.argThat(segment -> {
                    return segment.getSegmentId() == ((Integer) ((EventMessage) list.get(2)).getPayload()).intValue();
                }));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(7, this.testSubject.processingStatus().size());
            Assertions.assertFalse(this.testSubject.processingStatus().containsKey(2));
        });
    }

    @Test
    void workPackageIsAbortedWhenExtendingClaimFails() {
        InMemoryTokenStore inMemoryTokenStore = (InMemoryTokenStore) Mockito.spy(this.tokenStore);
        setTestSubject(createTestSubject(builder -> {
            return builder.tokenStore(inMemoryTokenStore).eventSource(new AsyncInMemoryStreamableEventSource(true)).claimExtensionThreshold(10L);
        }));
        ((InMemoryTokenStore) Mockito.doThrow(new Throwable[]{new MockException("Simulated failure")}).when(inMemoryTokenStore)).extendClaim((String) Mockito.any(), Mockito.anyInt());
        this.testSubject.start();
        AssertUtils.assertWithin(250, TimeUnit.MILLISECONDS, () -> {
            ((InMemoryTokenStore) Mockito.verify(inMemoryTokenStore, Mockito.atLeastOnce())).extendClaim(this.testSubject.getName(), 0);
        });
        AssertUtils.assertWithin(100, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertTrue(this.testSubject.processingStatus().isEmpty());
        });
    }

    @Disabled("TODO #3098 - Support ignoring events by mean of the EventCriteria API")
    @Test
    void handlingUnknownMessageTypeWillAdvanceToken() {
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(1);
        }));
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any()))).thenReturn(false);
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandleType(Integer.class))).thenReturn(false);
        this.stubMessageSource.publishMessage(EventTestUtils.asEventMessage(1337));
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(1, this.testSubject.processingStatus().size());
        });
        AssertUtils.assertWithin(100, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertEquals(1L, ((EventTrackerStatus) this.testSubject.processingStatus().get(0)).getCurrentPosition().orElse(0L));
        });
        Assertions.assertEquals(1, this.stubMessageSource.getIgnoredEvents().size());
    }

    @Test
    void tokenStoreReturningSingleNullToken() {
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any()))).thenReturn(false);
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandleType(Integer.class))).thenReturn(false);
        this.tokenStore.initializeTokenSegments(this.testSubject.getName(), 2);
        this.tokenStore.storeToken(new GlobalSequenceTrackingToken(0L), this.testSubject.getName(), 1);
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(2, this.testSubject.processingStatus().size());
        });
    }

    @Disabled("TODO #3098 - Support ignoring events by mean of the EventCriteria API")
    @Test
    void eventsWhichMustBeIgnoredAreNotHandledOnlyValidated() throws Exception {
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(1);
        }));
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandle((EventMessage) Mockito.argThat(eventMessage -> {
            return eventMessage != null && Integer.class.equals(eventMessage.getPayloadType());
        }), (ProcessingContext) Mockito.any(), (Segment) Mockito.any()))).thenReturn(false);
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandle((EventMessage) Mockito.argThat(eventMessage2 -> {
            return eventMessage2 != null && String.class.equals(eventMessage2.getPayloadType());
        }), (ProcessingContext) Mockito.any(), (Segment) Mockito.any()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandleType(Integer.class))).thenReturn(false);
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandleType(String.class))).thenReturn(true);
        EventMessage<?> asEventMessage = EventTestUtils.asEventMessage(1337);
        EventMessage<?> asEventMessage2 = EventTestUtils.asEventMessage(42);
        EventMessage<?> asEventMessage3 = EventTestUtils.asEventMessage(9001);
        ArrayList arrayList = new ArrayList();
        arrayList.add((Integer) asEventMessage.getPayload());
        arrayList.add((Integer) asEventMessage2.getPayload());
        arrayList.add((Integer) asEventMessage3.getPayload());
        EventMessage<?> asEventMessage4 = EventTestUtils.asEventMessage("some-text");
        EventMessage<?> asEventMessage5 = EventTestUtils.asEventMessage("some-other-text");
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add((String) asEventMessage4.getPayload());
        arrayList2.add((String) asEventMessage5.getPayload());
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add(asEventMessage.getPayload());
        arrayList3.add(asEventMessage2.getPayload());
        arrayList3.add(asEventMessage3.getPayload());
        arrayList3.add(asEventMessage4.getPayload());
        arrayList3.add(asEventMessage5.getPayload());
        this.stubMessageSource.publishMessage(asEventMessage);
        this.stubMessageSource.publishMessage(asEventMessage2);
        this.stubMessageSource.publishMessage(asEventMessage3);
        this.stubMessageSource.publishMessage(asEventMessage4);
        this.stubMessageSource.publishMessage(asEventMessage5);
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(1, this.testSubject.processingStatus().size());
        });
        ArgumentCaptor forClass = ArgumentCaptor.forClass(EventMessage.class);
        ((EventHandlerInvoker) Mockito.verify(this.stubEventHandler, Mockito.timeout(500L).times(5))).canHandle((EventMessage) forClass.capture(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any());
        List allValues = forClass.getAllValues();
        Assertions.assertEquals(5, allValues.size());
        Iterator it = allValues.iterator();
        while (it.hasNext()) {
            Assertions.assertTrue(arrayList3.contains(((EventMessage) it.next()).getPayload()));
        }
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(EventMessage.class);
        ((EventHandlerInvoker) Mockito.verify(this.stubEventHandler, Mockito.timeout(500L).times(2))).handle((EventMessage) forClass2.capture(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any());
        List allValues2 = forClass2.getAllValues();
        Assertions.assertEquals(2, allValues2.size());
        Iterator it2 = allValues2.iterator();
        while (it2.hasNext()) {
            Assertions.assertTrue(arrayList2.contains(((EventMessage) it2.next()).getPayload()));
        }
        List<EventMessage<?>> ignoredEvents = this.stubMessageSource.getIgnoredEvents();
        Assertions.assertEquals(3, ignoredEvents.size());
        Iterator<EventMessage<?>> it3 = ignoredEvents.iterator();
        while (it3.hasNext()) {
            Assertions.assertTrue(arrayList.contains(it3.next().getPayload()));
        }
    }

    @Test
    void coordinationIsTriggeredThroughEventAvailabilityCallback() {
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = new AsyncInMemoryStreamableEventSource(true);
        setTestSubject(createTestSubject(builder -> {
            return builder.eventSource(asyncInMemoryStreamableEventSource);
        }));
        mockEventHandlerInvoker();
        Stream map = Stream.of((Object[]) new Integer[]{0, 1, 2, 3}).map(num -> {
            return new GenericEventMessage(new MessageType("event"), num);
        });
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        map.forEach((v1) -> {
            r1.publishMessage(v1);
        });
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(8, this.testSubject.processingStatus().size());
        });
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(4L, ((Long) this.testSubject.processingStatus().values().stream().map(eventTrackerStatus -> {
                return Long.valueOf(eventTrackerStatus.getCurrentPosition().orElse(-1L));
            }).min((v0, v1) -> {
                return v0.compareTo(v1);
            }).orElse(-1L)).longValue());
        });
        Stream map2 = Stream.of((Object[]) new Integer[]{4, 5, 6, 7}).map(num2 -> {
            return new GenericEventMessage(new MessageType("event"), num2);
        });
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        map2.forEach((v1) -> {
            r1.publishMessage(v1);
        });
        asyncInMemoryStreamableEventSource.runOnAvailableCallback();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(8L, ((Long) this.testSubject.processingStatus().values().stream().map(eventTrackerStatus -> {
                return Long.valueOf(eventTrackerStatus.getCurrentPosition().orElse(-1L));
            }).min((v0, v1) -> {
                return v0.compareTo(v1);
            }).orElse(-1L)).longValue());
        });
    }

    @Test
    void shutdownCompletesAfterAbortingWorkPackages() throws InterruptedException, ExecutionException, TimeoutException {
        this.testSubject.start();
        Stream map = Stream.of((Object[]) new Integer[]{1, 2, 2, 4, 5}).map(num -> {
            return new GenericEventMessage(new MessageType("event"), num);
        });
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        map.forEach((v1) -> {
            r1.publishMessage(v1);
        });
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertFalse(this.testSubject.processingStatus().isEmpty());
        });
        this.testSubject.shutdownAsync().get(1L, TimeUnit.SECONDS);
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(0, this.testSubject.processingStatus().size());
        });
        Assertions.assertFalse(this.coordinatorExecutor.isShutdown());
        Assertions.assertFalse(this.workerExecutor.isShutdown());
    }

    @Test
    void shutdownProcessorWhichHasNotStartedYetReturnsCompletedFuture() {
        Assertions.assertTrue(this.testSubject.shutdownAsync().isDone());
    }

    @Test
    void shutdownProcessorAsyncTwiceReturnsSameFuture() {
        this.testSubject.start();
        Assertions.assertSame(this.testSubject.shutdownAsync(), this.testSubject.shutdownAsync());
    }

    @Test
    void startFailsWhenShutdownIsInProgress() throws Exception {
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any()))).thenReturn(true);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ((EventHandlerInvoker) Mockito.doAnswer(invocationOnMock -> {
            return Boolean.valueOf(countDownLatch.await(10L, TimeUnit.MILLISECONDS));
        }).when(this.stubEventHandler)).handle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any());
        this.testSubject.start();
        Stream map = Stream.of((Object[]) new Integer[]{1, 2, 2, 4, 5}).map(num -> {
            return new GenericEventMessage(new MessageType("event"), num);
        });
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        map.forEach((v1) -> {
            r1.publishMessage(v1);
        });
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertFalse(this.testSubject.processingStatus().isEmpty());
        });
        CompletableFuture shutdownAsync = this.testSubject.shutdownAsync();
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.testSubject.start();
        });
        countDownLatch.countDown();
        shutdownAsync.get(1L, TimeUnit.SECONDS);
        Assertions.assertDoesNotThrow(() -> {
            this.testSubject.start();
        });
    }

    @Test
    void isRunningOnlyReturnsTrueForStartedProcessor() {
        Assertions.assertFalse(this.testSubject.isRunning());
        this.testSubject.start();
        Assertions.assertTrue(this.testSubject.isRunning());
    }

    @Test
    void isErrorForFailingMessageSourceOperation() {
        Assertions.assertFalse(this.testSubject.isError());
        this.testSubject.start();
        Assertions.assertFalse(this.testSubject.isError());
        this.stubMessageSource.publishMessage(AsyncInMemoryStreamableEventSource.FAIL_EVENT);
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertTrue(this.testSubject.isError());
        });
        Stream map = Stream.of((Object[]) new Integer[]{1, 2, 2, 4, 5}).map(num -> {
            return new GenericEventMessage(new MessageType("event"), num);
        });
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        map.forEach((v1) -> {
            r1.publishMessage(v1);
        });
        AssertUtils.assertWithin(1500, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertFalse(this.testSubject.isError());
        });
    }

    @Test
    void isErrorWhenOpeningTheStreamFails() {
        StreamableEventSource streamableEventSource = (StreamableEventSource) Mockito.spy(new AsyncInMemoryStreamableEventSource());
        Mockito.when(streamableEventSource.open((StreamingCondition) Mockito.any())).thenThrow(new Throwable[]{new IllegalStateException("Failed to open the stream")}).thenCallRealMethod();
        setTestSubject(createTestSubject(builder -> {
            return builder.eventSource(streamableEventSource);
        }));
        Assertions.assertFalse(this.testSubject.isError());
        this.testSubject.start();
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertTrue(this.testSubject.isError());
        });
        Stream map = Stream.of((Object[]) new Integer[]{1, 2, 2, 4, 5}).map(num -> {
            return new GenericEventMessage(new MessageType("event"), num);
        });
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        map.forEach((v1) -> {
            r1.publishMessage(v1);
        });
        AssertUtils.assertWithin(1500, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertFalse(this.testSubject.isError());
        });
    }

    @Test
    void getTokenStoreIdentifier() {
        TokenStore tokenStore = (TokenStore) Mockito.mock(TokenStore.class);
        Mockito.when(tokenStore.retrieveStorageIdentifier()).thenReturn(Optional.of("some-identifier"));
        setTestSubject(createTestSubject(builder -> {
            return builder.tokenStore(tokenStore);
        }));
        Assertions.assertEquals("some-identifier", this.testSubject.getTokenStoreIdentifier());
    }

    @Test
    void releaseSegmentMakesTheTokenUnclaimedForTwiceTheTokenClaimInterval() {
        int i = 0;
        int i2 = 500;
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(1).tokenClaimInterval(i2);
        }));
        this.testSubject.start();
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertNotNull(this.testSubject.processingStatus().get(Integer.valueOf(i)));
        });
        this.testSubject.releaseSegment(0);
        AssertUtils.assertWithin(500 + 50, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertNull(this.testSubject.processingStatus().get(Integer.valueOf(i)));
        });
        AssertUtils.assertWithin((500 * 2) + 50, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertNotNull(this.testSubject.processingStatus().get(Integer.valueOf(i)));
        });
    }

    @Test
    void splitSegmentIsNotSupported() {
        TokenStore tokenStore = (TokenStore) Mockito.mock(TokenStore.class);
        Mockito.when(Boolean.valueOf(tokenStore.requiresExplicitSegmentInitialization())).thenReturn(false);
        setTestSubject(createTestSubject(builder -> {
            return builder.tokenStore(tokenStore);
        }));
        CompletableFuture splitSegment = this.testSubject.splitSegment(0);
        Assertions.assertTrue(splitSegment.isDone());
        Assertions.assertTrue(splitSegment.isCompletedExceptionally());
        splitSegment.exceptionally(th -> {
            Assertions.assertTrue(th.getClass().isAssignableFrom(UnsupportedOperationException.class));
            return null;
        });
    }

    @Test
    void splitSegment() {
        int i = 0;
        int i2 = 500;
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(1).tokenClaimInterval(i2);
        }));
        this.testSubject.start();
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertNotNull(this.testSubject.processingStatus().get(Integer.valueOf(i)));
        });
        CompletableFuture splitSegment = this.testSubject.splitSegment(0);
        AssertUtils.assertWithin(500 * 2, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertTrue(splitSegment.isDone());
        });
        Assertions.assertFalse(splitSegment.isCompletedExceptionally());
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertNotNull(this.testSubject.processingStatus().get(Integer.valueOf(i)));
        });
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertNotNull(this.testSubject.processingStatus().get(1));
        });
    }

    @Test
    void mergeSegmentIsNotSupported() {
        TokenStore tokenStore = (TokenStore) Mockito.mock(TokenStore.class);
        Mockito.when(Boolean.valueOf(tokenStore.requiresExplicitSegmentInitialization())).thenReturn(false);
        setTestSubject(createTestSubject(builder -> {
            return builder.tokenStore(tokenStore);
        }));
        CompletableFuture mergeSegment = this.testSubject.mergeSegment(0);
        Assertions.assertTrue(mergeSegment.isDone());
        Assertions.assertTrue(mergeSegment.isCompletedExceptionally());
        mergeSegment.exceptionally(th -> {
            Assertions.assertTrue(th.getClass().isAssignableFrom(UnsupportedOperationException.class));
            return null;
        });
    }

    @Test
    void mergeSegment() {
        int i = 0;
        int i2 = 1;
        int i3 = 500;
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(2).tokenClaimInterval(i3);
        }));
        this.testSubject.start();
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertNotNull(this.testSubject.processingStatus().get(Integer.valueOf(i)));
            Assertions.assertNotNull(this.testSubject.processingStatus().get(Integer.valueOf(i2)));
        });
        CompletableFuture mergeSegment = this.testSubject.mergeSegment(0);
        AssertUtils.assertWithin(500 * 2, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertTrue(mergeSegment.isDone());
        });
        Assertions.assertFalse(mergeSegment.isCompletedExceptionally());
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertNotNull(this.testSubject.processingStatus().get(Integer.valueOf(i)));
        });
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertNull(this.testSubject.processingStatus().get(Integer.valueOf(i2)));
        });
    }

    @Test
    void releaseAndClaimSegment() {
        int i = 0;
        int i2 = 5000;
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(2).tokenClaimInterval(i2);
        }));
        this.testSubject.start();
        AssertUtils.assertWithin(5000, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertNotNull(this.testSubject.processingStatus().get(Integer.valueOf(i)));
        });
        this.testSubject.releaseSegment(0, 180L, TimeUnit.SECONDS);
        AssertUtils.assertWithin(5000, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertEquals(1, this.testSubject.processingStatus().size());
        });
        this.testSubject.claimSegment(0);
        AssertUtils.assertWithin(5000, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertEquals(2, this.testSubject.processingStatus().size());
        });
    }

    @Test
    void supportReset() {
        Mockito.when(Boolean.valueOf(this.stubEventHandler.supportsReset())).thenReturn(true);
        Assertions.assertTrue(this.testSubject.supportsReset());
        Mockito.when(Boolean.valueOf(this.stubEventHandler.supportsReset())).thenReturn(false);
        Assertions.assertFalse(this.testSubject.supportsReset());
    }

    @Test
    void resetTokensFailsIfTheProcessorIsStillRunning() {
        this.testSubject.start();
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.testSubject.resetTokens();
        });
    }

    @Test
    void resetTokens() {
        int i = 2;
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(42L);
        Mockito.when(Boolean.valueOf(this.stubEventHandler.supportsReset())).thenReturn(true);
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(i).initialToken(trackingTokenSource -> {
                return CompletableFuture.completedFuture(globalSequenceTrackingToken);
            });
        }));
        this.testSubject.start();
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(i, this.tokenStore.fetchSegments(PROCESSOR_NAME).length);
        });
        this.testSubject.shutDown();
        this.testSubject.resetTokens();
        ((EventHandlerInvoker) Mockito.verify(this.stubEventHandler)).performReset((Object) null, (ProcessingContext) null);
        int[] fetchSegments = this.tokenStore.fetchSegments(PROCESSOR_NAME);
        Assertions.assertEquals(globalSequenceTrackingToken, this.tokenStore.fetchToken(PROCESSOR_NAME, fetchSegments[0]));
        Assertions.assertEquals(globalSequenceTrackingToken, this.tokenStore.fetchToken(PROCESSOR_NAME, fetchSegments[1]));
    }

    @Test
    void resetTokensWithContext() {
        int i = 2;
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(42L);
        Mockito.when(Boolean.valueOf(this.stubEventHandler.supportsReset())).thenReturn(true);
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(i).initialToken(trackingTokenSource -> {
                return CompletableFuture.completedFuture(globalSequenceTrackingToken);
            });
        }));
        this.testSubject.start();
        Awaitility.await().atMost(Duration.ofSeconds(2L)).untilAsserted(() -> {
            Assertions.assertEquals(i, this.tokenStore.fetchSegments(PROCESSOR_NAME).length);
        });
        this.testSubject.shutDown();
        this.testSubject.resetTokens("my-context");
        ((EventHandlerInvoker) Mockito.verify(this.stubEventHandler)).performReset("my-context", (ProcessingContext) null);
        int[] fetchSegments = this.tokenStore.fetchSegments(PROCESSOR_NAME);
        Assertions.assertEquals(globalSequenceTrackingToken, this.tokenStore.fetchToken(PROCESSOR_NAME, fetchSegments[0]));
        Assertions.assertEquals(globalSequenceTrackingToken, this.tokenStore.fetchToken(PROCESSOR_NAME, fetchSegments[1]));
        Awaitility.await().atMost(Duration.ofSeconds(2L)).untilAsserted(() -> {
            ((EventHandlerInvoker) Mockito.verify(this.stubEventHandler, Mockito.times(2))).segmentReleased((Segment) Mockito.any(Segment.class));
        });
    }

    @Test
    void resetTokensFromDefinedPosition() {
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(42L);
        int i = 2;
        TrackingToken createReplayToken = ReplayToken.createReplayToken(globalSequenceTrackingToken, (TrackingToken) null);
        Mockito.when(Boolean.valueOf(this.stubEventHandler.supportsReset())).thenReturn(true);
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(i).initialToken(trackingTokenSource -> {
                return CompletableFuture.completedFuture(globalSequenceTrackingToken);
            });
        }));
        this.testSubject.start();
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(i, this.tokenStore.fetchSegments(PROCESSOR_NAME).length);
        });
        this.testSubject.shutDown();
        this.testSubject.resetTokens(trackingTokenSource -> {
            return trackingTokenSource.latestToken();
        });
        ((EventHandlerInvoker) Mockito.verify(this.stubEventHandler)).performReset(Mockito.isNull(), (ProcessingContext) Mockito.any());
        int[] fetchSegments = this.tokenStore.fetchSegments(PROCESSOR_NAME);
        Assertions.assertEquals(createReplayToken, this.tokenStore.fetchToken(PROCESSOR_NAME, fetchSegments[0]));
        Assertions.assertEquals(createReplayToken, this.tokenStore.fetchToken(PROCESSOR_NAME, fetchSegments[1]));
    }

    @Test
    void resetTokensFromDefinedPositionAndWithResetContext() {
        GlobalSequenceTrackingToken globalSequenceTrackingToken = new GlobalSequenceTrackingToken(42L);
        int i = 2;
        TrackingToken createReplayToken = ReplayToken.createReplayToken(globalSequenceTrackingToken, (TrackingToken) null, "my-context");
        Mockito.when(Boolean.valueOf(this.stubEventHandler.supportsReset())).thenReturn(true);
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(i).initialToken(trackingTokenSource -> {
                return CompletableFuture.completedFuture(globalSequenceTrackingToken);
            });
        }));
        this.testSubject.start();
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(i, this.tokenStore.fetchSegments(PROCESSOR_NAME).length);
        });
        this.testSubject.shutDown();
        this.testSubject.resetTokens(trackingTokenSource -> {
            return trackingTokenSource.latestToken();
        }, "my-context");
        ((EventHandlerInvoker) Mockito.verify(this.stubEventHandler)).performReset((String) Mockito.eq("my-context"), (ProcessingContext) Mockito.any());
        int[] fetchSegments = this.tokenStore.fetchSegments(PROCESSOR_NAME);
        Assertions.assertEquals(createReplayToken, this.tokenStore.fetchToken(PROCESSOR_NAME, fetchSegments[0]));
        Assertions.assertEquals(createReplayToken, this.tokenStore.fetchToken(PROCESSOR_NAME, fetchSegments[1]));
    }

    @Test
    void maxCapacityDefaultsToShortMax() {
        Assertions.assertEquals(32767, this.testSubject.maxCapacity());
    }

    @Test
    void maxCapacityReturnsConfiguredCapacity() {
        int i = 500;
        setTestSubject(createTestSubject(builder -> {
            return builder.maxSegmentProvider(str -> {
                return i;
            });
        }));
        Assertions.assertEquals(500, this.testSubject.maxCapacity());
    }

    @Test
    void processingStatusIsUpdatedWithTrackingToken() {
        this.testSubject.start();
        mockEventHandlerInvoker();
        Stream map = Stream.of((Object[]) new Integer[]{1, 2, 2, 4, 5}).map(num -> {
            return new GenericEventMessage(new MessageType("event"), num);
        });
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        map.forEach((v1) -> {
            r1.publishMessage(v1);
        });
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            this.testSubject.processingStatus().values().forEach(eventTrackerStatus -> {
                Assertions.assertEquals(5L, eventTrackerStatus.getCurrentPosition().orElse(0L));
            });
        });
    }

    private void mockEventHandlerInvoker() {
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandleType((Class) Mockito.any()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.stubEventHandler.canHandle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any()))).thenAnswer(invocationOnMock -> {
            return Boolean.valueOf(((EventMessage) invocationOnMock.getArgument(0, EventMessage.class)).getPayload().equals(Integer.valueOf(((Segment) invocationOnMock.getArgument(2, Segment.class)).getSegmentId())));
        });
    }

    @Test
    void buildWithNullMessageSourceThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.eventSource((StreamableEventSource) null);
        });
    }

    @Test
    void buildWithoutMessageSourceThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder transactionManager = PooledStreamingEventProcessor.builder().tokenStore(new InMemoryTokenStore()).transactionManager(NoTransactionManager.INSTANCE);
        Objects.requireNonNull(transactionManager);
        Assertions.assertThrows(AxonConfigurationException.class, transactionManager::build);
    }

    @Test
    void buildWithNullTokenStoreThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.tokenStore((TokenStore) null);
        });
    }

    @Test
    void buildWithoutTokenStoreThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder transactionManager = PooledStreamingEventProcessor.builder().name(PROCESSOR_NAME).eventHandlerInvoker(this.stubEventHandler).eventSource(this.stubMessageSource).transactionManager(NoTransactionManager.INSTANCE);
        Objects.requireNonNull(transactionManager);
        Assertions.assertThrows(AxonConfigurationException.class, transactionManager::build);
    }

    @Test
    void buildWithNullTransactionManagerThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.transactionManager((TransactionManager) null);
        });
    }

    @Test
    void buildWithoutTransactionManagerThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder().name(PROCESSOR_NAME).eventHandlerInvoker(this.stubEventHandler).eventSource(this.stubMessageSource).tokenStore(new InMemoryTokenStore());
        Objects.requireNonNull(builder);
        Assertions.assertThrows(AxonConfigurationException.class, builder::build);
    }

    @Test
    void buildWithNullCoordinatorExecutorThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.coordinatorExecutor((ScheduledExecutorService) null);
        });
    }

    @Test
    void buildWithNullCoordinatorExecutorBuilderThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.coordinatorExecutor((Function) null);
        });
    }

    @Test
    void buildWithoutCoordinatorExecutorThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder transactionManager = PooledStreamingEventProcessor.builder().name(PROCESSOR_NAME).eventHandlerInvoker(this.stubEventHandler).eventSource(this.stubMessageSource).tokenStore(new InMemoryTokenStore()).transactionManager(NoTransactionManager.instance());
        Objects.requireNonNull(transactionManager);
        Assertions.assertThrows(AxonConfigurationException.class, transactionManager::build);
    }

    @Test
    void buildWithNullWorkerExecutorThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.workerExecutor((ScheduledExecutorService) null);
        });
    }

    @Test
    void buildWithNullWorkerExecutorBuilderThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.workerExecutor((Function) null);
        });
    }

    @Test
    void buildWithoutWorkerExecutorThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder coordinatorExecutor = PooledStreamingEventProcessor.builder().name(PROCESSOR_NAME).eventHandlerInvoker(this.stubEventHandler).eventSource(this.stubMessageSource).tokenStore(new InMemoryTokenStore()).transactionManager(NoTransactionManager.instance()).coordinatorExecutor(this.coordinatorExecutor);
        Objects.requireNonNull(coordinatorExecutor);
        Assertions.assertThrows(AxonConfigurationException.class, coordinatorExecutor::build);
    }

    @Test
    void buildWithZeroOrNegativeInitialSegmentCountThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.initialSegmentCount(0);
        });
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.initialSegmentCount(-1);
        });
    }

    @Test
    void buildWithNullInitialTokenThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.initialToken((Function) null);
        });
    }

    @Test
    void buildWithZeroOrNegativeTokenClaimIntervalThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.tokenClaimInterval(0L);
        });
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.tokenClaimInterval(-1L);
        });
    }

    @Test
    void buildWithZeroOrNegativeMaxCapacityThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.maxSegmentProvider(str -> {
                return 0;
            });
        });
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.maxSegmentProvider(str -> {
                return -1;
            });
        });
    }

    @Test
    void buildWithZeroOrNegativeClaimExtensionThresholdThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.claimExtensionThreshold(0L);
        });
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.claimExtensionThreshold(-1L);
        });
    }

    @Test
    void buildWithZeroOrNegativeBatchSizeThrowsAxonConfigurationException() {
        PooledStreamingEventProcessor.Builder builder = PooledStreamingEventProcessor.builder();
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.batchSize(0);
        });
        Assertions.assertThrows(AxonConfigurationException.class, () -> {
            builder.batchSize(-1);
        });
    }

    @Test
    void isReplaying() {
        mockEventHandlerInvoker();
        Mockito.when(Boolean.valueOf(this.stubEventHandler.supportsReset())).thenReturn(true);
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(1);
        }));
        List<EventMessage<Integer>> createEvents = createEvents(100);
        this.testSubject.start();
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        createEvents.forEach(asyncInMemoryStreamableEventSource::publishMessage);
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(1, this.testSubject.processingStatus().size());
            Assertions.assertTrue(((EventTrackerStatus) this.testSubject.processingStatus().get(0)).isCaughtUp());
            Assertions.assertFalse(((EventTrackerStatus) this.testSubject.processingStatus().get(0)).isReplaying());
            Assertions.assertFalse(this.testSubject.isReplaying());
        });
        this.testSubject.shutDown();
        this.testSubject.resetTokens(trackingTokenSource -> {
            return trackingTokenSource.latestToken();
        });
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(1, this.testSubject.processingStatus().size());
            Assertions.assertTrue(((EventTrackerStatus) this.testSubject.processingStatus().get(0)).isCaughtUp());
            Assertions.assertTrue(((EventTrackerStatus) this.testSubject.processingStatus().get(0)).isReplaying());
            Assertions.assertFalse(this.testSubject.isReplaying());
        });
    }

    @Test
    void isCaughtUpWhenDoneProcessing() throws Exception {
        mockSlowEventHandler();
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(1);
        }));
        List<EventMessage<Integer>> createEvents = createEvents(3);
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        createEvents.forEach(asyncInMemoryStreamableEventSource::publishMessage);
        this.testSubject.start();
        AtomicReference atomicReference = new AtomicReference(null);
        AssertUtils.assertWithin(5, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(1, this.testSubject.processingStatus().size());
            atomicReference.compareAndSet(null, Instant.now());
        });
        AssertUtils.assertWithin(5, TimeUnit.SECONDS, () -> {
            Assertions.assertTrue(((EventTrackerStatus) this.testSubject.processingStatus().get(0)).isCaughtUp());
        });
        Assertions.assertTrue(Duration.between((Temporal) atomicReference.get(), Instant.now()).getSeconds() >= 2);
    }

    @Test
    void existingEventsBeforeProcessorStartAreConsideredReplayed() throws Exception {
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(1);
        }));
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        this.testSubject.registerHandlerInterceptor(new MessageHandlerInterceptor<EventMessage<?>>(this) { // from class: org.axonframework.eventhandling.pooled.PooledStreamingEventProcessorTest.1
            public Object handle(@Nonnull LegacyUnitOfWork<? extends EventMessage<?>> legacyUnitOfWork, @Nonnull ProcessingContext processingContext, @Nonnull InterceptorChain interceptorChain) throws Exception {
                CountDownLatch countDownLatch2 = countDownLatch;
                legacyUnitOfWork.onCleanup(legacyUnitOfWork2 -> {
                    countDownLatch2.countDown();
                });
                return interceptorChain.proceedSync(processingContext);
            }

            public <M extends EventMessage<?>, R extends Message<?>> MessageStream<R> interceptOnHandle(@Nonnull M m, @Nonnull ProcessingContext processingContext, @Nonnull InterceptorChain<M, R> interceptorChain) {
                CountDownLatch countDownLatch2 = countDownLatch;
                processingContext.doFinally(processingContext2 -> {
                    countDownLatch2.countDown();
                });
                return interceptorChain.proceed(m, processingContext);
            }

            public /* bridge */ /* synthetic */ MessageStream interceptOnHandle(@Nonnull Message message, @Nonnull ProcessingContext processingContext, @Nonnull InterceptorChain interceptorChain) {
                return interceptOnHandle((AnonymousClass1) message, processingContext, (InterceptorChain<AnonymousClass1, R>) interceptorChain);
            }
        });
        List<EventMessage<Integer>> createEvents = createEvents(3);
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        createEvents.forEach(asyncInMemoryStreamableEventSource::publishMessage);
        this.testSubject.start();
        Assertions.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS), "Expected Unit of Work to have reached clean up phase");
        TrackingToken fetchToken = this.tokenStore.fetchToken(this.testSubject.getName(), 0);
        Assertions.assertTrue(ReplayToken.isReplay(fetchToken), "Not a replay token: " + String.valueOf(fetchToken));
    }

    @Test
    void eventsPublishedAfterProcessorStartAreNotConsideredReplayed() throws Exception {
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(1);
        }));
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        this.testSubject.registerHandlerInterceptor(new MessageHandlerInterceptor<EventMessage<?>>(this) { // from class: org.axonframework.eventhandling.pooled.PooledStreamingEventProcessorTest.2
            public Object handle(@Nonnull LegacyUnitOfWork<? extends EventMessage<?>> legacyUnitOfWork, @Nonnull ProcessingContext processingContext, @Nonnull InterceptorChain interceptorChain) throws Exception {
                CountDownLatch countDownLatch2 = countDownLatch;
                legacyUnitOfWork.onCleanup(legacyUnitOfWork2 -> {
                    countDownLatch2.countDown();
                });
                return interceptorChain.proceedSync(processingContext);
            }

            public <M extends EventMessage<?>, R extends Message<?>> MessageStream<R> interceptOnHandle(@Nonnull M m, @Nonnull ProcessingContext processingContext, @Nonnull InterceptorChain<M, R> interceptorChain) {
                CountDownLatch countDownLatch2 = countDownLatch;
                processingContext.doFinally(processingContext2 -> {
                    countDownLatch2.countDown();
                });
                return interceptorChain.proceed(m, processingContext);
            }

            public /* bridge */ /* synthetic */ MessageStream interceptOnHandle(@Nonnull Message message, @Nonnull ProcessingContext processingContext, @Nonnull InterceptorChain interceptorChain) {
                return interceptOnHandle((AnonymousClass2) message, processingContext, (InterceptorChain<AnonymousClass2, R>) interceptorChain);
            }
        });
        this.stubMessageSource.publishMessage(EventTestUtils.asEventMessage(0));
        this.stubMessageSource.publishMessage(EventTestUtils.asEventMessage(1));
        this.testSubject.start();
        this.stubMessageSource.publishMessage(EventTestUtils.asEventMessage(2));
        Assertions.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS), "Expected Unit of Work to have reached clean up phase");
        TrackingToken fetchToken = this.tokenStore.fetchToken(this.testSubject.getName(), 0);
        Assertions.assertFalse(ReplayToken.isReplay(fetchToken), "Not a replay token: " + String.valueOf(fetchToken));
    }

    private void mockSlowEventHandler() throws Exception {
        ((EventHandlerInvoker) Mockito.doAnswer(invocationOnMock -> {
            Thread.sleep(1000L);
            return null;
        }).when(this.stubEventHandler)).handle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any());
    }

    @Test
    void coordinatorExtendsClaimsEarlierForBusyWorkPackages() throws Exception {
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(1).enableCoordinatorClaimExtension();
        }));
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        mockEventHandlerInvoker();
        ((EventHandlerInvoker) Mockito.doAnswer(invocationOnMock -> {
            atomicBoolean.set(true);
            return Boolean.valueOf(countDownLatch.await(5L, TimeUnit.SECONDS));
        }).when(this.stubEventHandler)).handle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any());
        List<EventMessage<Integer>> createEvents = createEvents(42);
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        createEvents.forEach(asyncInMemoryStreamableEventSource::publishMessage);
        this.testSubject.start();
        ConditionFactory atMost = Awaitility.await().pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofSeconds(5L));
        Objects.requireNonNull(atomicBoolean);
        atMost.until(atomicBoolean::get);
        ((InMemoryTokenStore) Mockito.verify(this.tokenStore, Mockito.timeout(5000L))).extendClaim(PROCESSOR_NAME, 0);
        ((InMemoryTokenStore) Mockito.verify(this.tokenStore, Mockito.never())).storeToken((TrackingToken) Mockito.any(), (String) Mockito.eq(PROCESSOR_NAME), Mockito.eq(0));
        countDownLatch.countDown();
        Awaitility.await().pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofSeconds(5L)).until(() -> {
            return Boolean.valueOf(((EventTrackerStatus) this.testSubject.processingStatus().get(0)).isCaughtUp());
        });
        ((InMemoryTokenStore) Mockito.verify(this.tokenStore, Mockito.timeout(5000L).atLeastOnce())).storeToken((TrackingToken) Mockito.any(), (String) Mockito.eq(PROCESSOR_NAME), Mockito.eq(0));
    }

    @Test
    void coordinatorExtendingClaimFailsAndAbortsWorkPackage() throws Exception {
        setTestSubject(createTestSubject(builder -> {
            return builder.initialSegmentCount(1).enableCoordinatorClaimExtension();
        }));
        String str = "bummer";
        ((InMemoryTokenStore) Mockito.doThrow(new Throwable[]{new RuntimeException("bummer")}).when(this.tokenStore)).extendClaim(PROCESSOR_NAME, 0);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        mockEventHandlerInvoker();
        ((EventHandlerInvoker) Mockito.doAnswer(invocationOnMock -> {
            atomicBoolean.set(true);
            return Boolean.valueOf(countDownLatch.await(5L, TimeUnit.SECONDS));
        }).when(this.stubEventHandler)).handle((EventMessage) Mockito.any(), (ProcessingContext) Mockito.any(), (Segment) Mockito.any());
        List<EventMessage<Integer>> createEvents = createEvents(42);
        AsyncInMemoryStreamableEventSource asyncInMemoryStreamableEventSource = this.stubMessageSource;
        Objects.requireNonNull(asyncInMemoryStreamableEventSource);
        createEvents.forEach(asyncInMemoryStreamableEventSource::publishMessage);
        this.testSubject.start();
        ConditionFactory atMost = Awaitility.await().pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofSeconds(5L));
        Objects.requireNonNull(atomicBoolean);
        atMost.until(atomicBoolean::get);
        ((InMemoryTokenStore) Mockito.verify(this.tokenStore, Mockito.timeout(5000L))).extendClaim(PROCESSOR_NAME, 0);
        ((InMemoryTokenStore) Mockito.verify(this.tokenStore, Mockito.never())).storeToken((TrackingToken) Mockito.any(), (String) Mockito.eq(PROCESSOR_NAME), Mockito.eq(0));
        Awaitility.await().pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofSeconds(5L)).until(() -> {
            return Boolean.valueOf(((EventTrackerStatus) this.testSubject.processingStatus().get(0)).getError().getMessage().equals(str));
        });
        countDownLatch.countDown();
    }
}
