package org.axonframework.integrationtests.eventhandling;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventMessageHandler;
import org.axonframework.eventhandling.EventTrackerStatus;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.tokenstore.TokenStore;
import org.axonframework.eventhandling.tokenstore.UnableToClaimTokenException;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.integrationtests.utils.EventTestUtils;
import org.axonframework.integrationtests.utils.MockException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

/* loaded from: input_file:org/axonframework/integrationtests/eventhandling/TrackingEventProcessorTest_MultiThreaded.class */
class TrackingEventProcessorTest_MultiThreaded {
    // Event store the tests publish to; it is also handed to the processor as its message source.
    private EmbeddedEventStore eventBus;
    // Token store given to the processor; setUp wraps an InMemoryTokenStore in a Mockito spy.
    private TokenStore tokenStore;
    // Invoker built in setUp around mockHandler, with a custom sequencing policy.
    private EventHandlerInvoker eventHandlerInvoker;
    // Mockito mock whose handle(...) is stubbed per test to record the events it receives.
    private EventMessageHandler mockHandler;
    // Processor under test; (re)built by configureProcessor and shut down in tearDown.
    private TrackingEventProcessor testSubject;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/axonframework/integrationtests/eventhandling/TrackingEventProcessorTest_MultiThreaded$AcknowledgeByThread.class */
    /**
     * Records the events passed to the event handler, grouped by the name of the thread that handled them.
     */
    public static class AcknowledgeByThread {
        /** Handled events keyed by {@link Thread#getName()} of the handling thread. */
        Map<String, List<EventMessage<?>>> ackedEventsByThreadMap = new ConcurrentHashMap<>();

        AcknowledgeByThread() {
        }

        /**
         * Appends {@code eventMessage} to the list kept for {@code thread}, creating that list on first use.
         *
         * @param thread       the thread that handled the event; only its name is used as key
         * @param eventMessage the handled event
         */
        void addMessage(Thread thread, EventMessage<?> eventMessage) {
            ackedEventsByThreadMap.computeIfAbsent(thread.getName(), threadName -> new ArrayList<>())
                                  .add(eventMessage);
        }

        /**
         * Asserts that every per-thread list recorded so far is non-empty.
         * <p>
         * NOTE(review): despite the name, this does not check that more than one thread is present in the map;
         * lists are only created when an event is added, so the assertion looks like it can never fail.
         */
        void assertEventsAckedByMultipleThreads() {
            for (List<EventMessage<?>> eventsOfThread : ackedEventsByThreadMap.values()) {
                Assertions.assertFalse(eventsOfThread.isEmpty());
            }
        }

        /**
         * Returns the total number of recorded events, summed over all threads.
         *
         * @return the number of events added through {@link #addMessage(Thread, EventMessage)}
         */
        long eventCount() {
            return ackedEventsByThreadMap.values().stream().mapToLong(List::size).sum();
        }
    }

    /** Package-private no-argument constructor; all fixture state is created in {@code setUp}. */
    TrackingEventProcessorTest_MultiThreaded() {}

    /**
     * Builds the fixture: a spied in-memory token store, a mocked handler that accepts every event, an invoker
     * with a custom sequencing policy, an in-memory event store, and a processor configured for two threads
     * with a 500 ms event availability timeout.
     */
    @BeforeEach
    void setUp() {
        tokenStore = Mockito.spy(new InMemoryTokenStore());

        mockHandler = Mockito.mock(EventMessageHandler.class);
        Mockito.when(mockHandler.canHandle(Mockito.any())).thenReturn(true);

        eventHandlerInvoker = SimpleEventHandlerInvoker.builder()
                                                       .eventHandlers(Collections.singletonList(mockHandler))
                                                       .sequencingPolicy(event -> {
                                                           // Domain events are sequenced by their sequence number,
                                                           // everything else by message identifier.
                                                           if (event instanceof DomainEventMessage) {
                                                               return ((DomainEventMessage<?>) event).getSequenceNumber();
                                                           }
                                                           return event.getIdentifier();
                                                       })
                                                       .build();

        eventBus = EmbeddedEventStore.builder()
                                     .storageEngine(new InMemoryEventStorageEngine())
                                     .build();

        TrackingEventProcessorConfiguration twoThreads =
                TrackingEventProcessorConfiguration.forParallelProcessing(2)
                                                   .andEventAvailabilityTimeout(500L, TimeUnit.MILLISECONDS);
        configureProcessor(twoThreads);
    }

    /**
     * Replaces {@code testSubject} with a new processor named "test" that uses the current invoker, event store
     * and token store fields, no transaction management, and the given configuration.
     *
     * @param trackingEventProcessorConfiguration the threading/segment configuration for the new processor
     */
    private void configureProcessor(TrackingEventProcessorConfiguration trackingEventProcessorConfiguration) {
        testSubject = TrackingEventProcessor.builder()
                                            .name("test")
                                            .eventHandlerInvoker(eventHandlerInvoker)
                                            .messageSource(eventBus)
                                            .tokenStore(tokenStore)
                                            .transactionManager(NoTransactionManager.INSTANCE)
                                            .trackingEventProcessorConfiguration(trackingEventProcessorConfiguration)
                                            .build();
    }

    /**
     * Shuts down the processor under test and then the event store.
     * <p>
     * The event store is shut down in a {@code finally} block so it is still released when shutting down the
     * processor throws; the original exception keeps propagating.
     */
    @AfterEach
    void tearDown() {
        try {
            testSubject.shutDown();
        } finally {
            eventBus.shutDown();
        }
    }

    /**
     * Starts the two-thread processor and waits until two threads are active, the processing status lists
     * exactly segments 0 and 1, and both segments report being caught up.
     */
    @Test
    void processorWorkerCount() {
        testSubject.start();

        Awaitility.await("Start")
                  .atMost(Duration.ofSeconds(1L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.activeProcessorThreads() == 2);

        Assertions.assertEquals(2, testSubject.processingStatus().size());
        Assertions.assertTrue(testSubject.processingStatus().containsKey(0));
        Assertions.assertTrue(testSubject.processingStatus().containsKey(1));

        Awaitility.await("Segment Zero Caught Up")
                  .atMost(Duration.ofSeconds(1L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.processingStatus().get(0).isCaughtUp());
        Awaitility.await("Segment One Caught Up")
                  .atMost(Duration.ofSeconds(1L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.processingStatus().get(1).isCaughtUp());
    }

    /**
     * Configures two threads but four initial segments and checks, once both threads are active, that the
     * token store holds segments 0 through 3 for this processor.
     */
    @Test
    void processorInitializesMoreTokensThanWorkerCount() {
        TrackingEventProcessorConfiguration config =
                TrackingEventProcessorConfiguration.forParallelProcessing(2).andInitialSegmentsCount(4);
        configureProcessor(config);
        testSubject.start();

        Awaitility.await("Start")
                  .atMost(Duration.ofSeconds(1L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.activeProcessorThreads() == 2);

        int[] storedSegments = tokenStore.fetchSegments(testSubject.getName());
        // Sort first: the comparison below is order-sensitive.
        Arrays.sort(storedSegments);
        Assertions.assertArrayEquals(new int[]{0, 1, 2, 3}, storedSegments);
    }

    /**
     * Configures six threads and six initial segments, waits until all six threads are active, and checks that
     * the token store holds segments 0 through 5 for this processor.
     */
    @Test
    void processorInitializesAndUsesSameTokens() {
        final int threadAndSegmentCount = 6;
        configureProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(threadAndSegmentCount)
                                                              .andInitialSegmentsCount(threadAndSegmentCount));
        testSubject.start();

        Awaitility.await("Start")
                  .atMost(Duration.ofSeconds(1L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.activeProcessorThreads() == threadAndSegmentCount);

        int[] storedSegments = tokenStore.fetchSegments(testSubject.getName());
        // Sort first: the comparison below is order-sensitive.
        Arrays.sort(storedSegments);
        Assertions.assertArrayEquals(new int[]{0, 1, 2, 3, 4, 5}, storedSegments);
    }

    /**
     * Seeds tokens for segments 0 and 1, starts the processor, and checks that both segments are picked up and
     * that each segment's status reports the token that was seeded for it.
     */
    @Test
    void processorWorkerCountWithMultipleSegments() {
        GlobalSequenceTrackingToken tokenForSegmentZero = new GlobalSequenceTrackingToken(1L);
        tokenStore.storeToken(tokenForSegmentZero, "test", 0);
        GlobalSequenceTrackingToken tokenForSegmentOne = new GlobalSequenceTrackingToken(2L);
        tokenStore.storeToken(tokenForSegmentOne, "test", 1);

        testSubject.start();

        Awaitility.await("Start")
                  .atMost(Duration.ofSeconds(1L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.activeProcessorThreads() == 2);

        Assertions.assertEquals(2, testSubject.processingStatus().size());
        Assertions.assertTrue(testSubject.processingStatus().containsKey(0));
        Assertions.assertTrue(testSubject.processingStatus().containsKey(1));

        Awaitility.await("Segment Zero")
                  .atMost(Duration.ofMillis(500L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.processingStatus().get(0).getTrackingToken().equals(tokenForSegmentZero));
        Awaitility.await("Segment One")
                  .atMost(Duration.ofMillis(500L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.processingStatus().get(1).getTrackingToken().equals(tokenForSegmentOne));
    }

    /**
     * Seeds tokens for segments 0 and 1, makes the spied token store throw
     * {@link UnableToClaimTokenException} from {@code extendClaim} and {@code fetchToken} for both segments,
     * and waits until the started processor reports zero active threads.
     */
    @Test
    void processorWorkerCountWithMultipleSegmentsClaimFails() throws InterruptedException {
        tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0);
        tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);

        // Stub claim extension and token fetching to fail for segment 0, then for segment 1.
        for (int segment = 0; segment < 2; segment++) {
            Mockito.doThrow(new UnableToClaimTokenException("Failed")).when(tokenStore).extendClaim("test", segment);
            Mockito.doThrow(new UnableToClaimTokenException("Failed")).when(tokenStore).fetchToken("test", segment);
        }

        testSubject.start();

        Awaitility.await("Start Fails")
                  .atMost(Duration.ofSeconds(1L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.activeProcessorThreads() == 0);
    }

    /**
     * Seeds tokens for segments 0 and 1, starts the processor, publishes ten events, waits for two active
     * threads, and verifies {@code storeToken} was invoked at least once for each segment.
     */
    @Test
    void processorClaimsSegment() {
        tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0);
        tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);

        testSubject.start();
        eventBus.publish(EventTestUtils.createEvents(10));

        Awaitility.await("Start")
                  .atMost(Duration.ofSeconds(1L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.activeProcessorThreads() == 2);

        // NOTE(review): tokenStore is a Mockito spy and the two seeding calls above are made on it, so these
        // atLeast(1) verifications look like they are already satisfied before the processor stores anything.
        // Confirm whether that is intended.
        Mockito.verify(tokenStore, Mockito.atLeast(1))
               .storeToken(Mockito.any(), Mockito.eq("test"), Mockito.eq(0));
        Mockito.verify(tokenStore, Mockito.atLeast(1))
               .storeToken(Mockito.any(), Mockito.eq("test"), Mockito.eq(1));
    }

    /**
     * Seeds three segments for a two-thread processor, checks segments 0 and 1 are being processed, releases
     * segment 0, and then expects segment 2 to be picked up so that segments 1 and 2 are processed with no
     * processor threads left available.
     */
    @Test
    void blacklistingSegmentWillHaveProcessorClaimAnotherOne() {
        tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0);
        tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);
        tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 2);

        testSubject.start();

        Awaitility.await("Start")
                  .atMost(Duration.ofSeconds(1L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.availableProcessorThreads() == 0);
        Assertions.assertEquals(new HashSet<>(Arrays.asList(0, 1)), testSubject.processingStatus().keySet());

        testSubject.releaseSegment(0);

        Awaitility.await("Post Release")
                  .atMost(Duration.ofSeconds(5L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.processingStatus().containsKey(2));
        Assertions.assertEquals(new HashSet<>(Arrays.asList(2, 1)), testSubject.processingStatus().keySet());

        Awaitility.await("Available Threads")
                  .atMost(Duration.ofSeconds(2L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.availableProcessorThreads() == 0);
    }

    /**
     * Seeds two segments, reconfigures the processor for single-threaded processing, and waits until exactly
     * one processor thread is active.
     */
    @Test
    void processorWorkerCountWithMultipleSegmentsWithOneThread() {
        tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 0);
        tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);

        configureProcessor(TrackingEventProcessorConfiguration.forSingleThreadedProcessing());
        testSubject.start();

        Awaitility.await("Start")
                  .atMost(Duration.ofSeconds(1L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.activeProcessorThreads() == 1);
    }

    /**
     * With two threads and four initial segments, publishes four events and expects the handler to have
     * recorded exactly two of them once the latch of two has been released.
     */
    @Test
    void multiThreadSegmentsExceedsWorkerCount() throws Exception {
        configureProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(2).andInitialSegmentsCount(4));

        CountDownLatch handledLatch = new CountDownLatch(2);
        AcknowledgeByThread acknowledgements = new AcknowledgeByThread();
        // Record each handled event under the handling thread, then release one latch count.
        Mockito.doAnswer(invocation -> {
            acknowledgements.addMessage(Thread.currentThread(), (EventMessage<?>) invocation.getArguments()[0]);
            handledLatch.countDown();
            return null;
        }).when(mockHandler).handle(Mockito.any());

        testSubject.start();
        eventBus.publish(EventTestUtils.createEvents(4));

        boolean handledInTime = handledLatch.await(5L, TimeUnit.SECONDS);
        Assertions.assertTrue(
                handledInTime,
                "Expected Handler to have received 2 out of 4 published events. Got " + acknowledgements.eventCount()
        );
        Assertions.assertEquals(2L, acknowledgements.eventCount());
    }

    /**
     * Publishes two events to the started two-thread processor and expects the handler to have recorded both
     * within five seconds.
     */
    @Test
    void multiThreadPublishedEventsGetPassedToHandler() throws Exception {
        CountDownLatch handledLatch = new CountDownLatch(2);
        AcknowledgeByThread acknowledgements = new AcknowledgeByThread();
        // Record each handled event under the handling thread, then release one latch count.
        Mockito.doAnswer(invocation -> {
            acknowledgements.addMessage(Thread.currentThread(), (EventMessage<?>) invocation.getArguments()[0]);
            handledLatch.countDown();
            return null;
        }).when(mockHandler).handle(Mockito.any());

        testSubject.start();
        eventBus.publish(EventTestUtils.createEvents(2));

        Assertions.assertTrue(handledLatch.await(5L, TimeUnit.SECONDS),
                              "Expected Handler to have received 2 published events");
        acknowledgements.assertEventsAckedByMultipleThreads();
        Assertions.assertEquals(2L, acknowledgements.eventCount());
    }

    /**
     * Publishes two events and, once two units of work have reached their cleanup phase, verifies that
     * {@code storeToken} was invoked and that tokens can be fetched for segments 0 and 1.
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Test
    void multiThreadTokenIsStoredWhenEventIsRead() throws Exception {
        CountDownLatch cleanupLatch = new CountDownLatch(2);
        testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            // The callback parameter needs its own name: a lambda parameter may not redeclare the
            // enclosing lambda's 'unitOfWork' parameter.
            unitOfWork.onCleanup(uow -> cleanupLatch.countDown());
            return interceptorChain.proceed();
        });

        testSubject.start();
        eventBus.publish(EventTestUtils.createEvents(2));

        Assertions.assertTrue(cleanupLatch.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase");
        Mockito.verify(tokenStore, Mockito.atLeastOnce())
               .storeToken(Mockito.any(), Mockito.any(), Mockito.anyInt());
        Assertions.assertNotNull(tokenStore.fetchToken(testSubject.getName(), 0));
        Assertions.assertNotNull(tokenStore.fetchToken(testSubject.getName(), 1));
    }

    /**
     * Publishes ten events, stores the tracking token of the first one for segment 0 in a fresh spied token
     * store, then starts a two-thread processor on that store and expects nine handler invocations.
     */
    @Test
    void multiThreadContinueFromPreviousToken() throws Exception {
        tokenStore = Mockito.spy(new InMemoryTokenStore());
        eventBus.publish(EventTestUtils.createEvents(10));

        // NOTE(review): the stream opened here is never closed in this test.
        TrackedEventMessage<?> firstEvent = eventBus.openStream((TrackingToken) null).nextAvailable();
        String processorName = testSubject.getName();
        tokenStore.storeToken(firstEvent.trackingToken(), processorName, 0);
        Assertions.assertEquals(firstEvent.trackingToken(), tokenStore.fetchToken(testSubject.getName(), 0));

        AcknowledgeByThread acknowledgements = new AcknowledgeByThread();
        CountDownLatch handledLatch = new CountDownLatch(9);
        // Record each handled event under the handling thread, then release one latch count.
        Mockito.doAnswer(invocation -> {
            acknowledgements.addMessage(Thread.currentThread(), (EventMessage<?>) invocation.getArguments()[0]);
            handledLatch.countDown();
            return null;
        }).when(mockHandler).handle(Mockito.any());

        // Rebuild the processor so it uses the token store created above.
        configureProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(2));
        testSubject.start();

        boolean handledInTime = handledLatch.await(60L, TimeUnit.SECONDS);
        Assertions.assertTrue(
                handledInTime,
                "Expected 9 invocations on Event Handler by now, missing " + handledLatch.getCount()
        );
        acknowledgements.assertEventsAckedByMultipleThreads();
        Assertions.assertEquals(9L, acknowledgements.eventCount());
    }

    /**
     * Processes two events, shuts the processor down asynchronously, publishes two more events while it is
     * stopped, restarts it, and expects all four events to be handled with both segments' tokens ending at
     * global sequence 3.
     */
    @Timeout(10)
    @Test
    void multiThreadContinueAfterPause() throws Exception {
        AcknowledgeByThread acknowledgements = new AcknowledgeByThread();
        List<DomainEventMessage<?>> events = EventTestUtils.createEvents(4);

        // Phase 1: handle the first two events.
        CountDownLatch firstBatchLatch = new CountDownLatch(2);
        Mockito.doAnswer(invocation -> {
            acknowledgements.addMessage(Thread.currentThread(), (EventMessage<?>) invocation.getArguments()[0]);
            firstBatchLatch.countDown();
            return null;
        }).when(mockHandler).handle(Mockito.any());

        testSubject.start();
        eventBus.publish(events.subList(0, 2));

        Assertions.assertTrue(firstBatchLatch.await(5L, TimeUnit.SECONDS),
                              "Expected 2 invocations on Event Handler by now");
        Assertions.assertEquals(2L, acknowledgements.eventCount());

        Awaitility.await("Segment Zero - Phase 1")
                  .atMost(Duration.ofSeconds(2L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> {
                      OptionalLong storedPosition = tokenStore.fetchToken("test", 0).position();
                      return storedPosition.isPresent() && storedPosition.getAsLong() == 1;
                  });
        Awaitility.await("Segment One - Phase 1")
                  .atMost(Duration.ofSeconds(2L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> {
                      OptionalLong storedPosition = tokenStore.fetchToken("test", 1).position();
                      return storedPosition.isPresent() && storedPosition.getAsLong() == 1;
                  });

        // Pause: wait for the asynchronous shutdown to finish.
        CompletableFuture<Void> shutdown = testSubject.shutdownAsync();
        Awaitility.await("Shutdown")
                  .atMost(Duration.ofSeconds(2L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(shutdown::isDone);

        // Phase 2: re-stub the handler with a fresh latch and publish while the processor is stopped.
        CountDownLatch secondBatchLatch = new CountDownLatch(2);
        Mockito.doAnswer(invocation -> {
            acknowledgements.addMessage(Thread.currentThread(), (EventMessage<?>) invocation.getArguments()[0]);
            secondBatchLatch.countDown();
            return null;
        }).when(mockHandler).handle(Mockito.any());

        eventBus.publish(events.subList(2, 4));
        // Nothing may have been handled while the processor is shut down.
        Assertions.assertEquals(2L, secondBatchLatch.getCount());

        testSubject.start();

        Assertions.assertTrue(secondBatchLatch.await(5L, TimeUnit.SECONDS),
                              "Expected 4 invocations on Event Handler by now");
        Assertions.assertEquals(4L, acknowledgements.eventCount());

        GlobalSequenceTrackingToken expectedFinalToken = new GlobalSequenceTrackingToken(3L);
        Awaitility.await("Segment Zero - Phase 2")
                  .atMost(Duration.ofSeconds(2L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> tokenStore.fetchToken("test", 0).equals(expectedFinalToken));
        Awaitility.await("Segment One - Phase 2")
                  .atMost(Duration.ofSeconds(2L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> tokenStore.fetchToken("test", 1).equals(expectedFinalToken));
    }

    /**
     * Spies on the event store so that {@code openStream} throws a {@link MockException} once and then calls
     * the real method, and expects the processor to still handle all five published events, with
     * {@code openStream} verified to have been invoked twice.
     */
    @Test
    void multiThreadProcessorGoesToRetryModeWhenOpenStreamFails() throws Exception {
        eventBus = Mockito.spy(eventBus);
        tokenStore = new InMemoryTokenStore();
        eventBus.publish(EventTestUtils.createEvents(5));

        // First call fails, every later call goes to the real event store.
        Mockito.when(eventBus.openStream(Mockito.any()))
               .thenThrow(new MockException())
               .thenCallRealMethod();

        AcknowledgeByThread acknowledgements = new AcknowledgeByThread();
        CountDownLatch handledLatch = new CountDownLatch(5);
        Mockito.doAnswer(invocation -> {
            acknowledgements.addMessage(Thread.currentThread(), (EventMessage<?>) invocation.getArguments()[0]);
            handledLatch.countDown();
            return null;
        }).when(mockHandler).handle(Mockito.any());

        // Built without an explicit TrackingEventProcessorConfiguration, unlike configureProcessor.
        testSubject = TrackingEventProcessor.builder()
                                            .name("test")
                                            .eventHandlerInvoker(eventHandlerInvoker)
                                            .messageSource(eventBus)
                                            .tokenStore(tokenStore)
                                            .transactionManager(NoTransactionManager.INSTANCE)
                                            .build();
        testSubject.start();

        Assertions.assertTrue(handledLatch.await(10L, TimeUnit.SECONDS),
                              "Expected 5 invocations on Event Handler by now");
        Assertions.assertEquals(5L, acknowledgements.eventCount());
        Mockito.verify(eventBus, Mockito.times(2)).openStream(Mockito.any());
    }

    /**
     * Registers an interceptor that throws a {@link MockException} on commit of the second published event,
     * plus one that counts units of work reaching cleanup, and expects tokens to be fetchable for segments
     * 0 and 1 after both units of work were cleaned up.
     *
     * @throws Exception if waiting on the latch is interrupted
     */
    @Test
    void multiThreadTokensAreStoredWhenUnitOfWorkIsRolledBackOnSecondEvent() throws Exception {
        List<DomainEventMessage<?>> events = EventTestUtils.createEvents(2);
        CountDownLatch cleanupLatch = new CountDownLatch(2);

        // The callback parameters below need their own name ('uow'): a lambda parameter may not redeclare
        // the enclosing lambda's 'unitOfWork' parameter.
        testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            unitOfWork.onCommit(uow -> {
                // Fail the commit of the second event only.
                if (uow.getMessage().equals(events.get(1))) {
                    throw new MockException();
                }
            });
            return interceptorChain.proceed();
        });
        testSubject.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            unitOfWork.onCleanup(uow -> cleanupLatch.countDown());
            return interceptorChain.proceed();
        });

        testSubject.start();
        eventBus.publish(events);

        Assertions.assertTrue(cleanupLatch.await(5L, TimeUnit.SECONDS),
                              "Expected Unit of Work to have reached clean up phase");
        Assertions.assertNotNull(tokenStore.fetchToken(testSubject.getName(), 0));
        Assertions.assertNotNull(tokenStore.fetchToken(testSubject.getName(), 1));
    }

    /**
     * Starts and shuts down the processor twice, starts it a third time, and waits until two processor
     * threads are reported active again.
     */
    @Test
    void processorIncrementAndDecrementCorrectly() throws InterruptedException {
        configureProcessor(TrackingEventProcessorConfiguration.forParallelProcessing(2).andInitialSegmentsCount(4));

        // Two full start/stop cycles before the final start.
        for (int cycle = 0; cycle < 2; cycle++) {
            testSubject.start();
            testSubject.shutDown();
        }
        testSubject.start();

        Awaitility.await("Start")
                  .atMost(Duration.ofSeconds(1L))
                  .pollDelay(Duration.ofMillis(50L))
                  .until(() -> testSubject.activeProcessorThreads() == 2);
    }
}
