package org.axonframework.integrationtests.eventhandling;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.Transaction;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.DefaultEventBusSpanFactory;
import org.axonframework.eventhandling.DefaultEventProcessorSpanFactory;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventMessageHandler;
import org.axonframework.eventhandling.EventTrackerStatus;
import org.axonframework.eventhandling.EventTrackerStatusChangeListener;
import org.axonframework.eventhandling.EventUtils;
import org.axonframework.eventhandling.GapAwareTrackingToken;
import org.axonframework.eventhandling.GenericTrackedEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.MultiEventHandlerInvoker;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.ReplayToken;
import org.axonframework.eventhandling.Segment;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.SequenceEventStorageEngine;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.integrationtests.utils.AssertUtils;
import org.axonframework.integrationtests.utils.EventTestUtils;
import org.axonframework.integrationtests.utils.MockException;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.StreamableMessageSource;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.serialization.SerializationException;
import org.axonframework.tracing.TestSpanFactory;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.stubbing.OngoingStubbing;
import org.springframework.test.annotation.DirtiesContext;

/* loaded from: input_file:org/axonframework/integrationtests/eventhandling/TrackingEventProcessorTest.class */
class TrackingEventProcessorTest {
    private static final Object NO_RESET_PAYLOAD = null;
    private TrackingEventProcessor testSubject;
    private EmbeddedEventStore eventBus;
    private TokenStore tokenStore;
    private EventHandlerInvoker eventHandlerInvoker;
    private EventMessageHandler mockHandler;
    private List<Long> sleepInstructions;
    private TransactionManager mockTransactionManager;
    private Transaction mockTransaction;
    private TestSpanFactory spanFactory;

    /* loaded from: input_file:org/axonframework/integrationtests/eventhandling/TrackingEventProcessorTest$MyResetContext.class */
    /**
     * Mutable holder of a single {@code property} string. Two instances are equal when they are of the exact same
     * class and carry equal {@code property} values; the hash code is derived from the property alone.
     */
    private static class MyResetContext {

        private String property;

        private MyResetContext(String property) {
            this.property = property;
        }

        public String getProperty() {
            return property;
        }

        public void setProperty(String property) {
            this.property = property;
        }

        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (other != null && other.getClass() == getClass()) {
                MyResetContext that = (MyResetContext) other;
                return Objects.equals(property, that.property);
            }
            return false;
        }

        @Override
        public int hashCode() {
            // Objects.hashCode yields 0 for null and property.hashCode() otherwise.
            return Objects.hashCode(property);
        }
    }

    /* loaded from: input_file:org/axonframework/integrationtests/eventhandling/TrackingEventProcessorTest$StubTrackingEventStream.class */
    private static class StubTrackingEventStream implements TrackingEventStream {
        private final Queue<TrackedEventMessage<?>> eventMessages;

        public StubTrackingEventStream(long... jArr) {
            TrackingToken newInstance = GapAwareTrackingToken.newInstance(-1L, Collections.emptySortedSet());
            this.eventMessages = new LinkedList();
            for (long j : jArr) {
                Long valueOf = Long.valueOf(j);
                newInstance = newInstance.advanceTo(valueOf.longValue(), 1000);
                this.eventMessages.add(new GenericTrackedEventMessage(newInstance, EventTestUtils.createEvent(valueOf.longValue())));
            }
        }

        public Optional<TrackedEventMessage<?>> peek() {
            return this.eventMessages.isEmpty() ? Optional.empty() : Optional.of(this.eventMessages.peek());
        }

        public boolean hasNextAvailable(int i, TimeUnit timeUnit) {
            return !this.eventMessages.isEmpty();
        }

        /* renamed from: nextAvailable, reason: merged with bridge method [inline-methods] */
        public TrackedEventMessage<?> m8nextAvailable() {
            return this.eventMessages.poll();
        }

        public void close() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/integrationtests/eventhandling/TrackingEventProcessorTest$TestError.class */
    /**
     * {@link Error} subtype declared for use by this test class; its constructor is private, so it can only be
     * created from within the enclosing class.
     */
    public static class TestError extends Error {

        private static final long serialVersionUID = -5579826202840099704L;

        private TestError() {
            super();
        }
    }

    /** Package-private no-arg constructor; all fixture state is created in {@code setUp()}. */
    TrackingEventProcessorTest() {
        super();
    }

    /**
     * Wraps the given iterator in a {@link TrackingEventStream}, using a skip callback that does nothing.
     *
     * @param it the source of tracked events
     * @return a stream serving the iterator's events
     */
    private static TrackingEventStream trackingEventStreamOf(Iterator<TrackedEventMessage<?>> it) {
        Consumer<Class<?>> ignoreSkippedType = skippedType -> {
            // intentionally empty: skipped payload types are not recorded
        };
        return trackingEventStreamOf(it, ignoreSkippedType);
    }

    /**
     * Wraps the given iterator in a {@link TrackingEventStream} with a one-element look-ahead buffer.
     *
     * @param it       the source of tracked events
     * @param consumer receives the payload type of every message passed to
     *                 {@code skipMessagesWithPayloadTypeOf}
     * @return a stream serving the iterator's events
     */
    private static TrackingEventStream trackingEventStreamOf(final Iterator<TrackedEventMessage<?>> it, final Consumer<Class<?>> consumer) {
        return new TrackingEventStream() {
            // True while peekEvent holds an element already taken from the iterator but not yet handed out.
            private boolean hasPeeked;
            private TrackedEventMessage<?> peekEvent;

            @Override
            public Optional<TrackedEventMessage<?>> peek() {
                if (!this.hasPeeked) {
                    if (!hasNextAvailable()) {
                        return Optional.empty();
                    }
                    this.peekEvent = it.next();
                    this.hasPeeked = true;
                }
                return Optional.of(this.peekEvent);
            }

            @Override
            public boolean hasNextAvailable(int i, TimeUnit timeUnit) {
                // No real waiting: a positive timeout only yields the current thread once.
                if (i > 0) {
                    Thread.yield();
                }
                return this.hasPeeked || it.hasNext();
            }

            @Override
            public TrackedEventMessage<?> nextAvailable() {
                if (!this.hasPeeked) {
                    return it.next();
                }
                // Hand out the buffered element and clear the look-ahead buffer.
                TrackedEventMessage<?> buffered = this.peekEvent;
                this.peekEvent = null;
                this.hasPeeked = false;
                return buffered;
            }

            @Override
            public void close() {
            }

            @Override
            public void skipMessagesWithPayloadTypeOf(TrackedEventMessage<?> trackedEventMessage) {
                consumer.accept(trackedEventMessage.getPayloadType());
            }
        };
    }

    /**
     * Builds the fixture: a spied in-memory token store, a mocked handler that accepts every event and supports
     * reset, a spied invoker around that handler, a mocked transaction manager that runs suppliers and runnables
     * inline, an in-memory event store, and the default processor under test.
     */
    @BeforeEach
    void setUp() {
        spanFactory = new TestSpanFactory();
        tokenStore = Mockito.spy(new InMemoryTokenStore());

        mockHandler = Mockito.mock(EventMessageHandler.class);
        Mockito.when(mockHandler.canHandle(Mockito.any())).thenReturn(true);
        Mockito.when(mockHandler.supportsReset()).thenReturn(true);
        eventHandlerInvoker = Mockito.spy(
                SimpleEventHandlerInvoker.builder()
                                         .eventHandlers(mockHandler)
                                         .listenerInvocationErrorHandler(PropagatingErrorHandler.instance())
                                         .build()
        );

        mockTransaction = Mockito.mock(Transaction.class);
        mockTransactionManager = Mockito.mock(TransactionManager.class);
        Mockito.when(mockTransactionManager.startTransaction()).thenReturn(mockTransaction);
        // fetchInTransaction simply evaluates the supplier it is given.
        Mockito.when(mockTransactionManager.fetchInTransaction((Supplier) Mockito.any(Supplier.class)))
               .thenAnswer(invocation -> ((Supplier) invocation.getArgument(0)).get());
        // executeInTransaction simply runs the runnable it is given.
        Mockito.doAnswer(invocation -> {
            Runnable task = invocation.getArgument(0);
            task.run();
            return null;
        }).when(mockTransactionManager).executeInTransaction(Mockito.any(Runnable.class));

        eventBus = EmbeddedEventStore.builder()
                                     .storageEngine(new InMemoryEventStorageEngine())
                                     .spanFactory(DefaultEventBusSpanFactory.builder().spanFactory(spanFactory).build())
                                     .build();
        sleepInstructions = new CopyOnWriteArrayList<>();
        initDefaultProcessor();
    }

    /** Creates the processor under test: single-threaded, with a 100 ms event availability timeout. */
    private void initDefaultProcessor() {
        TrackingEventProcessorConfiguration defaultConfig =
                TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                   .andEventAvailabilityTimeout(100L, TimeUnit.MILLISECONDS);
        initProcessor(defaultConfig);
    }

    /**
     * Creates the processor under test with the given configuration and an otherwise unmodified builder.
     *
     * @param trackingEventProcessorConfiguration the configuration to apply
     */
    private void initProcessor(TrackingEventProcessorConfiguration trackingEventProcessorConfiguration) {
        UnaryOperator<TrackingEventProcessor.Builder> noCustomization = builder -> builder;
        initProcessor(trackingEventProcessorConfiguration, noCustomization);
    }

    /**
     * Creates the processor under test with a single-threaded configuration using a 100 ms event availability
     * timeout, letting the given operator adjust the builder.
     *
     * @param unaryOperator customization applied to the pre-filled builder
     */
    private void initProcessor(UnaryOperator<TrackingEventProcessor.Builder> unaryOperator) {
        TrackingEventProcessorConfiguration defaultConfig =
                TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                   .andEventAvailabilityTimeout(100L, TimeUnit.MILLISECONDS);
        initProcessor(defaultConfig, unaryOperator);
    }

    /**
     * Creates the processor under test from a builder pre-filled with the fixture's collaborators and then passed
     * through the given operator. The created processor does not sleep: while running, each requested sleep
     * duration is appended to {@code sleepInstructions} and the thread merely yields.
     *
     * @param trackingEventProcessorConfiguration the configuration to apply
     * @param unaryOperator                       customization applied to the pre-filled builder
     */
    private void initProcessor(TrackingEventProcessorConfiguration trackingEventProcessorConfiguration, UnaryOperator<TrackingEventProcessor.Builder> unaryOperator) {
        TrackingEventProcessor.Builder prefilled = TrackingEventProcessor.builder()
                                                                         .name("test")
                                                                         .eventHandlerInvoker(eventHandlerInvoker)
                                                                         .messageSource(eventBus)
                                                                         .trackingEventProcessorConfiguration(trackingEventProcessorConfiguration)
                                                                         .tokenStore(tokenStore)
                                                                         .transactionManager(mockTransactionManager);
        TrackingEventProcessor.Builder customized = unaryOperator.apply(prefilled);
        testSubject = new TrackingEventProcessor(customized) {
            protected void doSleepFor(long millisToSleep) {
                if (!isRunning()) {
                    return;
                }
                TrackingEventProcessorTest.this.sleepInstructions.add(millisToSleep);
                Thread.yield();
            }

            protected void doSleepFor(long millisToSleep, AtomicBoolean atomicBoolean) {
                if (!isRunning()) {
                    return;
                }
                TrackingEventProcessorTest.this.sleepInstructions.add(millisToSleep);
                Thread.yield();
            }
        };
    }

    /** Shuts down the processor under test first and the fixture's event store second. */
    @AfterEach
    void tearDown() {
        testSubject.shutDown();
        eventBus.shutDown();
    }

    /**
     * Appends two events to one storage engine and three to another, combines both engines in a
     * {@link SequenceEventStorageEngine}, and verifies the handler is invoked exactly five times in total.
     */
    @Test
    void sequenceEventStorageReceivesEachEventOnlyOnce() throws Exception {
        InMemoryEventStorageEngine firstEngine = new InMemoryEventStorageEngine();
        InMemoryEventStorageEngine secondEngine = new InMemoryEventStorageEngine(2L);
        EmbeddedEventStore sequenceEventStore = EmbeddedEventStore.builder()
                                                                  .storageEngine(new SequenceEventStorageEngine(firstEngine, secondEngine))
                                                                  .build();
        initProcessor(builder -> builder.messageSource(sequenceEventStore));

        firstEngine.appendEvents(EventTestUtils.createEvent(EventTestUtils.AGGREGATE, 1L, "message1"),
                                 EventTestUtils.createEvent(EventTestUtils.AGGREGATE, 2L, "message2"));
        secondEngine.appendEvents(EventTestUtils.createEvent(EventTestUtils.AGGREGATE, 3L, "message3"),
                                  EventTestUtils.createEvent(EventTestUtils.AGGREGATE, 4L, "message4"),
                                  EventTestUtils.createEvent(EventTestUtils.AGGREGATE, 5L, "message5"));

        int expectedEventCount = 5;
        CountDownLatch handled = new CountDownLatch(expectedEventCount);
        AtomicInteger invocations = new AtomicInteger(0);
        Mockito.doAnswer(invocation -> {
            int invocationNumber = invocations.incrementAndGet();
            handled.countDown();
            // Fail inside the handler as soon as more events arrive than were appended.
            Assertions.assertTrue(invocationNumber <= expectedEventCount);
            return null;
        }).when(mockHandler).handle(Mockito.any());

        testSubject.start();

        // The message is derived from the expected count so it cannot drift from the latch size.
        Assertions.assertTrue(handled.await(5L, TimeUnit.SECONDS),
                              "Expected Handler to have received " + expectedEventCount + " published events");
        Assertions.assertEquals(expectedEventCount, invocations.get(), "Handler should only receive each event once");
    }

    /**
     * After starting the default processor, expects a max capacity of 1, no available processor threads within one
     * second, and exactly one active processor thread.
     */
    @Test
    void sequenceMaxCapacity() {
        testSubject.start();

        Assertions.assertEquals(1, testSubject.maxCapacity());
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(0, testSubject.availableProcessorThreads())
        );
        Assertions.assertEquals(1, testSubject.activeProcessorThreads());
    }

    /**
     * Starts the processor, publishes two events 200 ms later, and expects the handler to be invoked twice within
     * five seconds.
     */
    @Test
    void publishedEventsGetPassedToHandler() throws Exception {
        CountDownLatch handled = new CountDownLatch(2);
        Mockito.doAnswer(invocation -> {
            handled.countDown();
            return null;
        }).when(mockHandler).handle(Mockito.any());

        testSubject.start();
        // Publish only after the processor has had some time to start up.
        Thread.sleep(200L);
        eventBus.publish(EventTestUtils.createEvents(2));

        boolean bothHandled = handled.await(5L, TimeUnit.SECONDS);
        Assertions.assertTrue(bothHandled, "Expected Handler to have received 2 published events");
    }

    /**
     * Re-creates the processor with a span factory backed by the test span factory and verifies that, during
     * handling, the batch span and the per-message process span are active, and that both are completed after two
     * events were handled.
     */
    @Test
    void handlersAreTraced() throws Exception {
        initProcessor(builder -> builder.spanFactory(
                DefaultEventProcessorSpanFactory.builder().spanFactory(spanFactory).build()
        ));

        CountDownLatch handled = new CountDownLatch(2);
        Mockito.doAnswer(invocation -> {
            Message handledMessage = invocation.getArgument(0, Message.class);
            spanFactory.verifySpanActive("StreamingEventProcessor.batch");
            spanFactory.verifySpanActive("StreamingEventProcessor.process", handledMessage);
            handled.countDown();
            return null;
        }).when(mockHandler).handle(Mockito.any());

        testSubject.start();
        // Publish only after the processor has had some time to start up.
        Thread.sleep(200L);
        eventBus.publish(EventTestUtils.createEvents(2));

        boolean bothHandled = handled.await(5L, TimeUnit.SECONDS);
        Assertions.assertTrue(bothHandled, "Expected Handler to have received 2 published events");
        spanFactory.verifySpanCompleted("StreamingEventProcessor.process");
        spanFactory.verifySpanCompleted("StreamingEventProcessor.batch");
    }

    /**
     * Makes the handler reject every event and the {@code String} payload type, serves two events from a mocked
     * event store, and expects the stream's skip callback to have recorded exactly the {@code String} class within
     * one second.
     */
    @Test
    void blacklist() {
        Mockito.when(mockHandler.canHandle(Mockito.any())).thenReturn(false);
        Mockito.when(mockHandler.canHandleType(String.class)).thenReturn(false);

        // Collects every payload type handed to the stream's skipMessagesWithPayloadTypeOf callback.
        HashSet<Class<?>> skippedPayloadTypes = new HashSet<>();
        EmbeddedEventStore mockEventStore = Mockito.mock(EmbeddedEventStore.class);
        GlobalSequenceTrackingToken firstToken = new GlobalSequenceTrackingToken(0L);
        List<TrackedEventMessage<?>> trackedEvents =
                EventTestUtils.createEvents(2)
                              .stream()
                              .<TrackedEventMessage<?>>map(event -> EventUtils.asTrackedEventMessage(event, firstToken))
                              .collect(Collectors.toList());
        Mockito.when(mockEventStore.openStream((TrackingToken) null))
               .thenReturn(trackingEventStreamOf(trackedEvents.iterator(), skippedPayloadTypes::add));

        testSubject = TrackingEventProcessor.builder()
                                            .name("test")
                                            .eventHandlerInvoker(eventHandlerInvoker)
                                            .messageSource(mockEventStore)
                                            .tokenStore(tokenStore)
                                            .transactionManager(NoTransactionManager.INSTANCE)
                                            .build();
        testSubject.start();

        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(1, skippedPayloadTypes.size());
            Assertions.assertTrue(skippedPayloadTypes.contains(String.class));
        });
    }

    /**
     * Lets the handler throw a {@link MockException} while a flag is set and verifies that the status of segment 0
     * reports that error; after clearing the flag, the status is expected to leave the error state within five
     * seconds.
     */
    @Test
    void processorExposesErrorStateOnHandlerException() throws Exception {
        Mockito.doReturn(Object.class).when(mockHandler).getTargetType();
        AtomicBoolean failHandling = new AtomicBoolean(true);
        Mockito.doAnswer(invocation -> {
            if (failHandling.get()) {
                throw new MockException("Simulating issues");
            }
            return null;
        }).when(mockHandler).handle(Mockito.any());
        int segmentId = 0;

        testSubject.start();
        eventBus.publish(EventTestUtils.createEvents(2));

        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            EventTrackerStatus status = testSubject.processingStatus().get(segmentId);
            Assertions.assertNotNull(status);
            Assertions.assertTrue(status.isErrorState());
            Assertions.assertEquals(MockException.class, status.getError().getClass());
        });

        // Stop failing; the status should no longer report an error afterwards.
        failHandling.set(false);

        AssertUtils.assertWithin(5, TimeUnit.SECONDS, () -> {
            EventTrackerStatus status = testSubject.processingStatus().get(segmentId);
            Assertions.assertNotNull(status);
            Assertions.assertFalse(status.isErrorState());
            Assertions.assertNull(status.getError());
        });
    }

    /**
     * Counts open transactions (incremented on {@code startTransaction}, decremented on commit and rollback) and
     * verifies that exactly one transaction is open at the moment the handler is invoked.
     */
    @Test
    void handlerIsInvokedInTransactionScope() throws Exception {
        // Released by the first handler invocation; the test does not wait for the second event.
        CountDownLatch handlerInvoked = new CountDownLatch(1);
        AtomicInteger openTransactions = new AtomicInteger();
        AtomicInteger openTransactionsDuringHandling = new AtomicInteger();

        Mockito.when(mockTransactionManager.startTransaction()).thenAnswer(invocation -> {
            openTransactions.incrementAndGet();
            return mockTransaction;
        });
        Mockito.doAnswer(invocation -> openTransactions.decrementAndGet()).when(mockTransaction).rollback();
        Mockito.doAnswer(invocation -> openTransactions.decrementAndGet()).when(mockTransaction).commit();
        Mockito.doAnswer(invocation -> {
            openTransactionsDuringHandling.set(openTransactions.get());
            handlerInvoked.countDown();
            return null;
        }).when(mockHandler).handle(Mockito.any());

        testSubject.start();
        // Publish only after the processor has had some time to start up.
        Thread.sleep(200L);
        eventBus.publish(EventTestUtils.createEvents(2));

        // The latch has a count of one, so the message describes a single invocation.
        Assertions.assertTrue(handlerInvoked.await(5L, TimeUnit.SECONDS),
                              "Expected Handler to have been invoked for a published event");
        Assertions.assertEquals(1, openTransactionsDuringHandling.get());
    }

    /**
     * Makes fetching the token for segment 0 throw a {@link SerializationException} and expects the processor to
     * stop, raise its error flag, and never have been asked to sleep.
     */
    @Test
    void processorStopsOnNonTransientExceptionWhenLoadingToken() {
        SerializationException serializationFailure = new SerializationException("Faking a serialization issue");
        Mockito.doThrow(serializationFailure).when(tokenStore).fetchToken("test", 0);

        testSubject.start();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertFalse(testSubject.isRunning(), "Expected processor to have stopped")
        );
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertTrue(testSubject.isError(), "Expected processor to set the error flag")
        );
        Assertions.assertEquals(Collections.emptyList(), sleepInstructions);
    }

    /**
     * Makes the first token fetch for segment 0 throw a {@link RuntimeException} and later fetches use the real
     * method; expects the published event to be handled, the processor to keep running without error, and a single
     * recorded sleep instruction of 5000.
     */
    @Test
    void processorRetriesOnTransientExceptionWhenLoadingToken() throws Exception {
        CountDownLatch handled = new CountDownLatch(1);
        Mockito.doAnswer(invocation -> {
            handled.countDown();
            return null;
        }).when(mockHandler).handle(Mockito.any());
        Mockito.doThrow(new RuntimeException("Faking a recoverable issue"))
               .doCallRealMethod()
               .when(tokenStore)
               .fetchToken("test", 0);

        testSubject.start();
        eventBus.publish(EventTestUtils.createEvent());

        boolean eventHandled = handled.await(5L, TimeUnit.SECONDS);
        Assertions.assertTrue(eventHandled, "Expected Handler to have received published event");
        Assertions.assertTrue(testSubject.isRunning());
        Assertions.assertFalse(testSubject.isError());
        Assertions.assertEquals(Collections.singletonList(5000L), sleepInstructions);
    }

    /**
     * Publishes one event, starts the processor, waits for the unit of work to reach its cleanup phase, and then
     * verifies that a token was stored for segment 0 and can be fetched.
     */
    @Test
    void tokenIsStoredWhenEventIsRead() throws Exception {
        CountDownLatch cleanedUp = new CountDownLatch(1);
        testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            // The callback parameter needs its own name: a nested lambda may not redeclare 'unitOfWork'.
            unitOfWork.onCleanup(uow -> cleanedUp.countDown());
            return interceptorChain.proceed();
        });

        eventBus.publish(EventTestUtils.createEvent());
        testSubject.start();

        Assertions.assertTrue(cleanedUp.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase");
        Mockito.verify(tokenStore).storeToken(Mockito.any(), Mockito.eq(testSubject.getName()), Mockito.eq(0));
        Assertions.assertNotNull(tokenStore.fetchToken(testSubject.getName(), 0));
    }

    /**
     * Waits until a token has been stored for segment 0, releases that segment, and expects the invoker's
     * {@code segmentReleased} to be called exactly once within five seconds.
     */
    @Test
    void segmentReleasedIsInvokedOnInvokerWhenSegmentIsReleased() {
        Duration timeout = Duration.ofSeconds(5L);

        eventBus.publish(EventTestUtils.createEvent());
        testSubject.start();
        Awaitility.await()
                  .atMost(timeout)
                  .untilAsserted(() -> Mockito.verify(tokenStore)
                                              .storeToken(Mockito.any(),
                                                          Mockito.eq(testSubject.getName()),
                                                          Mockito.eq(0)));

        testSubject.releaseSegment(0);

        Awaitility.await()
                  .atMost(timeout)
                  .untilAsserted(() -> Mockito.verify(eventHandlerInvoker, Mockito.times(1))
                                              .segmentReleased(Mockito.any(Segment.class)));
    }

    /**
     * With a batch size of 100 and {@code storingTokensAfterProcessing()} enabled, expects the claim to be
     * extended once before the token is stored, and exactly one claim extension to happen while a unit of work is
     * started.
     */
    @Test
    void tokenIsExtendedAtStartAndStoredAtEndOfEventBatch_WithStoringTokensAfterProcessingSetting() throws Exception {
        initProcessor(TrackingEventProcessorConfiguration.forSingleThreadedProcessing().andBatchSize(100),
                      TrackingEventProcessor.Builder::storingTokensAfterProcessing);

        CountDownLatch cleanedUp = new CountDownLatch(2);
        AtomicInteger extensionsInUnitOfWork = new AtomicInteger();
        // Count only those claim extensions that happen while a unit of work is active on the calling thread.
        Mockito.doAnswer(invocation -> {
            if (CurrentUnitOfWork.isStarted()) {
                extensionsInUnitOfWork.incrementAndGet();
            }
            return invocation.callRealMethod();
        }).when(tokenStore).extendClaim(Mockito.anyString(), Mockito.anyInt());

        testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            // The callback parameter needs its own name: a nested lambda may not redeclare 'unitOfWork'.
            unitOfWork.onCleanup(uow -> cleanedUp.countDown());
            return interceptorChain.proceed();
        });

        testSubject.start();
        eventBus.publish(EventTestUtils.createEvents(2));

        Assertions.assertTrue(cleanedUp.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase for 2 messages");
        InOrder inOrder = Mockito.inOrder(tokenStore);
        inOrder.verify(tokenStore, Mockito.times(1)).extendClaim(Mockito.eq(testSubject.getName()), Mockito.anyInt());
        inOrder.verify(tokenStore, Mockito.atLeastOnce()).storeToken(Mockito.any(), Mockito.any(), Mockito.anyInt());
        Assertions.assertNotNull(tokenStore.fetchToken(testSubject.getName(), 0));
        Assertions.assertEquals(1,
                                extensionsInUnitOfWork.get(),
                                "Unexpected number of invocations of token extension in unit of work");
    }

    /**
     * Builds a processor with {@link NoTransactionManager#INSTANCE} and a batch size of 100, handles two events,
     * and expects the token to be stored at least once and exactly one claim extension to be observed while a unit
     * of work is started.
     */
    @Test
    void tokenStoredAtEndOfEventBatchAndNotExtendedWhenUsingANoTransactionManager() throws Exception {
        testSubject = TrackingEventProcessor.builder()
                                            .name("test")
                                            .eventHandlerInvoker(eventHandlerInvoker)
                                            .trackingEventProcessorConfiguration(
                                                    TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                                                       .andBatchSize(100)
                                            )
                                            .messageSource(eventBus)
                                            .tokenStore(tokenStore)
                                            .transactionManager(NoTransactionManager.INSTANCE)
                                            .build();

        CountDownLatch cleanedUp = new CountDownLatch(2);
        AtomicInteger extensionsInUnitOfWork = new AtomicInteger();
        // Count only those claim extensions that happen while a unit of work is active on the calling thread.
        Mockito.doAnswer(invocation -> {
            if (CurrentUnitOfWork.isStarted()) {
                extensionsInUnitOfWork.incrementAndGet();
            }
            return invocation.callRealMethod();
        }).when(tokenStore).extendClaim(Mockito.anyString(), Mockito.anyInt());

        testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            // The callback parameter needs its own name: a nested lambda may not redeclare 'unitOfWork'.
            unitOfWork.onCleanup(uow -> cleanedUp.countDown());
            return interceptorChain.proceed();
        });

        testSubject.start();
        eventBus.publish(EventTestUtils.createEvents(2));

        Assertions.assertTrue(cleanedUp.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase for 2 messages");
        Mockito.verify(tokenStore, Mockito.atLeastOnce()).storeToken(Mockito.any(), Mockito.any(), Mockito.anyInt());
        Assertions.assertNotNull(tokenStore.fetchToken(testSubject.getName(), 0));
        Assertions.assertEquals(1,
                                extensionsInUnitOfWork.get(),
                                "Unexpected number of invocations of token extension in unit of work");
    }

    /**
     * Builds a processor whose transaction manager hands out mocked transactions, handles two events with a batch
     * size of 100, and expects the token to be stored exactly once while no claim extension happens inside a unit
     * of work.
     */
    @Test
    void tokenStoredAtEndOfEventBatchAndNotExtendedWhenTransactionManagerIsConfigured() throws Exception {
        testSubject = TrackingEventProcessor.builder()
                                            .name("test")
                                            .eventHandlerInvoker(eventHandlerInvoker)
                                            .trackingEventProcessorConfiguration(
                                                    TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                                                       .andBatchSize(100)
                                            )
                                            .messageSource(eventBus)
                                            .tokenStore(tokenStore)
                                            .transactionManager(() -> Mockito.mock(Transaction.class))
                                            .build();

        CountDownLatch cleanedUp = new CountDownLatch(2);
        AtomicInteger extensionsInUnitOfWork = new AtomicInteger();
        // Any claim extension observed while a unit of work is active is counted and fails immediately.
        Mockito.doAnswer(invocation -> {
            if (CurrentUnitOfWork.isStarted()) {
                extensionsInUnitOfWork.incrementAndGet();
                Assertions.fail("Did not expect an invocation in a Unit of Work");
            }
            return invocation.callRealMethod();
        }).when(tokenStore).extendClaim(Mockito.anyString(), Mockito.anyInt());

        testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            // The callback parameter needs its own name: a nested lambda may not redeclare 'unitOfWork'.
            unitOfWork.onCleanup(uow -> cleanedUp.countDown());
            return interceptorChain.proceed();
        });

        testSubject.start();
        eventBus.publish(EventTestUtils.createEvents(2));

        Assertions.assertTrue(cleanedUp.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase for 2 messages");
        Mockito.verify(tokenStore, Mockito.times(1)).storeToken(Mockito.any(), Mockito.any(), Mockito.anyInt());
        Assertions.assertNotNull(tokenStore.fetchToken(testSubject.getName(), 0));
        Assertions.assertEquals(0,
                                extensionsInUnitOfWork.get(),
                                "Unexpected number of invocations of token extension in unit of work");
    }

    /**
     * Builds a processor with a 10 ms event availability timeout whose interceptor sleeps 50 ms per unit of work,
     * handles two events, and expects one token store followed by one claim extension, in that order.
     */
    @Test
    void tokenStoredAtEndOfEventBatchAndExtendedWhenTokenClaimIntervalExceeded() throws Exception {
        testSubject = TrackingEventProcessor.builder()
                                            .name("test")
                                            .trackingEventProcessorConfiguration(
                                                    TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                                                       .andEventAvailabilityTimeout(10L, TimeUnit.MILLISECONDS)
                                            )
                                            .eventHandlerInvoker(eventHandlerInvoker)
                                            .messageSource(eventBus)
                                            .tokenStore(tokenStore)
                                            .transactionManager(NoTransactionManager.INSTANCE)
                                            .build();

        CountDownLatch cleanedUp = new CountDownLatch(2);
        testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            // The callback parameter needs its own name: a nested lambda may not redeclare 'unitOfWork'.
            unitOfWork.onCleanup(uow -> cleanedUp.countDown());
            // Delay handling for longer than the configured 10 ms event availability timeout.
            Thread.sleep(50L);
            return interceptorChain.proceed();
        });

        testSubject.start();
        eventBus.publish(EventTestUtils.createEvents(2));

        Assertions.assertTrue(cleanedUp.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase for 2 messages");
        InOrder inOrder = Mockito.inOrder(tokenStore);
        inOrder.verify(tokenStore, Mockito.times(1)).storeToken(Mockito.any(), Mockito.any(), Mockito.anyInt());
        inOrder.verify(tokenStore, Mockito.times(1)).extendClaim(Mockito.eq(testSubject.getName()), Mockito.anyInt());
        Assertions.assertNotNull(tokenStore.fetchToken(testSubject.getName(), 0));
    }

    /**
     * Registers an interceptor whose commit callback throws a {@link MockException} and expects that, once the
     * unit of work reached cleanup, the stored token for segment 0 is either {@code null} or equal to the
     * configuration's initial tracking token.
     */
    @Test
    void tokenIsNotStoredWhenUnitOfWorkIsRolledBack() throws Exception {
        CountDownLatch cleanedUp = new CountDownLatch(1);
        // First interceptor: make the commit phase fail.
        testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            // The callback parameter needs its own name: a nested lambda may not redeclare 'unitOfWork'.
            unitOfWork.onCommit(uow -> {
                throw new MockException();
            });
            return interceptorChain.proceed();
        });
        // Second interceptor: signal when the unit of work reaches its cleanup phase.
        testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            unitOfWork.onCleanup(uow -> cleanedUp.countDown());
            return interceptorChain.proceed();
        });

        testSubject.start();
        eventBus.publish(EventTestUtils.createEvent());

        Assertions.assertTrue(cleanedUp.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase");
        MatcherAssert.assertThat(this.tokenStore.fetchToken(this.testSubject.getName(), 0), CoreMatchers.anyOf(CoreMatchers.nullValue(), CoreMatchers.equalTo(TrackingEventProcessorConfiguration.forSingleThreadedProcessing().getInitialTrackingToken().apply(this.eventBus))));
    }

    /**
     * Verifies that a processor resumes after the token already present in the token store.
     * <p>
     * Ten events are published and the token of the first one is stored for segment 0 up front. A freshly built
     * processor is then expected to invoke the handler for the remaining nine events only.
     */
    @Test
    void continueFromPreviousToken() throws Exception {
        this.tokenStore = new InMemoryTokenStore();
        this.eventBus.publish(EventTestUtils.createEvents(10));

        TrackedEventMessage<?> firstEvent = this.eventBus.openStream((TrackingToken) null).nextAvailable();
        this.tokenStore.storeToken(firstEvent.trackingToken(), this.testSubject.getName(), 0);
        Assertions.assertEquals(firstEvent.trackingToken(),
                                this.tokenStore.fetchToken(this.testSubject.getName(), 0));

        List<EventMessage<?>> handledEvents = new CopyOnWriteArrayList<>();
        CountDownLatch remaining = new CountDownLatch(9);
        Mockito.doAnswer(invocation -> {
            handledEvents.add((EventMessage<?>) invocation.getArguments()[0]);
            remaining.countDown();
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        this.testSubject = TrackingEventProcessor.builder()
                .name("test")
                .eventHandlerInvoker(this.eventHandlerInvoker)
                .messageSource(this.eventBus)
                .tokenStore(this.tokenStore)
                .transactionManager(NoTransactionManager.INSTANCE)
                .build();
        this.testSubject.start();

        Assertions.assertTrue(remaining.await(5L, TimeUnit.SECONDS),
                              "Expected 9 invocations on Event Handler by now");
        Assertions.assertEquals(9, handledEvents.size());
    }

    /**
     * Verifies that a processor which was shut down picks up newly published events after a restart.
     * <p>
     * Two events are handled, the processor is shut down and the test waits until no processor threads remain.
     * Two more events are published while the processor is stopped; they must not be handled until the
     * processor is started again, after which four events in total must have been handled.
     */
    @Timeout(10)
    @Test
    @DirtiesContext
    void continueAfterPause() throws Exception {
        List<EventMessage<?>> handledEvents = new CopyOnWriteArrayList<>();

        CountDownLatch firstBatch = new CountDownLatch(2);
        Mockito.doAnswer(invocation -> {
            handledEvents.add((EventMessage<?>) invocation.getArguments()[0]);
            firstBatch.countDown();
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        this.testSubject.start();
        this.eventBus.publish(EventTestUtils.createEvents(2));
        Assertions.assertTrue(firstBatch.await(5L, TimeUnit.SECONDS),
                              "Expected 2 invocations on Event Handler by now");
        Assertions.assertEquals(2, handledEvents.size());

        this.testSubject.shutDown();
        // Poll until every processor thread has stopped; the method-level timeout bounds this loop.
        while (this.testSubject.activeProcessorThreads() > 0) {
            Thread.sleep(1L);
        }

        CountDownLatch secondBatch = new CountDownLatch(2);
        Mockito.doAnswer(invocation -> {
            handledEvents.add((EventMessage<?>) invocation.getArguments()[0]);
            secondBatch.countDown();
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        this.eventBus.publish(EventTestUtils.createEvents(2));
        // Nothing may be handled while the processor is stopped.
        Assertions.assertEquals(2L, secondBatch.getCount());

        this.testSubject.start();
        Assertions.assertTrue(secondBatch.await(5L, TimeUnit.SECONDS),
                              "Expected 4 invocations on Event Handler by now");
        Assertions.assertEquals(4, handledEvents.size());
    }

    /**
     * Verifies that the processor retries opening the event stream after a failure.
     * <p>
     * The event store is spied so that the first {@code openStream} call throws a {@code MockException} and
     * later calls reach the real method. All five published events must still be handled, and
     * {@code openStream} must have been invoked exactly twice.
     */
    @Test
    @DirtiesContext
    void processorGoesToRetryModeWhenOpenStreamFails() throws Exception {
        this.eventBus = Mockito.spy(this.eventBus);
        this.tokenStore = new InMemoryTokenStore();
        this.eventBus.publish(EventTestUtils.createEvents(5));
        Mockito.when(this.eventBus.openStream(Mockito.any()))
               .thenThrow(new MockException())
               .thenCallRealMethod();

        List<EventMessage<?>> handledEvents = new ArrayList<>();
        CountDownLatch allHandled = new CountDownLatch(5);
        Mockito.doAnswer(invocation -> {
            handledEvents.add((EventMessage<?>) invocation.getArguments()[0]);
            allHandled.countDown();
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        this.testSubject = TrackingEventProcessor.builder()
                .name("test")
                .eventHandlerInvoker(this.eventHandlerInvoker)
                .messageSource(this.eventBus)
                .tokenStore(this.tokenStore)
                .transactionManager(NoTransactionManager.INSTANCE)
                .build();
        this.testSubject.start();
        Thread.sleep(200L);

        Assertions.assertTrue(allHandled.await(10L, TimeUnit.SECONDS),
                              "Expected 5 invocations on Event Handler by now");
        Assertions.assertEquals(5, handledEvents.size());
        Mockito.verify(this.eventBus, Mockito.times(2)).openStream(Mockito.any());
    }

    /**
     * Verifies that the error state of the tracker status follows the availability of the event stream.
     * <p>
     * While a flag is set, every {@code openStream} call on the spied event store throws a
     * {@code MockException}; in that phase no status may be reported as healthy. After the flag is cleared the
     * five published events must be handled and no status may remain in error state.
     */
    @Test
    @DirtiesContext
    void processorSetsAndUnsetsErrorState() throws Exception {
        this.eventBus = Mockito.spy(this.eventBus);
        this.tokenStore = new InMemoryTokenStore();
        this.eventBus.publish(EventTestUtils.createEvents(5));

        AtomicBoolean failOpenStream = new AtomicBoolean(true);
        Mockito.when(this.eventBus.openStream(Mockito.any())).then(invocation -> {
            if (!failOpenStream.get()) {
                return invocation.callRealMethod();
            }
            throw new MockException();
        });

        List<EventMessage<?>> handledEvents = new ArrayList<>();
        CountDownLatch allHandled = new CountDownLatch(5);
        Mockito.doAnswer(invocation -> {
            handledEvents.add((EventMessage<?>) invocation.getArguments()[0]);
            allHandled.countDown();
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        this.testSubject = TrackingEventProcessor.builder()
                .name("test")
                .eventHandlerInvoker(this.eventHandlerInvoker)
                .messageSource(this.eventBus)
                .tokenStore(this.tokenStore)
                .transactionManager(NoTransactionManager.INSTANCE)
                .build();
        this.testSubject.start();
        Thread.sleep(200L);

        boolean noHealthyStatus = this.testSubject.processingStatus()
                                                  .values()
                                                  .stream()
                                                  .noneMatch(status -> !status.isErrorState());
        MatcherAssert.assertThat("no healthy processor when open stream fails", noHealthyStatus);

        failOpenStream.set(false);
        Assertions.assertTrue(allHandled.await(10L, TimeUnit.SECONDS),
                              "Expected 5 invocations on Event Handler by now");
        Assertions.assertEquals(5, handledEvents.size());

        boolean noFailedStatus = this.testSubject.processingStatus()
                                                 .values()
                                                 .stream()
                                                 .noneMatch(EventTrackerStatus::isErrorState);
        MatcherAssert.assertThat("no processor in error state when open stream succeeds again", noFailedStatus);
    }

    /**
     * Verifies that a token is still stored when only the second of two events fails on commit.
     * <p>
     * A commit callback throws a {@code MockException} for the second published event only. After both events
     * reached the clean-up phase, {@code storeToken} must have been called at least once and a token must be
     * present for segment 0.
     */
    @Test
    void firstTokenIsStoredWhenUnitOfWorkIsRolledBackOnSecondEvent() throws Exception {
        List<DomainEventMessage<?>> events = EventTestUtils.createEvents(2);
        CountDownLatch cleanedUp = new CountDownLatch(2);

        this.testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            // Distinct parameter name: a nested lambda may not redeclare the outer lambda's parameter.
            unitOfWork.onCommit(uow -> {
                if (uow.getMessage().equals(events.get(1))) {
                    throw new MockException();
                }
            });
            return interceptorChain.proceed();
        });
        this.testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            unitOfWork.onCleanup(uow -> cleanedUp.countDown());
            return interceptorChain.proceed();
        });

        this.testSubject.start();
        // Fixed pause between starting the processor and publishing.
        Thread.sleep(200L);
        this.eventBus.publish(events);

        Assertions.assertTrue(cleanedUp.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase");
        Mockito.verify(this.tokenStore, Mockito.atLeastOnce())
               .storeToken(Mockito.any(), Mockito.any(), Mockito.anyInt());
        Assertions.assertNotNull(this.tokenStore.fetchToken(this.testSubject.getName(), 0));
    }

    /**
     * Verifies the stored token when two events carrying the same tracking token are processed and the
     * second one fails on commit.
     * <p>
     * The event store is replaced by a mock whose stream yields two events that both carry
     * {@code GlobalSequenceTrackingToken(0)}. A commit callback throws a {@code MockException} for the second
     * event. After both events reached the clean-up phase, {@code storeToken} must have been attempted at least
     * once, yet no token may be retrievable for segment 0.
     */
    @Test
    @DirtiesContext
    void eventsWithTheSameTokenAreProcessedInTheSameBatch() throws Exception {
        this.eventBus.shutDown();
        this.eventBus = Mockito.mock(EmbeddedEventStore.class);

        GlobalSequenceTrackingToken sharedToken = new GlobalSequenceTrackingToken(0L);
        List list = (List) EventTestUtils.createEvents(2).stream().map(domainEventMessage -> {
            return EventUtils.asTrackedEventMessage(domainEventMessage, sharedToken);
        }).collect(Collectors.toList());
        Mockito.when(this.eventBus.openStream((TrackingToken) null))
               .thenReturn(trackingEventStreamOf(list.iterator()));

        this.testSubject = TrackingEventProcessor.builder()
                .name("test")
                .eventHandlerInvoker(this.eventHandlerInvoker)
                .messageSource(this.eventBus)
                .tokenStore(this.tokenStore)
                .transactionManager(NoTransactionManager.INSTANCE)
                .build();

        this.testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            // Distinct parameter name: a nested lambda may not redeclare the outer lambda's parameter.
            unitOfWork.onCommit(uow -> {
                if (uow.getMessage().equals(list.get(1))) {
                    throw new MockException();
                }
            });
            return interceptorChain.proceed();
        });
        CountDownLatch cleanedUp = new CountDownLatch(2);
        this.testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            unitOfWork.onCleanup(uow -> cleanedUp.countDown());
            return interceptorChain.proceed();
        });

        this.testSubject.start();
        Thread.sleep(200L);

        Assertions.assertTrue(cleanedUp.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase");
        Mockito.verify(this.tokenStore, Mockito.atLeastOnce())
               .storeToken(Mockito.any(), Mockito.any(), Mockito.anyInt());
        Assertions.assertNull(this.tokenStore.fetchToken(this.testSubject.getName(), 0));
    }

    /**
     * Verifies that resetting the tokens replays all events and exposes the most recent reset context.
     * <p>
     * Four events are handled, the processor is shut down and reset twice with different contexts. After the
     * restart the same four events must be handled again, all flagged as replays carrying the second context.
     * Publishing one more event must end the replay: the status for segment 0 stops replaying, loses its reset
     * position and moves its current position forward. {@code performReset} must have been invoked once per
     * context.
     */
    @Test
    void resetCausesEventsToBeReplayedWithCorrectContext() throws Exception {
        final int segmentId = 0;
        final int expectedInitialCount = 4;
        // Re-reads the processing status on every call so each assertion sees the current state.
        Supplier<EventTrackerStatus> segmentStatus = () -> this.testSubject.processingStatus().get(segmentId);

        Mockito.when(this.mockHandler.supportsReset()).thenReturn(true);
        List<String> handledIds = new CopyOnWriteArrayList<>();
        List<String> replayedIds = new CopyOnWriteArrayList<>();
        List<Object> replayContexts = new CopyOnWriteArrayList<>();
        Mockito.doAnswer(invocation -> {
            EventMessage<?> event = invocation.getArgument(0);
            if (ReplayToken.isReplay(event)) {
                replayedIds.add(event.getIdentifier());
                replayContexts.add(ReplayToken.replayContext(event, MyResetContext.class).orElse(null));
            }
            handledIds.add(event.getIdentifier());
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        this.testSubject.start();
        awaitProcessorStarted();
        this.eventBus.publish(EventTestUtils.createEvents(4));
        Awaitility.await("Handle Events - Initial")
                  .pollDelay(Duration.ofMillis(50L))
                  .atMost(Duration.ofMillis(4000L))
                  .untilAsserted(() -> Assertions.assertEquals(
                          expectedInitialCount,
                          handledIds.size(),
                          () -> "Actually handled [" + handledIds.size()
                                  + "] instead of expected [" + expectedInitialCount + "]"
                  ));

        this.testSubject.shutDown();
        MyResetContext firstContext = new MyResetContext("one");
        this.testSubject.resetTokens(firstContext);
        MyResetContext secondContext = new MyResetContext("two");
        this.testSubject.resetTokens(secondContext);
        this.testSubject.start();

        Awaitility.await("Handle Events - Replay")
                  .atMost(Duration.ofSeconds(2L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> handledIds.size() == 8);

        Assertions.assertEquals(handledIds.subList(0, 4), handledIds.subList(4, 8));
        Assertions.assertEquals(handledIds.subList(4, 8), replayedIds);
        Assertions.assertEquals(4, replayContexts.size());
        Assertions.assertTrue(replayContexts.stream().allMatch(secondContext::equals));

        Assertions.assertTrue(segmentStatus.get().isReplaying());
        Assertions.assertTrue(segmentStatus.get().getCurrentPosition().isPresent());
        Assertions.assertTrue(segmentStatus.get().getResetPosition().isPresent());
        long positionBeforeNewEvent = segmentStatus.get().getCurrentPosition().getAsLong();

        this.eventBus.publish(EventTestUtils.createEvents(1));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS,
                                 () -> Assertions.assertFalse(segmentStatus.get().isReplaying()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS,
                                 () -> Assertions.assertFalse(segmentStatus.get().getResetPosition().isPresent()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS,
                                 () -> Assertions.assertTrue(segmentStatus.get().getCurrentPosition().isPresent()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertTrue(
                segmentStatus.get().getCurrentPosition().getAsLong() > positionBeforeNewEvent
        ));

        Mockito.verify(this.eventHandlerInvoker, Mockito.times(1)).performReset(firstContext);
        Mockito.verify(this.eventHandlerInvoker, Mockito.times(1)).performReset(secondContext);
    }

    /**
     * Verifies that the context given to {@code resetTokens} is handed to the event handler invoker.
     * <p>
     * After four events were handled, the processor is shut down, reset with the context
     * {@code "reset-context"} and started again; {@code performReset} must have received that context.
     */
    @Test
    void resetTokensPassesOnResetContextToResetHandlers() throws Exception {
        Mockito.when(this.mockHandler.supportsReset()).thenReturn(true);
        List<String> handledIds = new CopyOnWriteArrayList<>();
        Mockito.doAnswer(invocation -> {
            EventMessage<?> event = invocation.getArgument(0);
            handledIds.add(event.getIdentifier());
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        this.eventBus.publish(EventTestUtils.createEvents(4));
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals(4, handledIds.size()));

        this.testSubject.shutDown();
        this.testSubject.resetTokens("reset-context");
        this.testSubject.start();

        Mockito.verify(this.eventHandlerInvoker).performReset("reset-context");
    }

    /**
     * Verifies that resetting to a specific position replays only the events after that position.
     * <p>
     * Four events are handled, then the tokens are reset to {@code GlobalSequenceTrackingToken(1)}. After the
     * restart only the last two events are handled again, both flagged as replays, for six handler invocations
     * in total. Publishing one more event must end the replay for segment 0 and move the current position
     * beyond the former reset position. {@code performReset} must have been called with
     * {@code NO_RESET_PAYLOAD}.
     */
    @Test
    void resetToPositionCausesCertainEventsToBeReplayed() throws Exception {
        final int segmentId = 0;
        // Re-reads the processing status on every call so each assertion sees the current state.
        Supplier<EventTrackerStatus> segmentStatus = () -> this.testSubject.processingStatus().get(segmentId);

        Mockito.when(this.mockHandler.supportsReset()).thenReturn(true);
        List<String> handledIds = new CopyOnWriteArrayList<>();
        List<String> replayedIds = new CopyOnWriteArrayList<>();
        Mockito.doAnswer(invocation -> {
            EventMessage<?> event = invocation.getArgument(0);
            if (ReplayToken.isReplay(event)) {
                replayedIds.add(event.getIdentifier());
            }
            handledIds.add(event.getIdentifier());
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        this.testSubject.start();
        awaitProcessorStarted();
        this.eventBus.publish(EventTestUtils.createEvents(4));
        Awaitility.await("Handle Events")
                  .atMost(Duration.ofSeconds(5L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> handledIds.size() == 4);

        this.testSubject.shutDown();
        this.testSubject.resetTokens(streamableMessageSource -> {
            return new GlobalSequenceTrackingToken(1L);
        });
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals(6, handledIds.size()));

        Assertions.assertFalse(replayedIds.contains(handledIds.get(0)));
        Assertions.assertFalse(replayedIds.contains(handledIds.get(1)));
        Assertions.assertEquals(handledIds.subList(2, 4), handledIds.subList(4, 6));
        Assertions.assertEquals(handledIds.subList(4, 6), replayedIds);

        Assertions.assertTrue(segmentStatus.get().isReplaying());
        Assertions.assertTrue(segmentStatus.get().getCurrentPosition().isPresent());
        Assertions.assertTrue(segmentStatus.get().getResetPosition().isPresent());
        long resetPosition = segmentStatus.get().getResetPosition().getAsLong();

        this.eventBus.publish(EventTestUtils.createEvents(1));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS,
                                 () -> Assertions.assertFalse(segmentStatus.get().isReplaying()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS,
                                 () -> Assertions.assertFalse(segmentStatus.get().getResetPosition().isPresent()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS,
                                 () -> Assertions.assertTrue(segmentStatus.get().getCurrentPosition().isPresent()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertTrue(
                segmentStatus.get().getCurrentPosition().getAsLong() > resetPosition
        ));

        Mockito.verify(this.eventHandlerInvoker).performReset(NO_RESET_PAYLOAD);
    }

    /**
     * Verifies that a reset without arguments returns to the configured initial tracking token.
     * <p>
     * The processor is configured with {@code GlobalSequenceTrackingToken(1)} as its initial token, so only two
     * of the four published events are handled. After shutdown, reset and restart those same two events are
     * handled again as replays. Publishing one more event must end the replay for segment 0 and move the
     * current position beyond the former reset position. {@code performReset} must have been called with
     * {@code NO_RESET_PAYLOAD}.
     */
    @Test
    void resetOnInitializeWithTokenResetToThatToken() throws Exception {
        TrackingEventProcessorConfiguration configuration =
                TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                   .andInitialTrackingToken(streamableMessageSource -> {
                                                       return new GlobalSequenceTrackingToken(1L);
                                                   });
        // Subclass that records requested sleep durations instead of actually sleeping.
        this.testSubject = new TrackingEventProcessor(TrackingEventProcessor.builder()
                .name("test")
                .eventHandlerInvoker(this.eventHandlerInvoker)
                .messageSource(this.eventBus)
                .tokenStore(this.tokenStore)
                .transactionManager(NoTransactionManager.INSTANCE)
                .trackingEventProcessorConfiguration(configuration)) {
            protected void doSleepFor(long millisToSleep) {
                if (isRunning()) {
                    TrackingEventProcessorTest.this.sleepInstructions.add(Long.valueOf(millisToSleep));
                }
            }

            protected void doSleepFor(long millisToSleep, AtomicBoolean interruptFlag) {
                if (isRunning()) {
                    TrackingEventProcessorTest.this.sleepInstructions.add(Long.valueOf(millisToSleep));
                }
            }
        };

        final int segmentId = 0;
        // Re-reads the processing status on every call so each assertion sees the current state.
        Supplier<EventTrackerStatus> segmentStatus = () -> this.testSubject.processingStatus().get(segmentId);

        Mockito.when(this.mockHandler.supportsReset()).thenReturn(true);
        List<String> handledIds = new CopyOnWriteArrayList<>();
        List<String> replayedIds = new CopyOnWriteArrayList<>();
        Mockito.doAnswer(invocation -> {
            EventMessage<?> event = invocation.getArgument(0);
            if (ReplayToken.isReplay(event)) {
                replayedIds.add(event.getIdentifier());
            }
            handledIds.add(event.getIdentifier());
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        this.eventBus.publish(EventTestUtils.createEvents(4));
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals(2, handledIds.size()));

        this.testSubject.shutDown();
        this.testSubject.resetTokens();
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals(4, handledIds.size()));

        Assertions.assertEquals(handledIds.subList(0, 2), handledIds.subList(2, 4));
        Assertions.assertEquals(handledIds.subList(2, 4), replayedIds);

        Assertions.assertTrue(segmentStatus.get().isReplaying());
        Assertions.assertTrue(segmentStatus.get().getCurrentPosition().isPresent());
        Assertions.assertTrue(segmentStatus.get().getResetPosition().isPresent());
        long resetPosition = segmentStatus.get().getResetPosition().getAsLong();

        this.eventBus.publish(EventTestUtils.createEvents(1));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS,
                                 () -> Assertions.assertFalse(segmentStatus.get().isReplaying()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS,
                                 () -> Assertions.assertFalse(segmentStatus.get().getResetPosition().isPresent()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS,
                                 () -> Assertions.assertTrue(segmentStatus.get().getCurrentPosition().isPresent()));
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertTrue(
                segmentStatus.get().getCurrentPosition().getAsLong() > resetPosition
        ));

        Mockito.verify(this.eventHandlerInvoker).performReset(NO_RESET_PAYLOAD);
    }

    /**
     * Verifies that resetting a processor that never ran does not turn the first run into a replay.
     * <p>
     * The tokens are reset before the first start. The four events published afterwards must all be handled
     * without any of them being flagged as a replay, and the status of segment 0 must show no replay, no reset
     * position and a current position greater than zero. {@code performReset} must have been called with
     * {@code NO_RESET_PAYLOAD}.
     */
    @Test
    void resetBeforeStartingPerformsANormalRun() throws Exception {
        final int expectedCount = 4;
        // Re-reads the processing status on every call so each assertion sees the current state.
        Supplier<EventTrackerStatus> segmentStatus = () -> this.testSubject.processingStatus().get(0);

        Mockito.when(this.mockHandler.supportsReset()).thenReturn(true);
        List<String> handledIds = new CopyOnWriteArrayList<>();
        List<String> replayedIds = new CopyOnWriteArrayList<>();
        Mockito.doAnswer(invocation -> {
            EventMessage<?> event = invocation.getArgument(0);
            if (ReplayToken.isReplay(event)) {
                replayedIds.add(event.getIdentifier());
            }
            handledIds.add(event.getIdentifier());
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        this.testSubject.resetTokens();
        this.testSubject.start();
        awaitProcessorStarted();
        this.eventBus.publish(EventTestUtils.createEvents(4));
        Awaitility.await("Handle Events")
                  .pollDelay(Duration.ofMillis(50L))
                  .atMost(Duration.ofMillis(2500L))
                  .untilAsserted(() -> Assertions.assertEquals(
                          expectedCount,
                          handledIds.size(),
                          () -> "Actually handled [" + handledIds.size()
                                  + "] instead of expected [" + expectedCount + "]"
                  ));

        Assertions.assertEquals(0, replayedIds.size());
        Assertions.assertFalse(segmentStatus.get().isReplaying());
        Assertions.assertFalse(segmentStatus.get().getResetPosition().isPresent());
        Assertions.assertTrue(segmentStatus.get().getCurrentPosition().isPresent());
        Assertions.assertTrue(segmentStatus.get().getCurrentPosition().getAsLong() > 0);
        Mockito.verify(this.eventHandlerInvoker).performReset(NO_RESET_PAYLOAD);
    }

    /**
     * Waits up to one second for the processor under test to report exactly one active processor thread.
     */
    private void awaitProcessorStarted() {
        AssertUtils.assertWithin(
                1,
                TimeUnit.SECONDS,
                () -> Assertions.assertEquals(1, this.testSubject.activeProcessorThreads())
        );
    }

    /**
     * Verifies replay detection when the replayed stream contains events that the first run did not see.
     * <p>
     * A mocked message source first yields a stub stream built from 0, 1, 2, 5 and, on the second call, one
     * built from 0 through 7. The tokens handled in the first run are collected, the processor is reset and
     * restarted, and the tokens of the second run are collected separately. The fourth token of the first run
     * must be a gap-aware token at index 5 with gaps 3 and 4; in the second run the tokens at positions 0 and 5
     * must be {@code ReplayToken}s while the one at position 6 is a plain gap-aware token at index 6.
     */
    @Test
    void replayFlagAvailableWhenReplayInDifferentOrder() throws Exception {
        StreamableMessageSource stubSource = Mockito.mock(StreamableMessageSource.class);
        this.testSubject = TrackingEventProcessor.builder()
                .name("test")
                .eventHandlerInvoker(this.eventHandlerInvoker)
                .messageSource(stubSource)
                .tokenStore(this.tokenStore)
                .transactionManager(NoTransactionManager.INSTANCE)
                .build();

        Mockito.when(stubSource.openStream(Mockito.any()))
               .thenReturn(new StubTrackingEventStream(0, 1, 2, 5))
               .thenReturn(new StubTrackingEventStream(0, 1, 2, 3, 4, 5, 6, 7));
        Mockito.when(this.eventHandlerInvoker.supportsReset()).thenReturn(true);
        Mockito.doReturn(true).when(this.eventHandlerInvoker).canHandle(Mockito.any(), Mockito.any());

        List<TrackingToken> firstRunTokens = new CopyOnWriteArrayList<>();
        List<TrackingToken> replayRunTokens = new CopyOnWriteArrayList<>();

        Mockito.doAnswer(invocation -> {
            TrackedEventMessage<?> event = invocation.getArgument(0);
            firstRunTokens.add(event.trackingToken());
            return null;
        }).when(this.eventHandlerInvoker).handle(Mockito.any(), Mockito.any());

        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals(4, firstRunTokens.size()));
        this.testSubject.shutDown();

        // From here on, handled tokens go to the second list.
        Mockito.doAnswer(invocation -> {
            TrackedEventMessage<?> event = invocation.getArgument(0);
            replayRunTokens.add(event.trackingToken());
            return null;
        }).when(this.eventHandlerInvoker).handle(Mockito.any(), Mockito.any());

        this.testSubject.resetTokens();
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals(8, replayRunTokens.size()));

        Assertions.assertEquals(GapAwareTrackingToken.newInstance(5L, Arrays.asList(3L, 4L)),
                                firstRunTokens.get(3));
        Assertions.assertTrue(replayRunTokens.get(0) instanceof ReplayToken);
        Assertions.assertTrue(replayRunTokens.get(5) instanceof ReplayToken);
        Assertions.assertEquals(GapAwareTrackingToken.newInstance(6L, Collections.emptySortedSet()),
                                replayRunTokens.get(6));
        Mockito.verify(this.eventHandlerInvoker).performReset(NO_RESET_PAYLOAD);
    }

    /**
     * Verifies that {@code resetTokens()} throws an {@code IllegalStateException} once the processor has been
     * started.
     */
    @Test
    void resetRejectedWhileRunning() {
        this.testSubject.start();
        Assertions.assertThrows(IllegalStateException.class, this.testSubject::resetTokens);
    }

    /**
     * Verifies that the processor reports no reset support when the mocked handler does not support resets.
     */
    @Test
    void resetNotSupportedWhenInvokerDoesNotSupportReset() {
        Mockito.when(this.mockHandler.supportsReset()).thenReturn(false);

        boolean resetSupported = this.testSubject.supportsReset();

        Assertions.assertFalse(resetSupported);
    }

    /**
     * Verifies that {@code resetTokens()} throws an {@code IllegalStateException} when the mocked handler does
     * not support resets.
     */
    @Test
    void resetRejectedWhenInvokerDoesNotSupportReset() {
        Mockito.when(this.mockHandler.supportsReset()).thenReturn(false);
        Assertions.assertThrows(IllegalStateException.class, this.testSubject::resetTokens);
    }

    /**
     * Verifies that a reset fails without storing anything when one of the segments cannot be claimed.
     * <p>
     * Four segments are initialised for the processor {@code "test"} and fetching the token of segment 3 is
     * stubbed to throw an {@code UnableToClaimTokenException}. The reset must surface that exception and
     * {@code storeToken} must never be called with a {@code null} token.
     */
    @Test
    void resetRejectedIfNotAllTokensCanBeClaimed() {
        this.tokenStore.initializeTokenSegments("test", 4);
        Mockito.when(this.tokenStore.fetchToken("test", 3))
               .thenThrow(new UnableToClaimTokenException("Mock"));

        Assertions.assertThrows(UnableToClaimTokenException.class, this.testSubject::resetTokens);

        Mockito.verify(this.tokenStore, Mockito.never())
               .storeToken(Mockito.isNull(), Mockito.anyString(), Mockito.anyInt());
    }

    /**
     * Verifies that the context given to {@code resetTokens} can be read back from the stored token.
     * <p>
     * After four events were handled, the processor is shut down, reset with the context
     * {@code "reset-context"} and started again. The token fetched for segment 0 of processor {@code "test"}
     * must then expose that context as its replay context.
     */
    @Test
    void resetTokensPassesOnResetContextToTrackingToken() throws Exception {
        Mockito.when(this.mockHandler.supportsReset()).thenReturn(true);
        AtomicInteger handledCount = new AtomicInteger();
        Mockito.doAnswer(invocation -> {
            handledCount.incrementAndGet();
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        this.eventBus.publish(EventTestUtils.createEvents(4));
        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals(4, handledCount.get()));

        this.testSubject.shutDown();
        this.testSubject.resetTokens("reset-context");
        this.testSubject.start();

        Assertions.assertEquals(
                "reset-context",
                ReplayToken.replayContext(this.tokenStore.fetchToken("test", 0), String.class).orElse(null)
        );
    }

    /**
     * Verifies that the processor still ends up with one active thread when token store initialisation fails
     * at first.
     * <p>
     * The first {@code fetchSegments} call and the first {@code initializeTokenSegments} call each throw a
     * {@code RuntimeException}; later calls reach the real method and do nothing, respectively. The test polls
     * up to 250 times at 10 ms intervals for an active processor thread.
     */
    @Test
    void whenFailureDuringInit() throws InterruptedException {
        Mockito.doThrow(new RuntimeException("Faking issue during fetchSegments"))
               .doCallRealMethod()
               .when(this.tokenStore)
               .fetchSegments(Mockito.anyString());
        Mockito.doThrow(new RuntimeException("Faking issue during initializeTokenSegments"))
               .doNothing()
               .when(this.tokenStore)
               .initializeTokenSegments(Mockito.anyString(), Mockito.anyInt());

        this.testSubject.start();

        int remainingPolls = 250;
        while (remainingPolls-- > 0 && this.testSubject.activeProcessorThreads() < 1) {
            Thread.sleep(10L);
        }
        Assertions.assertEquals(1, this.testSubject.activeProcessorThreads());
    }

    /**
     * Runs the processor against a stubbed source whose stream holds tokens 0, 1, 2 and 5, with an invoker
     * that can handle only the first event. Once segment 0 is caught up, its tracking token must cover
     * position 5 with gaps 3 and 4.
     */
    @Test
    void updateActiveSegmentsWhenBatchIsEmpty() throws Exception {
        StreamableMessageSource stubSource = Mockito.mock(StreamableMessageSource.class);
        this.testSubject = TrackingEventProcessor.builder()
                                                 .name("test")
                                                 .eventHandlerInvoker(this.eventHandlerInvoker)
                                                 .messageSource(stubSource)
                                                 .tokenStore(this.tokenStore)
                                                 .transactionManager(NoTransactionManager.INSTANCE)
                                                 .build();
        Mockito.when(stubSource.openStream(Mockito.any())).thenReturn(new StubTrackingEventStream(0, 1, 2, 5));
        // First canHandle call answers true, every later call answers false.
        Mockito.doReturn(true, false)
               .when(this.eventHandlerInvoker)
               .canHandle(Mockito.any(), Mockito.any());

        this.testSubject.start();

        waitForStatus("processor thread started", 200L, TimeUnit.MILLISECONDS,
                      status -> status.containsKey(0));
        waitForStatus("Segment 0 caught up", 5L, TimeUnit.SECONDS,
                      status -> ((EventTrackerStatus) status.get(0)).isCaughtUp());

        TrackingToken expectedCovered = GapAwareTrackingToken.newInstance(5L, Arrays.asList(3L, 4L));
        EventTrackerStatus segmentZero = this.testSubject.processingStatus().get(0);
        Assertions.assertTrue(segmentZero.getTrackingToken().covers(expectedCovered));
    }

    /**
     * Releases segment 0 for two seconds and expects the active thread count to drop to zero
     * and then return to one.
     */
    @Test
    void releaseSegment() {
        this.testSubject.start();
        AssertUtils.assertWithin(
                5, TimeUnit.SECONDS, () -> Assertions.assertEquals(1, this.testSubject.activeProcessorThreads())
        );

        this.testSubject.releaseSegment(0, 2L, TimeUnit.SECONDS);

        AssertUtils.assertWithin(
                2, TimeUnit.SECONDS, () -> Assertions.assertEquals(0, this.testSubject.activeProcessorThreads())
        );
        AssertUtils.assertWithin(
                5, TimeUnit.SECONDS, () -> Assertions.assertEquals(1, this.testSubject.activeProcessorThreads())
        );
    }

    /**
     * Releases segment 0 for 180 seconds, then claims it explicitly and expects a processor thread
     * to be active again within five seconds.
     */
    @Test
    void releaseAndClaimSegmentWillOverrideReleaseDuration() {
        this.testSubject.start();
        AssertUtils.assertWithin(
                5, TimeUnit.SECONDS, () -> Assertions.assertEquals(1, this.testSubject.activeProcessorThreads())
        );

        this.testSubject.releaseSegment(0, 180L, TimeUnit.SECONDS);
        AssertUtils.assertWithin(
                2, TimeUnit.SECONDS, () -> Assertions.assertEquals(0, this.testSubject.activeProcessorThreads())
        );

        this.testSubject.claimSegment(0).join();
        AssertUtils.assertWithin(
                5, TimeUnit.SECONDS, () -> Assertions.assertEquals(1, this.testSubject.activeProcessorThreads())
        );
    }

    /**
     * Tracks {@code availableProcessorThreads()} across start and release of segment 0:
     * one before start, zero once running, one again after the release.
     */
    @Test
    void hasAvailableSegments() {
        Assertions.assertEquals(1, this.testSubject.availableProcessorThreads());

        this.testSubject.start();
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS, () -> Assertions.assertEquals(0, this.testSubject.availableProcessorThreads())
        );

        this.testSubject.releaseSegment(0);
        AssertUtils.assertWithin(
                2, TimeUnit.SECONDS, () -> Assertions.assertEquals(1, this.testSubject.availableProcessorThreads())
        );
    }

    /**
     * Splits segment 0 of a single-segment processor and expects the token store to list
     * segments 0 and 1 afterwards.
     */
    @Timeout(10)
    @Test
    void splitSegments() throws InterruptedException {
        String processorName = this.testSubject.getName();
        this.tokenStore.initializeTokenSegments(processorName, 1);
        this.testSubject.start();
        waitForSegmentStart(0);

        boolean splitSucceeded = this.testSubject.splitSegment(0).join();
        Assertions.assertTrue(splitSucceeded, "Expected split to succeed");
        Assertions.assertArrayEquals(new int[]{0, 1}, this.tokenStore.fetchSegments(this.testSubject.getName()));

        waitForSegmentStart(0);
    }

    /**
     * Merges segment 0 of a two-segment processor. The status must report merging until another
     * event has been published, and the token store must hold only segment 0.
     */
    @Timeout(10)
    @Test
    void mergeSegments() throws InterruptedException {
        this.tokenStore.initializeTokenSegments(this.testSubject.getName(), 2);
        this.testSubject.start();
        while (this.testSubject.processingStatus().isEmpty()) {
            Thread.sleep(10L);
        }

        boolean mergeSucceeded = this.testSubject.mergeSegment(0).join();
        Assertions.assertTrue(mergeSucceeded, "Expected merge to succeed");
        waitForProcessingStatus(0, EventTrackerStatus::isMerging);
        waitForSegmentStart(0);
        Assertions.assertArrayEquals(new int[]{0}, this.tokenStore.fetchSegments(this.testSubject.getName()));

        publishEvents(1);
        waitForProcessingStatus(0, status -> !status.isMerging());
    }

    /**
     * Merges segment 0 while both segments are claimed by this processor.
     * <p>
     * Before the merge, segment 0 must not report a merge-completed position. While merging it must
     * report one, and after one more published event the merge must be finished with the current
     * position beyond the merge-completed position.
     */
    @Timeout(10)
    @Test
    void mergeSegments_BothClaimedByProcessor() throws Exception {
        initProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(2)
                                                         .andEventAvailabilityTimeout(10L, TimeUnit.MILLISECONDS)
                                                         .andBatchSize(100));
        this.tokenStore.initializeTokenSegments(this.testSubject.getName(), 2);

        // Typed list: a raw list would make the identifier lookup below resolve against Object.
        List<EventMessage<?>> handledEvents = new CopyOnWriteArrayList<>();
        int segmentId = 0;
        Mockito.when(this.mockHandler.handle(Mockito.any()))
               .thenAnswer(invocation -> handledEvents.add(invocation.getArgument(0)));

        publishEvents(10);
        this.testSubject.start();
        waitForActiveThreads(2);
        AssertUtils.assertWithin(5, TimeUnit.SECONDS, () -> Assertions.assertEquals(
                10L,
                handledEvents.stream().map(EventMessage::getIdentifier).distinct().count(),
                "Expected message to be handled"
        ));

        EventTrackerStatus beforeMerge = this.testSubject.processingStatus().get(0);
        Assertions.assertFalse(beforeMerge.isMerging());
        Assertions.assertFalse(this.testSubject.processingStatus().get(0).mergeCompletedPosition().isPresent());

        AssertUtils.assertWithin(50, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue(
                this.testSubject.mergeSegment(segmentId).join(), "Expected merge to succeed"
        ));

        EventTrackerStatus mergingStatus = waitForProcessingStatus(0, EventTrackerStatus::isMerging);
        Assertions.assertTrue(mergingStatus.mergeCompletedPosition().isPresent());
        long mergeCompletedPosition = mergingStatus.mergeCompletedPosition().getAsLong();
        Assertions.assertArrayEquals(new int[]{0}, this.tokenStore.fetchSegments(this.testSubject.getName()));

        // One more event pushes the processor past the merge-completed position.
        publishEvents(1);
        EventTrackerStatus mergedStatus = waitForProcessingStatus(0, status -> !status.isMerging());
        Assertions.assertFalse(mergedStatus.mergeCompletedPosition().isPresent());
        Assertions.assertTrue(mergedStatus.getCurrentPosition().isPresent());
        Assertions.assertTrue(mergedStatus.getCurrentPosition().getAsLong() > mergeCompletedPosition);
    }

    /**
     * Releases segment 1 explicitly before merging segment 0, then expects all ten published events
     * to be handled once the merged segment 0 has caught up.
     */
    @Timeout(10)
    @Test
    void mergeSegments_WithExplicitReleaseOther() throws Exception {
        initProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(2));
        this.tokenStore.initializeTokenSegments(this.testSubject.getName(), 2);

        List<EventMessage<?>> handledEvents = new CopyOnWriteArrayList<>();
        List<EventMessage<?>> eventsToPublish = new ArrayList<>();
        int segmentId = 0;
        for (int index = 0; index < 10; index++) {
            eventsToPublish.add(EventTestUtils.createEvent(UUID.randomUUID().toString(), 0L));
        }
        Mockito.when(this.mockHandler.handle(Mockito.any()))
               .thenAnswer(invocation -> handledEvents.add(invocation.getArgument(0)));

        this.eventBus.publish(eventsToPublish);
        this.testSubject.start();
        waitForActiveThreads(2);

        this.testSubject.releaseSegment(1);
        waitForSegmentRelease(1);

        AssertUtils.assertWithin(50, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue(
                this.testSubject.mergeSegment(segmentId).join(), "Expected merge to succeed"
        ));
        Assertions.assertArrayEquals(new int[]{0}, this.tokenStore.fetchSegments(this.testSubject.getName()));
        waitForSegmentStart(0);

        // Segment 0 may be briefly absent from the status map, hence the Optional wrapper.
        while (!Optional.ofNullable(this.testSubject.processingStatus().get(0))
                        .map(EventTrackerStatus::isCaughtUp)
                        .orElse(false)) {
            Thread.sleep(10L);
        }

        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertEquals(10, handledEvents.size()));
    }

    /**
     * Splits segment 0 twice (yielding segments 0, 1 and 2) and merges it twice (back to segment 0),
     * publishing events in between. All 40 published events must be handled in the end.
     */
    @Timeout(10)
    @Test
    void doubleSplitAndMerge() throws Exception {
        this.tokenStore.initializeTokenSegments(this.testSubject.getName(), 1);
        List<EventMessage<?>> handledEvents = new CopyOnWriteArrayList<>();
        int segmentId = 0;
        Mockito.when(this.mockHandler.handle(Mockito.any()))
               .thenAnswer(invocation -> handledEvents.add(invocation.getArgument(0)));

        publishEvents(10);
        this.testSubject.start();
        waitForActiveThreads(1);

        // First split.
        AssertUtils.assertWithin(50, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue(
                this.testSubject.splitSegment(segmentId).join(), "Expected split to succeed"
        ));
        waitForActiveThreads(1);

        // Second split.
        AssertUtils.assertWithin(50, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue(
                this.testSubject.splitSegment(segmentId).join(), "Expected split to succeed"
        ));
        waitForActiveThreads(1);
        Assertions.assertArrayEquals(new int[]{0, 1, 2}, this.tokenStore.fetchSegments(this.testSubject.getName()));

        publishEvents(20);
        waitForProcessingStatus(0, EventTrackerStatus::isCaughtUp);
        Assertions.assertFalse(this.testSubject.processingStatus().get(0).isMerging());

        // First merge.
        boolean firstMerge = this.testSubject.mergeSegment(0).join();
        Assertions.assertTrue(firstMerge, "Expected merge to succeed");
        Assertions.assertArrayEquals(new int[]{0, 1}, this.tokenStore.fetchSegments(this.testSubject.getName()));

        publishEvents(10);
        waitForSegmentStart(0);
        waitForProcessingStatus(0, status -> status.getSegment().getMask() == 1 && status.isCaughtUp());

        // Second merge.
        boolean secondMerge = this.testSubject.mergeSegment(0).join();
        Assertions.assertTrue(secondMerge, "Expected merge to succeed");
        Assertions.assertArrayEquals(new int[]{0}, this.tokenStore.fetchSegments(this.testSubject.getName()));

        AssertUtils.assertWithin(3, TimeUnit.SECONDS, () -> Assertions.assertEquals(40, handledEvents.size()));
    }

    /**
     * Combines the default invoker with a second invoker whose sequencing policy always yields 0,
     * releases segment 1, merges segment 0 and expects the mocked handler to have seen all 30 events.
     */
    @Timeout(10)
    @Test
    void mergeSegmentWithDifferentProcessingGroupsAndSequencingPolicies() throws Exception {
        EventMessageHandler otherHandler = Mockito.mock(EventMessageHandler.class);
        Mockito.when(otherHandler.canHandle(Mockito.any())).thenReturn(true);
        Mockito.when(otherHandler.supportsReset()).thenReturn(true);
        SimpleEventHandlerInvoker otherInvoker =
                SimpleEventHandlerInvoker.builder()
                                         .eventHandlers(new Object[]{Collections.singleton(otherHandler)})
                                         .sequencingPolicy(event -> 0)
                                         .build();
        initProcessor(
                TrackingEventProcessorConfiguration.forParallelProcessing(2).andBatchSize(5),
                builder -> builder.eventHandlerInvoker(new MultiEventHandlerInvoker(
                        new EventHandlerInvoker[]{this.eventHandlerInvoker, otherInvoker}
                ))
        );

        List<TrackedEventMessage<?>> handledEvents = new CopyOnWriteArrayList<>();
        Mockito.when(this.mockHandler.handle(Mockito.any()))
               .thenAnswer(invocation -> handledEvents.add(invocation.getArgument(0)));

        publishEvents(10);
        this.testSubject.start();
        // Wait until at least two segments are claimed and all of them are caught up.
        while (this.testSubject.processingStatus().size() < 2
                || !this.testSubject.processingStatus().values().stream().allMatch(EventTrackerStatus::isCaughtUp)) {
            Thread.sleep(10L);
        }

        this.testSubject.releaseSegment(1);
        // Wait until only one caught-up segment is left.
        while (this.testSubject.processingStatus().size() != 1
                || !this.testSubject.processingStatus().values().stream().allMatch(EventTrackerStatus::isCaughtUp)) {
            Thread.sleep(10L);
        }

        publishEvents(10);
        this.testSubject.mergeSegment(0);
        publishEvents(10);
        while (this.testSubject.processingStatus().size() != 1
                || !this.testSubject.processingStatus().values().stream().allMatch(EventTrackerStatus::isCaughtUp)) {
            Thread.sleep(10L);
        }

        int expectedHandled = 30;
        Awaitility.await("Handled Events - After Merge")
                  .pollDelay(Duration.ofMillis(50L))
                  .atMost(Duration.ofSeconds(5L))
                  .untilAsserted(() -> Assertions.assertEquals(
                          expectedHandled,
                          handledEvents.size(),
                          () -> "Actually handled [" + handledEvents.size()
                                  + "] instead of expected [" + expectedHandled + "]"
                  ));
    }

    /**
     * Resets the tokens of a caught-up two-segment processor, releases segment 1 and merges segment 0
     * while the replay is running.
     * <p>
     * Exactly 30 events must be handled outside of replay, and no event may be replayed twice.
     */
    @Timeout(20)
    @Test
    void mergeSegmentsDuringReplay() throws Exception {
        initProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(2));
        this.tokenStore.initializeTokenSegments(this.testSubject.getName(), 2);

        // Typed lists: a raw list would make the identifier lookup at the end resolve against Object.
        List<TrackedEventMessage<?>> handledEvents = new CopyOnWriteArrayList<>();
        List<TrackedEventMessage<?>> replayedEvents = new CopyOnWriteArrayList<>();
        Mockito.when(this.mockHandler.handle(Mockito.any())).thenAnswer(invocation -> {
            TrackedEventMessage<?> message = invocation.getArgument(0);
            if (ReplayToken.isReplay(message)) {
                replayedEvents.add(message);
            } else {
                handledEvents.add(message);
            }
            return null;
        });

        for (int index = 0; index < 10; index++) {
            this.eventBus.publish(new EventMessage[]{EventTestUtils.createEvent(UUID.randomUUID().toString(), 0L)});
        }
        this.testSubject.start();
        // Wait until at least two segments are claimed and all of them are caught up.
        while (this.testSubject.processingStatus().size() < 2
                || !this.testSubject.processingStatus().values().stream().allMatch(EventTrackerStatus::isCaughtUp)) {
            Thread.sleep(10L);
        }

        this.testSubject.shutDown();
        this.testSubject.resetTokens();
        this.testSubject.releaseSegment(1);
        this.testSubject.start();
        waitForActiveThreads(1);
        Thread.yield();

        CompletableFuture<Boolean> mergeResult = this.testSubject.mergeSegment(0);
        publishEvents(20);
        Assertions.assertTrue(mergeResult.join(), "Expected merge to succeed");
        Assertions.assertArrayEquals(new int[]{0}, this.tokenStore.fetchSegments(this.testSubject.getName()));
        waitForSegmentStart(0);

        AssertUtils.assertWithin(10, TimeUnit.SECONDS, () -> Assertions.assertEquals(30, handledEvents.size()));
        // Give late deliveries a chance to arrive, then verify the count did not grow.
        Thread.sleep(100L);
        Assertions.assertEquals(30, handledEvents.size());

        // Every replayed event must carry a unique identifier, i.e. nothing was replayed twice.
        long distinctReplayed = replayedEvents.stream().map(EventMessage::getIdentifier).distinct().count();
        Assertions.assertEquals(distinctReplayed, replayedEvents.size());
    }

    /**
     * Merges segment 0 after segment 1 was released, then resets the tokens before the merge is complete
     * and restarts the processor. All 30 published events must be handled outside of replay.
     */
    @Test
    void replayDuringIncompleteMerge() throws Exception {
        List<TrackedEventMessage<?>> handledEvents = new CopyOnWriteArrayList<>();
        List<EventMessage<?>> initialEvents =
                IntStream.range(0, 10)
                         .<EventMessage<?>>mapToObj(index -> EventTestUtils.createEvent(UUID.randomUUID().toString(), 0L))
                         .collect(Collectors.toList());
        Duration pollDelay = Duration.ofMillis(25L);
        int segmentId = 0;

        this.tokenStore.initializeTokenSegments(this.testSubject.getName(), 2);
        initProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(2));
        Mockito.when(this.mockHandler.handle(Mockito.any())).thenAnswer(invocation -> {
            TrackedEventMessage<?> message = invocation.getArgument(0);
            if (ReplayToken.isReplay(message)) {
                // Replayed deliveries are deliberately not counted.
                return null;
            }
            return handledEvents.add(message);
        });

        this.eventBus.publish(initialEvents);
        this.testSubject.start();
        Awaitility.await()
                  .pollDelay(pollDelay)
                  .atMost(Duration.ofMillis(250L))
                  .until(() -> this.testSubject.processingStatus().size() >= 2);

        this.testSubject.releaseSegment(1);
        // Bounded wait: this test has no @Timeout, so an unbounded spin could hang the build.
        Awaitility.await("Release segment 1")
                  .pollInterval(Duration.ofMillis(10L))
                  .atMost(Duration.ofSeconds(5L))
                  .until(() -> !this.testSubject.processingStatus().containsKey(1));

        publishEvents(10);
        Assertions.assertTrue(this.testSubject.mergeSegment(0).join(), "Expected merge to succeed");
        Awaitility.await("Merge segments")
                  .pollDelay(pollDelay)
                  .atMost(Duration.ofMillis(1000L))
                  .until(() -> this.testSubject.processingStatus().size() == 1);

        this.testSubject.shutDown();
        this.testSubject.resetTokens();
        publishEvents(10);
        this.testSubject.start();

        Awaitility.await()
                  .pollDelay(pollDelay)
                  .atMost(Duration.ofMillis(250L))
                  .until(() -> !this.testSubject.processingStatus().isEmpty());
        Assertions.assertArrayEquals(new int[]{0}, this.tokenStore.fetchSegments(this.testSubject.getName()));
        Awaitility.await()
                  .pollDelay(pollDelay)
                  .atMost(Duration.ofMillis(250L))
                  .until(() -> this.testSubject.processingStatus().containsKey(segmentId));
        Awaitility.await()
                  .pollDelay(pollDelay)
                  .atMost(Duration.ofSeconds(1L))
                  .until(() -> this.testSubject.processingStatus().get(segmentId).isCaughtUp());

        int expectedHandled = 30;
        Awaitility.await("Handle Events - Replay")
                  .pollDelay(pollDelay)
                  .atMost(Duration.ofMillis(4000L))
                  .untilAsserted(() -> Assertions.assertEquals(
                          expectedHandled,
                          handledEvents.size(),
                          () -> "Actually handled [" + handledEvents.size()
                                  + "] instead of expected [" + expectedHandled + "]"
                  ));
    }

    /**
     * With three segments, releases segments 0 and 2 and expects a merge request for the
     * remaining segment 1 to be rejected.
     */
    @Timeout(10)
    @Test
    void mergeWithIncompatibleSegmentRejected() throws InterruptedException {
        initProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(3));
        this.testSubject.start();
        waitForActiveThreads(3);
        for (int segmentId = 0; segmentId < 3; segmentId++) {
            Assertions.assertTrue(this.testSubject.processingStatus().containsKey(segmentId));
        }

        this.testSubject.releaseSegment(0);
        this.testSubject.releaseSegment(2);
        this.testSubject.processingStatus()
                        .values()
                        .forEach(status -> Assertions.assertFalse(status::isMerging));

        while (this.testSubject.processingStatus().size() > 1) {
            Thread.sleep(10L);
        }

        boolean mergeAccepted = this.testSubject.mergeSegment(1).join();
        Assertions.assertFalse(mergeAccepted, "Expected merge to be rejected");
    }

    /**
     * Expects a merge request to be rejected when the processor has a single segment,
     * and segment 0 not to be flagged as merging afterwards.
     */
    @Timeout(10)
    @Test
    void mergeWithSingleSegmentRejected() throws InterruptedException {
        initProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(1));
        this.testSubject.start();
        waitForActiveThreads(1);

        boolean mergeAccepted = this.testSubject.mergeSegment(0).join();
        Assertions.assertFalse(mergeAccepted, "Expected merge to be rejected");

        EventTrackerStatus segmentZero = this.testSubject.processingStatus().get(0);
        Assertions.assertFalse(segmentZero.isMerging());
    }

    /**
     * Requests a merge on segment 3 of four segments and verifies that the token of segment 3
     * is deleted while a token is stored for segment 1.
     */
    @Timeout(10)
    @Test
    void mergeInvertedSegmentOrder() throws InterruptedException {
        initProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(4));
        this.testSubject.start();
        waitForActiveThreads(4);

        boolean mergeSucceeded = this.testSubject.mergeSegment(3).join();
        Assertions.assertTrue(mergeSucceeded, "Expected merge to succeed");

        Mockito.verify(this.tokenStore).deleteToken("test", 3);
        Mockito.verify(this.tokenStore).storeToken(Mockito.any(), Mockito.eq("test"), Mockito.eq(1));
    }

    /**
     * Makes the invoker throw a {@code TestError} from {@code canHandle} and expects the processor
     * to enter the error state and the error to reach the thread created by the thread factory.
     */
    @Test
    void thrownErrorBubblesUp() {
        AtomicReference<Throwable> caught = new AtomicReference<>();
        EventHandlerInvoker failingInvoker = Mockito.mock(EventHandlerInvoker.class);
        Mockito.when(failingInvoker.canHandle(Mockito.any(), Mockito.any())).thenThrow(new TestError());

        // The thread factory wraps each task so anything escaping the processor thread is recorded.
        initProcessor(
                TrackingEventProcessorConfiguration
                        .forSingleThreadedProcessing()
                        .andThreadFactory(processorName -> task -> new Thread(() -> {
                            try {
                                task.run();
                            } catch (Throwable throwable) {
                                caught.set(throwable);
                            }
                        })),
                builder -> builder.eventHandlerInvoker(failingInvoker)
        );

        this.eventBus.publish(EventTestUtils.createEvents(1));
        this.testSubject.start();

        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> Assertions.assertTrue(this.testSubject.isError()));
        AssertUtils.assertWithin(
                15, TimeUnit.SECONDS, () -> Assertions.assertTrue(caught.get() instanceof TestError)
        );
    }

    /**
     * Fetches the token store identifier twice and verifies that the token store was asked only once,
     * inside a transaction, and that both calls return the same value.
     */
    @Test
    void retrievingStorageIdentifierWillCacheResults() {
        String firstResult = this.testSubject.getTokenStoreIdentifier();

        InOrder callOrder = Mockito.inOrder(this.mockTransactionManager, this.tokenStore);
        callOrder.verify(this.mockTransactionManager).fetchInTransaction(Mockito.any());
        callOrder.verify(this.tokenStore, Mockito.times(1)).retrieveStorageIdentifier();

        String secondResult = this.testSubject.getTokenStoreIdentifier();

        // Still a single invocation in total: the second lookup did not reach the token store.
        Mockito.verify(this.tokenStore, Mockito.times(1)).retrieveStorageIdentifier();
        Assertions.assertEquals(firstResult, secondResult);
    }

    /**
     * Registers a status change listener, publishes two events and expects the listener to have seen
     * one "added" change, one plain update and no "removed" change.
     */
    @Test
    void publishedEventsUpdateStatusAndHitChangeListener() throws Exception {
        CountDownLatch handledLatch = new CountDownLatch(2);
        Mockito.doAnswer(invocation -> {
            handledLatch.countDown();
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        CountDownLatch listenerLatch = new CountDownLatch(2);
        AtomicInteger addedCount = new AtomicInteger(0);
        AtomicInteger updatedCount = new AtomicInteger(0);
        AtomicInteger removedCount = new AtomicInteger(0);
        EventTrackerStatusChangeListener listener = updates -> {
            Assertions.assertEquals(1, updates.size());
            EventTrackerStatus status = updates.get(0);
            if (status.trackerAdded()) {
                addedCount.getAndIncrement();
            } else if (status.trackerRemoved()) {
                removedCount.getAndIncrement();
            } else {
                updatedCount.getAndIncrement();
            }
            listenerLatch.countDown();
        };
        initProcessor(TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                         .andEventTrackerStatusChangeListener(listener));

        this.testSubject.start();
        Thread.sleep(200L);
        publishEvents(2);

        Assertions.assertTrue(handledLatch.await(5L, TimeUnit.SECONDS));
        Assertions.assertTrue(listenerLatch.await(5L, TimeUnit.SECONDS));
        Assertions.assertEquals(1, addedCount.get());
        Assertions.assertEquals(1, updatedCount.get());
        Assertions.assertEquals(0, removedCount.get());
    }

    /**
     * Same scenario as the plain change-listener test, but with a listener whose
     * {@code validatePositions()} returns {@code true}. Expects one "added" change,
     * three plain updates and no "removed" change.
     */
    @Test
    void publishedEventsUpdateStatusAndHitChangeListenerIncludingPositions() throws Exception {
        CountDownLatch handledLatch = new CountDownLatch(2);
        Mockito.doAnswer(invocation -> {
            handledLatch.countDown();
            return null;
        }).when(this.mockHandler).handle(Mockito.any());

        CountDownLatch listenerLatch = new CountDownLatch(4);
        AtomicInteger addedCount = new AtomicInteger(0);
        AtomicInteger updatedCount = new AtomicInteger(0);
        AtomicInteger removedCount = new AtomicInteger(0);
        EventTrackerStatusChangeListener positionAwareListener = new EventTrackerStatusChangeListener() {
            @Override
            public void onEventTrackerStatusChange(Map<Integer, EventTrackerStatus> updates) {
                Assertions.assertEquals(1, updates.size());
                EventTrackerStatus status = updates.get(0);
                if (status.trackerAdded()) {
                    addedCount.getAndIncrement();
                } else if (status.trackerRemoved()) {
                    removedCount.getAndIncrement();
                } else {
                    updatedCount.getAndIncrement();
                }
                listenerLatch.countDown();
            }

            @Override
            public boolean validatePositions() {
                return true;
            }
        };
        initProcessor(TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                         .andEventTrackerStatusChangeListener(positionAwareListener));

        this.testSubject.start();
        Thread.sleep(200L);
        publishEvents(2);

        Assertions.assertTrue(handledLatch.await(5L, TimeUnit.SECONDS));
        Assertions.assertTrue(listenerLatch.await(5L, TimeUnit.SECONDS));
        Assertions.assertEquals(1, addedCount.get());
        Assertions.assertEquals(3, updatedCount.get());
        Assertions.assertEquals(0, removedCount.get());
    }

    /**
     * Splits and then merges segment 0 while a status change listener counts notifications.
     * Expects at least four "added", one plain update and three "removed" notifications.
     */
    @Timeout(15)
    @Test
    void splitAndMergeInfluenceOnChangeListenerInvocations() throws InterruptedException {
        int firstSegment = 0;
        int secondSegment = 1;
        CountDownLatch addedLatch = new CountDownLatch(4);
        CountDownLatch updatedLatch = new CountDownLatch(1);
        CountDownLatch removedLatch = new CountDownLatch(3);
        EventTrackerStatusChangeListener listener = updates -> {
            Assertions.assertEquals(1, updates.size());
            EventTrackerStatus status = updates.values().iterator().next();
            if (status.trackerAdded()) {
                addedLatch.countDown();
            } else if (status.trackerRemoved()) {
                removedLatch.countDown();
            } else {
                updatedLatch.countDown();
            }
        };
        initProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(4)
                                                         .andInitialSegmentsCount(2)
                                                         .andEventAvailabilityTimeout(50L, TimeUnit.MILLISECONDS)
                                                         .andEventTrackerStatusChangeListener(listener));
        this.tokenStore.initializeTokenSegments(this.testSubject.getName(), 1);

        publishEvents(2);
        this.testSubject.start();
        AssertUtils.assertWithin(5, TimeUnit.SECONDS, () -> Assertions.assertTrue(
                this.testSubject.processingStatus().containsKey(firstSegment)
        ));

        boolean splitSucceeded = this.testSubject.splitSegment(0).join();
        Assertions.assertTrue(splitSucceeded, "Expected split to succeed");
        Assertions.assertArrayEquals(new int[]{0, 1}, this.tokenStore.fetchSegments(this.testSubject.getName()));
        AssertUtils.assertWithin(5, TimeUnit.SECONDS, () -> Assertions.assertTrue(
                this.testSubject.processingStatus().containsKey(secondSegment)
        ));

        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue(
                this.testSubject.mergeSegment(firstSegment).join(), "Expected merge to succeed"
        ));
        Assertions.assertArrayEquals(new int[]{0}, this.tokenStore.fetchSegments(this.testSubject.getName()));
        AssertUtils.assertWithin(5, TimeUnit.SECONDS, () -> Assertions.assertTrue(
                this.testSubject.processingStatus().containsKey(firstSegment)
        ));

        Assertions.assertTrue(addedLatch.await(5L, TimeUnit.SECONDS));
        Assertions.assertTrue(updatedLatch.await(5L, TimeUnit.SECONDS));
        Assertions.assertTrue(removedLatch.await(5L, TimeUnit.SECONDS));
    }

    /**
     * Wraps the event stream so the test can observe when {@code hasNextAvailable} is entered and when
     * it has returned for the first time. Segment 0 must not be reported as caught up while that first
     * call is still in progress, and must be caught up within a second afterwards.
     */
    @Test
    void caughtUpSetToTrueAfterWaitingForEventAvailabilityTimeout() {
        AtomicBoolean hasNextEntered = new AtomicBoolean(false);
        AtomicBoolean firstCallPending = new AtomicBoolean(true);
        TrackingEventProcessorConfiguration configuration =
                TrackingEventProcessorConfiguration.forParallelProcessing(2)
                                                   .andInitialSegmentsCount(1)
                                                   .andEventAvailabilityTimeout(100L, TimeUnit.MILLISECONDS);

        // The overriding methods must carry the exact names of the methods they replace; otherwise the
        // processor would use the plain stream and the flags above would never change.
        EmbeddedEventStore instrumentedStore = new EmbeddedEventStore(
                EmbeddedEventStore.builder().storageEngine(new InMemoryEventStorageEngine())
        ) {
            @Override
            public TrackingEventStream openStream(TrackingToken trackingToken) {
                TrackingEventStream delegate = super.openStream(trackingToken);
                return new TrackingEventStream() {
                    @Override
                    public Optional<TrackedEventMessage<?>> peek() {
                        return delegate.peek();
                    }

                    @Override
                    public boolean hasNextAvailable(int timeout, TimeUnit unit) throws InterruptedException {
                        hasNextEntered.set(true);
                        boolean available = delegate.hasNextAvailable(timeout, unit);
                        firstCallPending.set(false);
                        Thread.sleep(5L);
                        return available;
                    }

                    @Override
                    public TrackedEventMessage<?> nextAvailable() throws InterruptedException {
                        return delegate.nextAvailable();
                    }

                    @Override
                    public void close() {
                        delegate.close();
                    }
                };
            }
        };
        initProcessor(configuration, builder -> builder.messageSource(instrumentedStore));

        this.testSubject.start();

        AssertUtils.assertWithin(100, TimeUnit.MILLISECONDS, () -> Assertions.assertTrue(hasNextEntered.get()));
        AssertUtils.assertUntil(firstCallPending::get, Duration.ofMillis(10L), () -> {
            EventTrackerStatus status = this.testSubject.processingStatus().get(0);
            Assertions.assertNotNull(status);
            Assertions.assertFalse(status.isCaughtUp());
        });
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> Assertions.assertTrue(
                this.testSubject.processingStatus().get(0).isCaughtUp()
        ));
    }

    /**
     * Verifies that calling {@code start()} after {@code shutdownAsync()} has been requested is rejected with an
     * {@link IllegalStateException} whose message mentions the pending shutdown.
     *
     * @throws Exception if the test thread is interrupted while waiting
     */
    @Test
    void refuseStartDuringShutdown() throws Exception {
        initProcessor(TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                                                         .andEventAvailabilityTimeout(1000L, TimeUnit.MILLISECONDS));
        CountDownLatch handlerInvoked = new CountDownLatch(1);
        publishEvents(10);
        // Signal the first invocation and delay every invocation by 100 ms before delegating to the real method.
        // NOTE(review): presumably this keeps the worker busy so the shutdown below is still pending - confirm.
        Mockito.doAnswer(invocation -> {
            handlerInvoked.countDown();
            Thread.sleep(100L);
            return invocation.callRealMethod();
        }).when(this.eventHandlerInvoker).handle((EventMessage<?>) Mockito.any(), (Segment) Mockito.any());

        this.testSubject.start();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertFalse(this.testSubject.processingStatus().isEmpty());
        });
        // Bounded wait: fail the test instead of blocking forever when the handler is never invoked.
        Assertions.assertTrue(handlerInvoked.await(5L, TimeUnit.SECONDS),
                              "Expected the event handler to have been invoked");

        this.testSubject.shutdownAsync();

        IllegalStateException thrown = Assertions.assertThrows(IllegalStateException.class, () -> {
            this.testSubject.start();
        });
        Assertions.assertTrue(thrown.getMessage().contains("pending shutdown"));
    }

    /**
     * Handles 750 published events, resets the tokens and restarts the processor, then checks the status of
     * segment 0: first that it reports replaying while not caught up (handler slowed to 10 ms per event), and
     * afterwards that it reports both caught up and replaying once the handler delay is removed.
     *
     * @throws Exception if the test thread is interrupted while waiting
     */
    @Timeout(1000)
    @Test
    void isReplayingWhenNotCaughtUp() throws Exception {
        Mockito.when(this.mockHandler.supportsReset()).thenReturn(true);

        final int segmentId = 0;
        final int eventCount = 750;
        List<String> handledIds = new CopyOnWriteArrayList<>();
        List<String> replayedIds = new CopyOnWriteArrayList<>();
        // Per-event delay applied by the stubbed handler; adjusted further down to slow the replay.
        AtomicInteger handlerDelayMillis = new AtomicInteger(0);

        Mockito.doAnswer(invocation -> {
            EventMessage<?> handled = invocation.getArgument(0);
            String identifier = handled.getIdentifier();
            if (ReplayToken.isReplay(handled)) {
                replayedIds.add(identifier);
            }
            handledIds.add(identifier);
            Thread.sleep(handlerDelayMillis.get());
            return null;
        }).when(this.mockHandler).handle((EventMessage<?>) Mockito.any());

        this.testSubject.start();
        awaitProcessorStarted();
        this.eventBus.publish(EventTestUtils.createEvents(eventCount));
        Awaitility.await("Handled Events")
                  .atMost(Duration.ofSeconds(2L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> handledIds.size() == eventCount);
        Assertions.assertEquals(0, replayedIds.size());

        this.testSubject.shutDown();
        this.testSubject.resetTokens();
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertTrue(this.testSubject.processingStatus().isEmpty());
        });

        // Slow the handler down before restarting.
        handlerDelayMillis.set(10);
        this.testSubject.start();
        AssertUtils.assertWithin(100, TimeUnit.MILLISECONDS, () -> {
            Assertions.assertFalse(this.testSubject.processingStatus().isEmpty());
            Assertions.assertFalse(this.testSubject.processingStatus().get(segmentId).isCaughtUp());
            Assertions.assertTrue(this.testSubject.processingStatus().get(segmentId).isReplaying());
            Assertions.assertTrue(this.testSubject.isReplaying());
        });

        // Remove the delay again.
        handlerDelayMillis.set(0);
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertTrue(this.testSubject.processingStatus().get(segmentId).isCaughtUp());
            Assertions.assertTrue(this.testSubject.processingStatus().get(segmentId).isReplaying());
        });
    }

    /**
     * Stores tokens for segments 0 to 3, stubs the token store to report only segment 2 as available, and checks
     * that the processor ends up with exactly one processing status (for segment 2) and never fetches the token
     * of segments 0, 1 or 3.
     */
    @Test
    void processorOnlyTriesToClaimAvailableSegments() {
        this.tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0);
        this.tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);
        this.tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 2);
        this.tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 3);

        // Only segment 2 is reported as available.
        int[] allSegmentIds = {0, 1, 2, 3};
        Segment availableSegment = Segment.computeSegment(2, allSegmentIds);
        Mockito.when(this.tokenStore.fetchAvailableSegments(this.testSubject.getName()))
               .thenReturn(Collections.singletonList(availableSegment));

        this.testSubject.start();
        this.eventBus.publish(EventTestUtils.createEvents(1));

        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertEquals(1, this.testSubject.processingStatus().size());
        });
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertTrue(this.testSubject.processingStatus().containsKey(2));
        });

        // Tokens of the segments that were not reported as available must never have been fetched.
        Mockito.verify(this.tokenStore, Mockito.never())
               .fetchToken(Mockito.eq(this.testSubject.getName()),
                           Mockito.intThat(segmentId -> Arrays.asList(0, 1, 3).contains(segmentId)));
    }

    /**
     * Starts a two-segment processor whose worker threads are captured through a custom thread factory, requests
     * an asynchronous shutdown and checks that the shutdown future completes within four times the configured
     * worker termination timeout (50 ms), after which neither captured thread is alive.
     *
     * @throws Exception if the test thread is interrupted while sleeping
     */
    @Test
    void shutdownTerminatesWorkersAfterConfiguredWorkerTerminationTimeout() throws Exception {
        final int workerTerminationTimeoutMillis = 50;
        List<Thread> workerThreads = new CopyOnWriteArrayList<>();

        TrackingEventProcessorConfiguration configuration =
                TrackingEventProcessorConfiguration.forParallelProcessing(2)
                                                   .andInitialSegmentsCount(2)
                                                   .andBatchSize(100)
                                                   .andThreadFactory(threadName -> task -> {
                                                       // Record every thread handed out to the processor.
                                                       Thread worker = new Thread(task, threadName);
                                                       workerThreads.add(worker);
                                                       return worker;
                                                   })
                                                   .andWorkerTerminationTimeout(workerTerminationTimeoutMillis,
                                                                                TimeUnit.MILLISECONDS)
                                                   .andEventAvailabilityTimeout(20L, TimeUnit.SECONDS);
        initProcessor(configuration);

        this.testSubject.start();
        Thread.sleep(500L);
        Assertions.assertEquals(2, workerThreads.size());

        CompletableFuture<Void> shutdown = this.testSubject.shutdownAsync();
        Awaitility.await()
                  .pollDelay(Duration.ofMillis(25L))
                  .atMost(Duration.ofMillis(workerTerminationTimeoutMillis * 4))
                  .until(shutdown::isDone);

        Assertions.assertFalse(workerThreads.get(0).isAlive());
        Assertions.assertFalse(workerThreads.get(1).isAlive());
    }

    /**
     * Publishes three events before the processor is started and checks that, once three Units of Work have
     * reached their clean-up phase, the token stored for segment 0 is recognised as a replay token.
     *
     * @throws Exception if the test thread is interrupted while waiting
     */
    @Test
    void existingEventsBeforeProcessorStartAreConsideredReplayed() throws Exception {
        final int eventCount = 3;
        CountDownLatch cleanedUp = new CountDownLatch(eventCount);
        this.testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            // The callback parameter needs its own name: a lambda parameter may not redeclare the enclosing
            // lambda's "unitOfWork" parameter.
            unitOfWork.onCleanup(cleanedUpUnitOfWork -> cleanedUp.countDown());
            return interceptorChain.proceed();
        });

        // One publish call per event, with sequence numbers 0, 1 and 2.
        for (long sequenceNumber = 0L; sequenceNumber < eventCount; sequenceNumber++) {
            this.eventBus.publish(new EventMessage[]{EventTestUtils.createEvent(sequenceNumber)});
        }

        this.testSubject.start();
        Assertions.assertTrue(cleanedUp.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase");

        TrackingToken storedToken = this.tokenStore.fetchToken(this.testSubject.getName(), 0);
        Assertions.assertTrue(ReplayToken.isReplay(storedToken), "Not a replay token: " + storedToken);
    }

    /**
     * Publishes one event before the processor is started and one after the first Unit of Work has been cleaned
     * up, then checks that the token stored for segment 0 is not recognised as a replay token.
     *
     * @throws Exception if the test thread is interrupted while waiting
     */
    @Test
    void eventsPublishedAfterProcessorStartAreNotConsideredReplayed() throws Exception {
        CountDownLatch firstCleanup = new CountDownLatch(1);
        CountDownLatch secondCleanup = new CountDownLatch(2);
        this.testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            // The callback parameters need their own name: a lambda parameter may not redeclare the enclosing
            // lambda's "unitOfWork" parameter.
            unitOfWork.onCleanup(cleanedUpUnitOfWork -> firstCleanup.countDown());
            unitOfWork.onCleanup(cleanedUpUnitOfWork -> secondCleanup.countDown());
            return interceptorChain.proceed();
        });

        this.eventBus.publish(new EventMessage[]{EventTestUtils.createEvent(0L)});

        // Route both token store operations to their real implementation.
        Mockito.doAnswer(invocation -> invocation.callRealMethod())
               .when(this.tokenStore)
               .storeToken((TrackingToken) Mockito.any(), Mockito.anyString(), Mockito.anyInt());
        Mockito.doAnswer(invocation -> invocation.callRealMethod())
               .when(this.tokenStore)
               .initializeTokenSegments(Mockito.anyString(), Mockito.anyInt(), (TrackingToken) Mockito.any());

        this.testSubject.start();
        // Bounded wait: fail the test instead of blocking forever when the first event is never processed.
        Assertions.assertTrue(firstCleanup.await(5L, TimeUnit.SECONDS),
                              "Expected first Unit of Work to have reached clean up phase");

        this.eventBus.publish(new EventMessage[]{EventTestUtils.createEvent(2L)});
        Assertions.assertTrue(secondCleanup.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase");

        TrackingToken storedToken = this.tokenStore.fetchToken(this.testSubject.getName(), 0);
        // This message is shown when the token IS a replay token, so it must say so.
        Assertions.assertFalse(ReplayToken.isReplay(storedToken), "Unexpected replay token: " + storedToken);
    }

    /**
     * Polls the processor's processing status every 10 ms until the given predicate accepts it, failing the test
     * when the predicate is still not satisfied after the given timeout has elapsed.
     *
     * @param str       description of the expected state, used in the failure message
     * @param j         the maximum amount of time to wait
     * @param timeUnit  the unit of {@code j}
     * @param predicate the condition the processing status map has to satisfy
     * @throws InterruptedException if the calling thread is interrupted while sleeping between polls
     */
    private void waitForStatus(String str, long j, TimeUnit timeUnit, Predicate<Map<Integer, EventTrackerStatus>> predicate) throws InterruptedException {
        // System.nanoTime() is monotonic, so wall-clock adjustments cannot shorten or extend the wait.
        long deadlineNanos = System.nanoTime() + timeUnit.toNanos(j);
        while (!predicate.test(this.testSubject.processingStatus())) {
            if (System.nanoTime() - deadlineNanos > 0) {
                Assertions.fail("Expected state '" + str + "' within " + j + " " + timeUnit.name());
            }
            Thread.sleep(10L);
        }
    }

    /**
     * Blocks, polling every 10 ms, until the processor's processing status contains an entry for the given
     * segment. There is no timeout.
     *
     * @param i the identifier of the segment to wait for
     * @throws InterruptedException if the calling thread is interrupted while sleeping between polls
     */
    private void waitForSegmentStart(int i) throws InterruptedException {
        for (;;) {
            boolean segmentPresent = this.testSubject.processingStatus().containsKey(i);
            if (segmentPresent) {
                return;
            }
            Thread.sleep(10L);
        }
    }

    /**
     * Blocks, polling every 10 ms, until the processor's processing status no longer contains an entry for the
     * given segment. There is no timeout.
     *
     * @param i the identifier of the segment whose entry should disappear
     * @throws InterruptedException if the calling thread is interrupted while sleeping between polls
     */
    private void waitForSegmentRelease(int i) throws InterruptedException {
        for (;;) {
            boolean segmentPresent = this.testSubject.processingStatus().containsKey(i);
            if (!segmentPresent) {
                return;
            }
            Thread.sleep(10L);
        }
    }

    /**
     * Blocks, polling every 10 ms, until the processor's processing status holds at least the given number of
     * entries. There is no timeout.
     *
     * @param i the minimum number of processing status entries to wait for
     * @throws InterruptedException if the calling thread is interrupted while sleeping between polls
     */
    private void waitForActiveThreads(int i) throws InterruptedException {
        for (;;) {
            int statusCount = this.testSubject.processingStatus().size();
            if (statusCount >= i) {
                return;
            }
            Thread.sleep(10L);
        }
    }

    /**
     * Blocks, polling every millisecond, until the processor reports a status for the given segment that
     * satisfies the given predicate, and returns that status. There is no timeout.
     *
     * @param i         the identifier of the segment whose status is inspected
     * @param predicate the condition the segment's status has to satisfy; must not be {@code null}
     * @return the first observed status of the segment that satisfies {@code predicate}
     * @throws InterruptedException if the calling thread is interrupted while sleeping between polls
     */
    private EventTrackerStatus waitForProcessingStatus(int i, Predicate<EventTrackerStatus> predicate) throws InterruptedException {
        Objects.requireNonNull(predicate);
        EventTrackerStatus status = this.testSubject.processingStatus().get(i);
        // A missing status entry counts as "not matching yet".
        while (status == null || !predicate.test(status)) {
            Thread.sleep(1L);
            status = this.testSubject.processingStatus().get(i);
        }
        return status;
    }

    /**
     * Publishes the given number of events on the event bus, one publish call per event. Every event is created
     * with a freshly generated random UUID string and the value {@code 0}.
     *
     * @param i the number of events to publish; nothing is published when it is zero or negative
     */
    private void publishEvents(int i) {
        int remaining = i;
        while (remaining > 0) {
            String randomIdentifier = UUID.randomUUID().toString();
            this.eventBus.publish(new EventMessage[]{EventTestUtils.createEvent(randomIdentifier, 0L)});
            remaining--;
        }
    }
}
