package org.axonframework.config;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.axonframework.common.ReflectionUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.transaction.NoTransactionManager;
import org.axonframework.common.transaction.Transaction;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.AbstractEventProcessor;
import org.axonframework.eventhandling.AnnotationEventHandlerAdapter;
import org.axonframework.eventhandling.ErrorContext;
import org.axonframework.eventhandling.ErrorHandler;
import org.axonframework.eventhandling.EventHandler;
import org.axonframework.eventhandling.EventHandlerInvoker;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventMessageHandler;
import org.axonframework.eventhandling.EventProcessor;
import org.axonframework.eventhandling.EventProcessorSpanFactory;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.ListenerInvocationErrorHandler;
import org.axonframework.eventhandling.MultiEventHandlerInvoker;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.eventhandling.SimpleEventBus;
import org.axonframework.eventhandling.SimpleEventHandlerInvoker;
import org.axonframework.eventhandling.SubscribingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessor;
import org.axonframework.eventhandling.TrackingEventProcessorConfiguration;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventhandling.async.FullConcurrencyPolicy;
import org.axonframework.eventhandling.async.SequentialPolicy;
import org.axonframework.eventhandling.deadletter.DeadLetteringEventHandlerInvoker;
import org.axonframework.eventhandling.pooled.PooledStreamingEventProcessor;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.eventsourcing.eventstore.EmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.EventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.lifecycle.LifecycleHandlerInvocationException;
import org.axonframework.messaging.InterceptorChain;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.deadletter.Decisions;
import org.axonframework.messaging.deadletter.EnqueuePolicy;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;
import org.axonframework.messaging.unitofwork.RollbackConfigurationType;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.tracing.TestSpanFactory;
import org.axonframework.utils.AssertUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith({MockitoExtension.class})
/* loaded from: input_file:org/axonframework/config/EventProcessingModuleTest.class */
class EventProcessingModuleTest {
    private EventStore eventStoreOne;
    private EventStore eventStoreTwo;
    private Configurer configurer;

    /**
     * Empty handler bean carrying {@code @ProcessingGroup("processingGroup")}. The assignment tests register an
     * instance of it and expect to find that instance in the processor named "processingGroup".
     */
    @ProcessingGroup("processingGroup")
    public static class AnnotatedBean {
    }

    /**
     * Subclass of {@link AnnotatedBean} that declares no annotation of its own. The assignment tests expect its
     * instances to end up in the "processingGroup" processor as well.
     */
    public static class AnnotatedBeanSubclass extends AnnotatedBean {
    }

    /**
     * Empty fixture annotated with {@code @ProcessingGroup("my-saga-processing-group")}.
     * NOTE(review): not referenced in the part of the file reviewed here; presumably registered as a saga by a test
     * further down - confirm.
     */
    @ProcessingGroup("my-saga-processing-group")
    private static class CustomSaga {
        private CustomSaga() {
        }
    }

    /**
     * Handler fixture assigned to the "pooled-streaming" processing group. It exposes a single handler method for
     * {@link String} payloads whose body is empty.
     */
    @ProcessingGroup("pooled-streaming")
    public static class PooledStreamingEventHandler {
        private PooledStreamingEventHandler() {
        }

        /**
         * Receives {@link String} payloads; the body is empty, so nothing happens on invocation.
         *
         * @param str the event payload (unused)
         */
        @EventHandler
        public void handle(String str) {
        }
    }

    /**
     * Error handler double that can be registered both as an {@link ErrorHandler} and as a
     * {@link ListenerInvocationErrorHandler}. Every reported error increments a counter and counts down a latch, so
     * a test can block until the expected number of errors has been reported.
     */
    private static class StubErrorHandler implements ErrorHandler, ListenerInvocationErrorHandler {
        private final CountDownLatch latch;
        private final AtomicInteger errorCounter = new AtomicInteger();

        /**
         * @param expectedErrors initial count of the latch, i.e. the number of reported errors after which
         *                       {@link #await(long, TimeUnit)} returns {@code true}
         */
        private StubErrorHandler(int expectedErrors) {
            latch = new CountDownLatch(expectedErrors);
        }

        /** Records the error; the context itself is ignored. */
        public void handleError(@Nonnull ErrorContext errorContext) {
            recordError();
        }

        /** Records the error; exception, event and handler are ignored. */
        public void onError(@Nonnull Exception exception, @Nonnull EventMessage<?> event, @Nonnull EventMessageHandler eventHandler) {
            recordError();
        }

        /**
         * @return the number of errors reported so far through either callback
         */
        public int getErrorCounter() {
            return errorCounter.get();
        }

        /**
         * Waits until the expected number of errors has been reported or the timeout elapses.
         *
         * @param timeout  maximum time to wait
         * @param timeUnit unit of {@code timeout}
         * @return {@code true} if the latch reached zero in time, {@code false} on timeout
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
            return latch.await(timeout, timeUnit);
        }

        /** Shared bookkeeping for both callbacks: bump the counter first, then release one latch count. */
        private void recordError() {
            errorCounter.incrementAndGet();
            latch.countDown();
        }
    }

    /**
     * Minimal {@link EventProcessor} double. It remembers its name, the {@link EventHandlerInvoker} it was built
     * with and every interceptor registered on it, so tests can inspect what the configuration assigned to it.
     * {@link #start()} and {@link #shutDown()} do nothing; the stub always reports itself as running and error free.
     */
    public static class StubEventProcessor implements EventProcessor {
        private final String name;
        private final EventHandlerInvoker eventHandlerInvoker;
        private final List<MessageHandlerInterceptor<? super EventMessage<?>>> interceptors = new ArrayList<>();

        /**
         * @param name                the processor name reported by {@link #getName()}
         * @param eventHandlerInvoker the invoker reported by {@link #getEventHandlerInvoker()}
         */
        public StubEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker) {
            this.name = name;
            this.eventHandlerInvoker = eventHandlerInvoker;
        }

        public String getName() {
            return name;
        }

        public EventHandlerInvoker getEventHandlerInvoker() {
            return eventHandlerInvoker;
        }

        /**
         * Returns the raw handler objects behind the first delegate of the invoker, read reflectively from the
         * {@code annotatedEventListener} field of each {@link AnnotationEventHandlerAdapter}.
         *
         * @return the unwrapped handler instances, in registration order
         * @throws IllegalStateException if {@link AnnotationEventHandlerAdapter} has no
         *                               {@code annotatedEventListener} field
         */
        public List<?> getEventHandlers() {
            // Resolve the field once rather than once per handler, and fail loudly if it is missing: returning
            // null elements would only surface later as an unexplained failed contains() assertion.
            final Field listenerField;
            try {
                listenerField = AnnotationEventHandlerAdapter.class.getDeclaredField("annotatedEventListener");
            } catch (NoSuchFieldException e) {
                throw new IllegalStateException(
                        "AnnotationEventHandlerAdapter has no field named 'annotatedEventListener'", e
                );
            }
            return ((SimpleEventHandlerInvoker) getEventHandlerInvoker().delegates().get(0))
                    .eventHandlers()
                    .stream()
                    .<Object>map(handler -> ReflectionUtils.getFieldValue(listenerField, handler))
                    .collect(Collectors.toList());
        }

        /**
         * Stores the interceptor and returns a {@link Registration} that removes it again.
         */
        public Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<? super EventMessage<?>> messageHandlerInterceptor) {
            interceptors.add(messageHandlerInterceptor);
            return () -> interceptors.remove(messageHandlerInterceptor);
        }

        /**
         * @return the live list of registered interceptors (not a copy)
         */
        public List<MessageHandlerInterceptor<? super EventMessage<?>>> getHandlerInterceptors() {
            return interceptors;
        }

        public void start() {
            // nothing to start
        }

        public void shutDown() {
            // nothing to stop
        }

        public boolean isRunning() {
            return true;
        }

        public boolean isError() {
            return false;
        }
    }

    /**
     * Pass-through {@link MessageHandlerInterceptor}: it adds no behaviour and only delegates to the rest of the
     * chain.
     */
    private static class StubInterceptor implements MessageHandlerInterceptor<EventMessage<?>> {
        private StubInterceptor() {
        }

        /**
         * Delegates straight to {@code interceptorChain.proceed()} and returns its result.
         *
         * @param unitOfWork       the current unit of work (unused)
         * @param interceptorChain the remainder of the interceptor chain
         * @return whatever the chain returns
         * @throws Exception whatever the chain throws
         */
        public Object handle(@Nonnull UnitOfWork<? extends EventMessage<?>> unitOfWork, @Nonnull InterceptorChain interceptorChain) throws Exception {
            return interceptorChain.proceed();
        }
    }

    /**
     * {@link TransactionManager} double whose transactions count down a caller-supplied latch on commit and do
     * nothing on rollback, letting a test wait for a commit to happen.
     */
    private static class StubTransactionManager implements TransactionManager {
        private final CountDownLatch transactionCommitted;

        /**
         * @param commitLatch latch counted down once for every committed transaction
         */
        private StubTransactionManager(CountDownLatch commitLatch) {
            transactionCommitted = commitLatch;
        }

        /**
         * @return a new transaction that signals the latch when committed
         */
        public Transaction startTransaction() {
            final CountDownLatch commitSignal = transactionCommitted;
            return new Transaction() {
                public void commit() {
                    commitSignal.countDown();
                }

                public void rollback() {
                    // rollbacks are not tracked
                }
            };
        }
    }

    /**
     * Handler fixture assigned to the "subscribing" processing group. Both of its handlers throw, which lets the
     * error-handling tests provoke failures by publishing {@link Integer} or {@link Boolean} payloads.
     */
    @ProcessingGroup("subscribing")
    public static class SubscribingEventHandler {
        private SubscribingEventHandler() {
        }

        /**
         * Always fails for {@link Integer} payloads.
         *
         * @param num        the event payload (unused)
         * @param unitOfWork the unit of work parameter declared by the handler (unused)
         * @throws IllegalStateException always
         */
        @EventHandler
        public void handle(Integer num, UnitOfWork<?> unitOfWork) {
            throw new IllegalStateException();
        }

        /**
         * Always fails for {@link Boolean} payloads.
         *
         * @param bool the event payload (unused)
         * @throws IllegalStateException always
         */
        @EventHandler
        public void handle(Boolean bool) {
            throw new IllegalStateException();
        }
    }

    /**
     * Handler fixture assigned to the "tracking" processing group. {@link String} payloads are accepted silently,
     * while {@link Integer} and {@link Boolean} payloads make the handler throw.
     */
    @ProcessingGroup("tracking")
    public static class TrackingEventHandler {
        private TrackingEventHandler() {
        }

        /**
         * Receives {@link String} payloads; the body is empty.
         *
         * @param str the event payload (unused)
         */
        @EventHandler
        public void handle(String str) {
        }

        /**
         * Always fails for {@link Integer} payloads.
         *
         * @param num        the event payload (unused)
         * @param unitOfWork the unit of work parameter declared by the handler (unused)
         * @throws IllegalStateException always
         */
        @EventHandler
        public void handle(Integer num, UnitOfWork<?> unitOfWork) {
            throw new IllegalStateException();
        }

        /**
         * Always fails for {@link Boolean} payloads.
         *
         * @param bool the event payload (unused)
         * @throws IllegalStateException always
         */
        @EventHandler
        public void handle(Boolean bool) {
            throw new IllegalStateException();
        }
    }

    /** Package-private no-argument constructor; it performs no initialization of its own. */
    EventProcessingModuleTest() {
    }

    /**
     * Prepares each test with a fresh default {@link Configurer} and two Mockito spies around embedded event stores
     * backed by separate in-memory storage engines. One event is published to each store up front: "test1" to the
     * first and "test2" to the second.
     */
    @BeforeEach
    void setUp() {
        configurer = DefaultConfigurer.defaultConfiguration();

        eventStoreOne = Mockito.spy(
                EmbeddedEventStore.builder()
                                  .storageEngine(new InMemoryEventStorageEngine())
                                  .build()
        );
        eventStoreTwo = Mockito.spy(
                EmbeddedEventStore.builder()
                                  .storageEngine(new InMemoryEventStorageEngine())
                                  .build()
        );

        eventStoreOne.publish(GenericEventMessage.asEventMessage("test1"));
        eventStoreTwo.publish(GenericEventMessage.asEventMessage("test2"));
    }

    /**
     * Registers six handlers together with one instance-matching rule that sends the string "concurrent" to the
     * "java.util.concurrent" group. Expects exactly three processors and checks where the string handlers, the
     * concurrent map and the two annotated beans ended up ("java.util.concurrent", "java.lang" and
     * "processingGroup").
     */
    @Test
    void assignmentRules() {
        Map<String, StubEventProcessor> processors = new HashMap<>();
        ConcurrentHashMap<Object, Object> concurrentHandler = new ConcurrentHashMap<>();
        AnnotatedBean annotatedBean = new AnnotatedBean();
        AnnotatedBeanSubclass annotatedBeanSubclass = new AnnotatedBeanSubclass();

        this.configurer.eventProcessing()
                       .registerEventProcessorFactory((name, config, invoker) -> {
                           // Capture every processor the factory creates so it can be inspected afterwards.
                           StubEventProcessor processor = new StubEventProcessor(name, invoker);
                           processors.put(name, processor);
                           return processor;
                       })
                       .assignHandlerInstancesMatching("java.util.concurrent", "concurrent"::equals)
                       .registerEventHandler(config -> new Object())
                       .registerEventHandler(config -> "")
                       .registerEventHandler(config -> "concurrent")
                       .registerEventHandler(config -> concurrentHandler)
                       .registerEventHandler(config -> annotatedBean)
                       .registerEventHandler(config -> annotatedBeanSubclass);

        Assertions.assertEquals(3, this.configurer.start().eventProcessingConfiguration().eventProcessors().size());
        Assertions.assertTrue(processors.get("java.util.concurrent").getEventHandlers().contains("concurrent"));
        Assertions.assertTrue(processors.get("java.util.concurrent").getEventHandlers().contains(concurrentHandler));
        Assertions.assertTrue(processors.get("java.lang").getEventHandlers().contains(""));
        Assertions.assertTrue(processors.get("processingGroup").getEventHandlers().contains(annotatedBean));
        Assertions.assertTrue(processors.get("processingGroup").getEventHandlers().contains(annotatedBeanSubclass));
    }

    /**
     * Same handler set as {@code assignmentRules}, but with a type-matching rule: handler types accepted by
     * {@code ConcurrentHashMap.class.isAssignableFrom} go to the "special" group. Expects three processors, with the
     * concurrent map in "special", both strings in "java.lang" and the annotated beans in "processingGroup".
     */
    @Test
    void byTypeAssignmentRules() {
        Map<String, StubEventProcessor> processors = new HashMap<>();
        ConcurrentHashMap<Object, Object> concurrentHandler = new ConcurrentHashMap<>();
        AnnotatedBean annotatedBean = new AnnotatedBean();
        AnnotatedBeanSubclass annotatedBeanSubclass = new AnnotatedBeanSubclass();

        this.configurer.eventProcessing()
                       .registerEventProcessorFactory((name, config, invoker) -> {
                           // Capture every processor the factory creates so it can be inspected afterwards.
                           StubEventProcessor processor = new StubEventProcessor(name, invoker);
                           processors.put(name, processor);
                           return processor;
                       })
                       .assignHandlerTypesMatching("special", ConcurrentHashMap.class::isAssignableFrom)
                       .registerEventHandler(config -> new Object())
                       .registerEventHandler(config -> "")
                       .registerEventHandler(config -> "concurrent")
                       .registerEventHandler(config -> concurrentHandler)
                       .registerEventHandler(config -> annotatedBean)
                       .registerEventHandler(config -> annotatedBeanSubclass);

        Assertions.assertEquals(3, this.configurer.start().eventProcessingConfiguration().eventProcessors().size());
        Assertions.assertTrue(processors.get("java.lang").getEventHandlers().contains("concurrent"));
        Assertions.assertTrue(processors.get("special").getEventHandlers().contains(concurrentHandler));
        Assertions.assertTrue(processors.get("java.lang").getEventHandlers().contains(""));
        Assertions.assertTrue(processors.get("processingGroup").getEventHandlers().contains(annotatedBean));
        Assertions.assertTrue(processors.get("processingGroup").getEventHandlers().contains(annotatedBeanSubclass));
    }

    /**
     * Configures a {@link SimpleEventBus} as the event bus and registers handlers for the "subscribing" and
     * "tracking" groups without choosing a processor type. Expects both processors to exist and both to be
     * {@link SubscribingEventProcessor} instances.
     */
    @Test
    void processorsDefaultToSubscribingWhenUsingSimpleEventBus() {
        EventProcessingConfiguration processingConfig =
                DefaultConfigurer.defaultConfiguration()
                                 .configureEventBus(config -> SimpleEventBus.builder().build())
                                 .eventProcessing(processing -> {
                                     processing.registerEventHandler(config -> new SubscribingEventHandler())
                                               .registerEventHandler(config -> new TrackingEventHandler());
                                 })
                                 .start()
                                 .eventProcessingConfiguration();

        Assertions.assertTrue(processingConfig.eventProcessor("subscribing").isPresent());
        Assertions.assertTrue(
                processingConfig.eventProcessor("subscribing")
                                .map(processor -> processor instanceof SubscribingEventProcessor)
                                .orElse(false)
        );
        Assertions.assertTrue(processingConfig.eventProcessor("tracking").isPresent());
        Assertions.assertTrue(
                processingConfig.eventProcessor("tracking")
                                .map(processor -> processor instanceof SubscribingEventProcessor)
                                .orElse(false)
        );
    }

    /**
     * Configures a {@link SimpleEventBus} and explicitly registers a tracking event processor for the "tracking"
     * group. Expects starting the configuration to fail with a {@link LifecycleHandlerInvocationException}.
     */
    @Test
    void assigningATrackingProcessorFailsWhenUsingSimpleEventBus() {
        Configurer testSubject =
                DefaultConfigurer.defaultConfiguration()
                                 .configureEventBus(config -> SimpleEventBus.builder().build())
                                 .eventProcessing(processing -> {
                                     processing.registerEventHandler(config -> new SubscribingEventHandler())
                                               .registerEventHandler(config -> new TrackingEventHandler())
                                               .registerTrackingEventProcessor("tracking");
                                 });

        // The bound method reference already null-checks its receiver; no separate getClass() call is needed.
        Assertions.assertThrows(LifecycleHandlerInvocationException.class, testSubject::start);
    }

    /**
     * Registers two instance-matching rules for the string "concurrent": one for "java.util.concurrent" without an
     * explicit priority and one for "java.util.concurrent2" with priority 1. Expects the string handler in
     * "java.util.concurrent2", the concurrent map in "java.util.concurrent", the empty string in "java.lang", and a
     * {@link CorrelationDataInterceptor} as the first interceptor of each of the three processors.
     */
    @Test
    void assignmentRulesOverrideThoseWithLowerPriority() {
        Map<String, StubEventProcessor> processors = new HashMap<>();
        ConcurrentHashMap<Object, Object> concurrentHandler = new ConcurrentHashMap<>();

        this.configurer.eventProcessing()
                       .registerEventProcessorFactory((name, config, invoker) -> {
                           // Capture every processor the factory creates so it can be inspected afterwards.
                           StubEventProcessor processor = new StubEventProcessor(name, invoker);
                           processors.put(name, processor);
                           return processor;
                       })
                       .assignHandlerInstancesMatching("java.util.concurrent", "concurrent"::equals)
                       .assignHandlerInstancesMatching("java.util.concurrent2", 1, "concurrent"::equals)
                       .registerEventHandler(config -> new Object())
                       .registerEventHandler(config -> "")
                       .registerEventHandler(config -> "concurrent")
                       .registerEventHandler(config -> concurrentHandler);

        Assertions.assertEquals(3, this.configurer.start().eventProcessingConfiguration().eventProcessors().size());
        Assertions.assertTrue(processors.get("java.util.concurrent2").getEventHandlers().contains("concurrent"));
        Assertions.assertTrue(
                processors.get("java.util.concurrent2").getHandlerInterceptors().iterator().next()
                        instanceof CorrelationDataInterceptor
        );
        Assertions.assertTrue(processors.get("java.util.concurrent").getEventHandlers().contains(concurrentHandler));
        Assertions.assertTrue(
                processors.get("java.util.concurrent").getHandlerInterceptors().iterator().next()
                        instanceof CorrelationDataInterceptor
        );
        Assertions.assertTrue(processors.get("java.lang").getEventHandlers().contains(""));
        Assertions.assertTrue(
                processors.get("java.lang").getHandlerInterceptors().iterator().next()
                        instanceof CorrelationDataInterceptor
        );
    }

    /**
     * Combines an instance-matching rule, {@code byDefaultAssignTo("default")} and an annotated bean. Expects three
     * processors: the plain object in "default", the string "concurrent" in "java.util.concurrent", and the
     * annotated bean still in "processingGroup".
     */
    @Test
    void defaultAssignToKeepsAnnotationScanning() {
        Map<String, StubEventProcessor> processors = new HashMap<>();
        AnnotatedBean annotatedBean = new AnnotatedBean();
        Object plainHandler = new Object();

        this.configurer.eventProcessing()
                       .registerEventProcessorFactory((name, config, invoker) -> {
                           // Capture every processor the factory creates so it can be inspected afterwards.
                           StubEventProcessor processor = new StubEventProcessor(name, invoker);
                           processors.put(name, processor);
                           return processor;
                       })
                       .assignHandlerInstancesMatching("java.util.concurrent", "concurrent"::equals)
                       .byDefaultAssignTo("default")
                       .registerEventHandler(config -> plainHandler)
                       .registerEventHandler(config -> "concurrent")
                       .registerEventHandler(config -> annotatedBean);
        this.configurer.start();

        Assertions.assertEquals(3, processors.size());
        Assertions.assertTrue(processors.get("default").getEventHandlers().contains(plainHandler));
        Assertions.assertTrue(processors.get("java.util.concurrent").getEventHandlers().contains("concurrent"));
        Assertions.assertTrue(processors.get("processingGroup").getEventHandlers().contains(annotatedBean));
    }

    /**
     * Assigns the {@link String} type to "myGroup" and installs a custom default that maps {@link Object} to "obj"
     * and any other type to its simple name followed by "CustomProcessor". Registers three sagas and one
     * {@link HashMap} event handler, then checks the resulting saga processing groups and the four expected
     * processors ("myGroup", "obj", "java.util" and "ConcurrentMapCustomProcessor").
     */
    @Test
    void typeAssignmentWithCustomDefault() {
        this.configurer.eventProcessing()
                       // The decompiled lambda referenced an undefined variable here; the predicate is simply
                       // "type equals String.class".
                       .assignHandlerTypesMatching("myGroup", String.class::equals)
                       .byDefaultAssignHandlerTypesTo(
                               type -> Object.class.equals(type) ? "obj" : type.getSimpleName() + "CustomProcessor"
                       )
                       .registerSaga(Object.class)
                       .registerSaga(ConcurrentMap.class)
                       .registerSaga(String.class)
                       .registerEventHandler(config -> new HashMap<>());

        EventProcessingConfiguration processingConfig = this.configurer.start().eventProcessingConfiguration();

        Assertions.assertEquals("myGroup", processingConfig.sagaProcessingGroup(String.class));
        Assertions.assertEquals("obj", processingConfig.sagaProcessingGroup(Object.class));
        Assertions.assertEquals("ConcurrentMapCustomProcessor", processingConfig.sagaProcessingGroup(ConcurrentMap.class));
        Assertions.assertEquals(4, processingConfig.eventProcessors().size());
        Assertions.assertTrue(processingConfig.eventProcessor("myGroup").isPresent());
        Assertions.assertTrue(processingConfig.eventProcessor("obj").isPresent());
        Assertions.assertTrue(processingConfig.eventProcessor("java.util").isPresent());
        Assertions.assertTrue(processingConfig.eventProcessor("ConcurrentMapCustomProcessor").isPresent());
    }

    /**
     * Assigns every handler type from package "java.lang" to "myGroup" and registers three sagas plus one
     * {@link HashMap} event handler. Expects the {@link String} and {@link Object} sagas in "myGroup", the
     * {@link ConcurrentMap} saga in "ConcurrentMapProcessor", and three processors in total ("myGroup", "java.util"
     * and "ConcurrentMapProcessor").
     */
    @Test
    void typeAssignment() {
        this.configurer.eventProcessing()
                       .assignHandlerTypesMatching("myGroup", type -> "java.lang".equals(type.getPackage().getName()))
                       .registerSaga(Object.class)
                       .registerSaga(ConcurrentMap.class)
                       .registerSaga(String.class)
                       .registerEventHandler(config -> new HashMap<>());

        EventProcessingConfiguration processingConfig = this.configurer.start().eventProcessingConfiguration();

        Assertions.assertEquals("myGroup", processingConfig.sagaProcessingGroup(String.class));
        Assertions.assertEquals("myGroup", processingConfig.sagaProcessingGroup(Object.class));
        Assertions.assertEquals("ConcurrentMapProcessor", processingConfig.sagaProcessingGroup(ConcurrentMap.class));
        Assertions.assertEquals(3, processingConfig.eventProcessors().size());
        Assertions.assertTrue(processingConfig.eventProcessor("myGroup").isPresent());
        Assertions.assertTrue(processingConfig.eventProcessor("java.util").isPresent());
        Assertions.assertTrue(processingConfig.eventProcessor("ConcurrentMapProcessor").isPresent());
    }

    /**
     * Registers a default sequencing policy and a dedicated one for the "special" group, then reads the
     * {@code eventHandlerInvoker} field of both resulting processors reflectively. Expects the first delegate of
     * the "default" processor to use the {@link SequentialPolicy} and that of the "special" processor to use the
     * {@link FullConcurrencyPolicy}.
     *
     * @throws NoSuchFieldException if {@link AbstractEventProcessor} has no {@code eventHandlerInvoker} field
     */
    @Test
    void assignSequencingPolicy() throws NoSuchFieldException {
        Object defaultHandler = new Object();
        Object specialHandler = new Object();
        SequentialPolicy sequentialPolicy = new SequentialPolicy();
        FullConcurrencyPolicy fullConcurrencyPolicy = new FullConcurrencyPolicy();

        this.configurer.eventProcessing()
                       .registerEventHandler(config -> defaultHandler)
                       .registerEventHandler(config -> specialHandler)
                       .assignHandlerInstancesMatching("special", specialHandler::equals)
                       .byDefaultAssignTo("default")
                       .registerDefaultSequencingPolicy(config -> sequentialPolicy)
                       .registerSequencingPolicy("special", config -> fullConcurrencyPolicy);
        Configuration started = this.configurer.start();

        Optional<AbstractEventProcessor> defaultProcessor =
                started.eventProcessingConfiguration().eventProcessor("default", AbstractEventProcessor.class);
        Assertions.assertTrue(defaultProcessor.isPresent());
        Optional<AbstractEventProcessor> specialProcessor =
                started.eventProcessingConfiguration().eventProcessor("special", AbstractEventProcessor.class);
        Assertions.assertTrue(specialProcessor.isPresent());

        // Both processors expose their invoker through the same private field; resolve it once.
        Field invokerField = AbstractEventProcessor.class.getDeclaredField("eventHandlerInvoker");
        MultiEventHandlerInvoker defaultInvoker =
                (MultiEventHandlerInvoker) ReflectionUtils.getFieldValue(invokerField, defaultProcessor.get());
        MultiEventHandlerInvoker specialInvoker =
                (MultiEventHandlerInvoker) ReflectionUtils.getFieldValue(invokerField, specialProcessor.get());

        Assertions.assertEquals(
                sequentialPolicy, ((SimpleEventHandlerInvoker) defaultInvoker.delegates().get(0)).getSequencingPolicy()
        );
        Assertions.assertEquals(
                fullConcurrencyPolicy, ((SimpleEventHandlerInvoker) specialInvoker.delegates().get(0)).getSequencingPolicy()
        );
    }

    /**
     * Registers handlers for the "pooled-streaming" and "tracking" groups and switches the configuration to
     * subscribing processors through a mocked source-definition builder. Expects two processors to be created and
     * the builder's {@code build} method to be invoked exactly twice.
     *
     * @param subscribableMessageSourceDefinitionBuilder mock stubbed to return the definition for both group names
     * @param subscribableMessageSourceDefinition        mock stubbed to create the message source for any
     *                                                   configuration
     * @param subscribableMessageSource                  mock message source handed out by the definition
     */
    @Test
    void createSubscribingEventProcessorIfSubscribableMessageSourceDefinitionBuilderPresent(@Mock EventProcessingConfigurer.SubscribableMessageSourceDefinitionBuilder subscribableMessageSourceDefinitionBuilder, @Mock SubscribableMessageSourceDefinition<EventMessage<?>> subscribableMessageSourceDefinition, @Mock SubscribableMessageSource subscribableMessageSource) {
        Mockito.when(subscribableMessageSourceDefinitionBuilder.build("pooled-streaming")).thenReturn(subscribableMessageSourceDefinition);
        Mockito.when(subscribableMessageSourceDefinitionBuilder.build("tracking")).thenReturn(subscribableMessageSourceDefinition);
        Mockito.when(subscribableMessageSourceDefinition.create((Configuration) Mockito.any())).thenReturn(subscribableMessageSource);

        this.configurer.eventProcessing()
                       .registerEventHandler(config -> new PooledStreamingEventHandler())
                       .registerEventHandler(config -> new TrackingEventHandler())
                       .usingSubscribingEventProcessors(subscribableMessageSourceDefinitionBuilder);

        // NOTE(review): the decompiled source iterated the processors with an empty lambda here; presumably the
        // original asserted each processor's type - confirm against the real test and restore that assertion.
        Map<String, ?> processors = this.configurer.start().eventProcessingConfiguration().eventProcessors();

        Assertions.assertEquals(2, processors.size());
        Mockito.verify(subscribableMessageSourceDefinitionBuilder, Mockito.times(2)).build(Mockito.anyString());
    }

    /**
     * Registers a {@link StubEventProcessor} for the "default" group together with one processor-specific and one
     * default handler interceptor. Expects the "default" processor to end up with three interceptors.
     */
    @Test
    void assignInterceptors() {
        StubInterceptor processorInterceptor = new StubInterceptor();
        StubInterceptor defaultInterceptor = new StubInterceptor();

        this.configurer.eventProcessing()
                       .registerEventProcessor("default", (name, config, invoker) -> new StubEventProcessor(name, invoker))
                       .byDefaultAssignTo("default")
                       .assignHandlerInstancesMatching("concurrent", 1, "concurrent"::equals)
                       .registerEventHandler(config -> new Object())
                       .registerEventHandler(config -> "concurrent")
                       .registerHandlerInterceptor("default", config -> processorInterceptor)
                       .registerDefaultHandlerInterceptor((config, processorName) -> defaultInterceptor);

        Optional<EventProcessor> defaultProcessor =
                this.configurer.start().eventProcessingConfiguration().eventProcessor("default");

        Assertions.assertTrue(defaultProcessor.isPresent());
        Assertions.assertEquals(3, defaultProcessor.get().getHandlerInterceptors().size());
    }

    /**
     * Registers one {@code MessageCollectingMonitor} for the "subscribing" processor and one for the "tracking"
     * processor, then publishes a single event. Expects the subscribing monitor to hold one message right after
     * publication, the tracking monitor's {@code await} to succeed within ten seconds, and the latch handed to
     * {@code buildComplexEventHandlingConfiguration} to reach zero within ten seconds.
     *
     * @throws Exception if waiting on a monitor or the latch is interrupted
     */
    @Test
    void configureMonitor() throws Exception {
        MessageCollectingMonitor subscribingMonitor = new MessageCollectingMonitor();
        MessageCollectingMonitor trackingMonitor = new MessageCollectingMonitor(1);
        CountDownLatch tokenStoreInvocation = new CountDownLatch(1);
        buildComplexEventHandlingConfiguration(tokenStoreInvocation);

        this.configurer.eventProcessing()
                       .registerMessageMonitor("subscribing", config -> subscribingMonitor)
                       .registerMessageMonitor("tracking", config -> trackingMonitor);
        Configuration started = this.configurer.start();

        try {
            started.eventBus().publish(new GenericEventMessage<>("test"));

            Assertions.assertEquals(1, subscribingMonitor.getMessages().size());
            Assertions.assertTrue(trackingMonitor.await(10L, TimeUnit.SECONDS));
            Assertions.assertTrue(tokenStoreInvocation.await(10L, TimeUnit.SECONDS));
        } finally {
            // finally runs shutdown exactly once, on success and on failure alike
            started.shutdown();
        }
    }

    /**
     * Installs a {@link TestSpanFactory} on the configurer, publishes a single event and expects a completed
     * "EventProcessor.process" span for it immediately and a completed "StreamingEventProcessor.process" span
     * within two seconds.
     */
    @Test
    void configureSpanFactory() {
        TestSpanFactory testSpanFactory = new TestSpanFactory();
        buildComplexEventHandlingConfiguration(new CountDownLatch(1));
        this.configurer.configureSpanFactory(config -> testSpanFactory);
        Configuration started = this.configurer.start();

        try {
            EventMessage<String> event = new GenericEventMessage<>("test");
            started.eventBus().publish(event);

            testSpanFactory.verifySpanCompleted("EventProcessor.process", event);
            AssertUtils.assertWithin(
                    2, TimeUnit.SECONDS,
                    () -> testSpanFactory.verifySpanCompleted("StreamingEventProcessor.process", event)
            );
        } finally {
            // finally runs shutdown exactly once, on success and on failure alike
            started.shutdown();
        }
    }

    /**
     * Registers a single {@link StubErrorHandler} as the default listener invocation error handler and publishes a
     * {@link Boolean} event. Expects the latch handed to {@code buildComplexEventHandlingConfiguration} to reach
     * zero and the stub to have been notified exactly twice, each within ten seconds.
     *
     * @throws Exception if waiting on the latch or the stub is interrupted
     */
    @Test
    void configureDefaultListenerInvocationErrorHandler() throws Exception {
        EventMessage<Boolean> failingEvent = new GenericEventMessage<>(true);
        StubErrorHandler errorHandler = new StubErrorHandler(2);
        CountDownLatch tokenStoreInvocation = new CountDownLatch(1);
        buildComplexEventHandlingConfiguration(tokenStoreInvocation);

        this.configurer.eventProcessing()
                       .registerDefaultListenerInvocationErrorHandler(config -> errorHandler);
        Configuration started = this.configurer.start();

        try {
            started.eventBus().publish(failingEvent);

            Assertions.assertTrue(tokenStoreInvocation.await(10L, TimeUnit.SECONDS));
            Assertions.assertTrue(errorHandler.await(10L, TimeUnit.SECONDS));
            Assertions.assertEquals(2, errorHandler.getErrorCounter());
        } finally {
            // finally runs shutdown exactly once, on success and on failure alike
            started.shutdown();
        }
    }

    /**
     * Verifies that listener invocation error handlers registered per processor name are each used by their
     * own processor: the handlers registered for {@code "subscribing"} and {@code "tracking"} must both
     * have counted exactly one error after a single event with payload {@code true} is published.
     *
     * @throws Exception if waiting on one of the latches is interrupted
     */
    @Test
    void configureListenerInvocationErrorHandlerPerEventProcessor() throws Exception {
        EventMessage<Boolean> failingEvent = new GenericEventMessage<>(true);
        StubErrorHandler subscribingErrorHandler = new StubErrorHandler(1);
        StubErrorHandler trackingErrorHandler = new StubErrorHandler(1);
        CountDownLatch handledLatch = new CountDownLatch(1);
        buildComplexEventHandlingConfiguration(handledLatch);
        configurer.eventProcessing()
                  .registerListenerInvocationErrorHandler("subscribing", c -> subscribingErrorHandler)
                  .registerListenerInvocationErrorHandler("tracking", c -> trackingErrorHandler);
        Configuration config = configurer.start();
        try {
            config.eventBus().publish(failingEvent);

            // The "subscribing" handler is checked right after publish(), without waiting.
            Assertions.assertEquals(1, subscribingErrorHandler.getErrorCounter());
            // The "tracking" handler is awaited before its counter is inspected.
            Assertions.assertTrue(handledLatch.await(10L, TimeUnit.SECONDS));
            Assertions.assertTrue(trackingErrorHandler.await(10L, TimeUnit.SECONDS));
            Assertions.assertEquals(1, trackingErrorHandler.getErrorCounter());
        } finally {
            // Shut down exactly once, whether the assertions passed or failed.
            config.shutdown();
        }
    }

    /**
     * Verifies that an {@code ErrorHandler} registered as the default receives errors once the default
     * listener invocation error handler is {@link PropagatingErrorHandler}: after publishing an event with
     * payload {@code 1000}, the stub handler must have counted two errors.
     *
     * @throws Exception if waiting on one of the latches is interrupted
     */
    @Test
    void configureDefaultErrorHandler() throws Exception {
        EventMessage<Integer> failingEvent = new GenericEventMessage<>(1000);
        StubErrorHandler errorHandler = new StubErrorHandler(2);
        CountDownLatch handledLatch = new CountDownLatch(1);
        buildComplexEventHandlingConfiguration(handledLatch);
        configurer.eventProcessing()
                  .registerDefaultListenerInvocationErrorHandler(c -> PropagatingErrorHandler.instance())
                  .registerDefaultErrorHandler(c -> errorHandler);
        Configuration config = configurer.start();
        try {
            config.eventBus().publish(failingEvent);

            Assertions.assertTrue(handledLatch.await(10L, TimeUnit.SECONDS));
            Assertions.assertTrue(errorHandler.await(10L, TimeUnit.SECONDS));
            Assertions.assertEquals(2, errorHandler.getErrorCounter());
        } finally {
            // Shut down exactly once, whether the assertions passed or failed.
            config.shutdown();
        }
    }

    /**
     * Verifies that the {@code "tracking"} processor, created while tracking processors are the default
     * processor type, reads from the source registered as default streamable message source.
     */
    @Test
    void trackingProcessorsUsesConfiguredDefaultStreamableMessageSource() {
        configurer.eventProcessing().configureDefaultStreamableMessageSource(c -> eventStoreOne);
        configurer.eventProcessing().usingTrackingEventProcessors();
        configurer.registerEventHandler(c -> new TrackingEventHandler());

        Optional<TrackingEventProcessor> processor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("tracking", TrackingEventProcessor.class);

        Assertions.assertTrue(processor.isPresent());
        Assertions.assertEquals(eventStoreOne, processor.get().getMessageSource());
    }

    /**
     * Verifies that a source passed explicitly when registering the {@code "tracking"} processor takes
     * precedence over the default streamable message source.
     */
    @Test
    void trackingProcessorsUsesSpecificSource() {
        configurer.eventProcessing()
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .registerTrackingEventProcessor("tracking", c -> eventStoreTwo)
                  .registerEventHandler(c -> new TrackingEventHandler());

        Optional<TrackingEventProcessor> processor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("tracking", TrackingEventProcessor.class);

        Assertions.assertTrue(processor.isPresent());
        Assertions.assertEquals(eventStoreTwo, processor.get().getMessageSource());
    }

    /**
     * Verifies that the {@code "subscribing"} processor, created while subscribing processors are the
     * default processor type, reads from the source registered as default subscribable message source.
     */
    @Test
    void subscribingProcessorsUsesConfiguredDefaultSubscribableMessageSource() {
        configurer.eventProcessing().configureDefaultSubscribableMessageSource(c -> eventStoreOne);
        configurer.eventProcessing().usingSubscribingEventProcessors();
        configurer.registerEventHandler(c -> new SubscribingEventHandler());

        Optional<EventProcessor> processor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("subscribing");

        Assertions.assertTrue(processor.isPresent());
        // The name-only lookup does not narrow the type, so the cast doubles as a type check.
        SubscribingEventProcessor subscribingProcessor = (SubscribingEventProcessor) processor.get();
        Assertions.assertEquals(eventStoreOne, subscribingProcessor.getMessageSource());
    }

    /**
     * Verifies that a source passed explicitly when registering the {@code "subscribing"} processor takes
     * precedence over the default subscribable message source.
     */
    @Test
    void subscribingProcessorsUsesSpecificSource() {
        configurer.eventProcessing()
                  .configureDefaultSubscribableMessageSource(c -> eventStoreOne)
                  .registerSubscribingEventProcessor("subscribing", c -> eventStoreTwo)
                  .registerEventHandler(c -> new SubscribingEventHandler());

        Optional<EventProcessor> processor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("subscribing");

        Assertions.assertTrue(processor.isPresent());
        // The name-only lookup does not narrow the type, so the cast doubles as a type check.
        SubscribingEventProcessor subscribingProcessor = (SubscribingEventProcessor) processor.get();
        Assertions.assertEquals(eventStoreTwo, subscribingProcessor.getMessageSource());
    }

    /**
     * Verifies that {@code ErrorHandler}s registered per processor name are each used by their own
     * processor once listener failures are propagated: the handlers registered for {@code "subscribing"}
     * and {@code "tracking"} must both have counted exactly one error after a single event with payload
     * {@code 1000} is published.
     *
     * @throws Exception if waiting on one of the latches is interrupted
     */
    @Test
    void configureErrorHandlerPerEventProcessor() throws Exception {
        EventMessage<Integer> failingEvent = new GenericEventMessage<>(1000);
        StubErrorHandler subscribingErrorHandler = new StubErrorHandler(1);
        StubErrorHandler trackingErrorHandler = new StubErrorHandler(1);
        CountDownLatch handledLatch = new CountDownLatch(1);
        buildComplexEventHandlingConfiguration(handledLatch);
        configurer.eventProcessing()
                  .registerDefaultListenerInvocationErrorHandler(c -> PropagatingErrorHandler.instance())
                  .registerErrorHandler("subscribing", c -> subscribingErrorHandler)
                  .registerErrorHandler("tracking", c -> trackingErrorHandler);
        Configuration config = configurer.start();
        try {
            config.eventBus().publish(failingEvent);

            // The "subscribing" handler is checked right after publish(), without waiting.
            Assertions.assertEquals(1, subscribingErrorHandler.getErrorCounter());
            // The "tracking" handler is awaited before its counter is inspected.
            Assertions.assertTrue(handledLatch.await(10L, TimeUnit.SECONDS));
            Assertions.assertTrue(trackingErrorHandler.await(10L, TimeUnit.SECONDS));
            Assertions.assertEquals(1, trackingErrorHandler.getErrorCounter());
        } finally {
            // Shut down exactly once, whether the assertions passed or failed.
            config.shutdown();
        }
    }

    /**
     * Checks that {@code EventProcessingModule.packageOfObject} resolves this test instance to the name of
     * the package that {@code EventProcessingModule} itself lives in.
     */
    @Test
    void packageOfObject() {
        String expectedPackage = EventProcessingModule.class.getPackage().getName();

        Assertions.assertEquals(expectedPackage, EventProcessingModule.packageOfObject(this));
    }

    /**
     * Verifies that a {@link TrackingEventProcessorConfiguration} registered without a processor name
     * applies to every tracking processor: both the {@code "tracking"} and the {@code "default"} processor
     * must end up with a {@code segmentsSize} of 4.
     *
     * @throws NoSuchFieldException if {@code TrackingEventProcessor} has no {@code segmentsSize} field
     */
    @Test
    void defaultTrackingEventProcessingConfiguration() throws NoSuchFieldException {
        Object someHandler = new Object();
        TrackingEventProcessorConfiguration parallelConfig =
                TrackingEventProcessorConfiguration.forParallelProcessing(4);
        configurer.eventProcessing()
                  .usingTrackingEventProcessors()
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .byDefaultAssignTo("default")
                  .registerEventHandler(c -> someHandler)
                  .registerEventHandler(c -> new TrackingEventHandler())
                  .registerTrackingEventProcessorConfiguration(c -> parallelConfig);
        Configuration config = configurer.start();

        Optional<TrackingEventProcessor> trackingProcessor =
                config.eventProcessingConfiguration().eventProcessor("tracking", TrackingEventProcessor.class);
        Assertions.assertTrue(trackingProcessor.isPresent());
        // The segment count is only reachable through the private field.
        Field segmentsSizeField = TrackingEventProcessor.class.getDeclaredField("segmentsSize");
        Integer trackingSegments = (Integer) ReflectionUtils.getFieldValue(segmentsSizeField, trackingProcessor.get());
        Assertions.assertEquals(4, trackingSegments.intValue());

        Optional<TrackingEventProcessor> defaultProcessor =
                config.eventProcessingConfiguration().eventProcessor("default", TrackingEventProcessor.class);
        Assertions.assertTrue(defaultProcessor.isPresent());
        Integer defaultSegments = (Integer) ReflectionUtils.getFieldValue(segmentsSizeField, defaultProcessor.get());
        Assertions.assertEquals(4, defaultSegments.intValue());
    }

    /**
     * Verifies that a {@link TrackingEventProcessorConfiguration} registered for the {@code "tracking"}
     * processor name applies to that processor only: {@code "tracking"} must have a {@code segmentsSize} of
     * 4, whereas the {@code "default"} processor must have a {@code segmentsSize} of 1.
     *
     * @throws NoSuchFieldException if {@code TrackingEventProcessor} has no {@code segmentsSize} field
     */
    @Test
    void customTrackingEventProcessingConfiguration() throws NoSuchFieldException {
        Object someHandler = new Object();
        TrackingEventProcessorConfiguration parallelConfig =
                TrackingEventProcessorConfiguration.forParallelProcessing(4);
        configurer.eventProcessing()
                  .usingTrackingEventProcessors()
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .byDefaultAssignTo("default")
                  .registerEventHandler(c -> someHandler)
                  .registerEventHandler(c -> new TrackingEventHandler())
                  .registerTrackingEventProcessorConfiguration("tracking", c -> parallelConfig);
        Configuration config = configurer.start();

        Optional<TrackingEventProcessor> trackingProcessor =
                config.eventProcessingConfiguration().eventProcessor("tracking", TrackingEventProcessor.class);
        Assertions.assertTrue(trackingProcessor.isPresent());
        // The segment count is only reachable through the private field.
        Field segmentsSizeField = TrackingEventProcessor.class.getDeclaredField("segmentsSize");
        Integer trackingSegments = (Integer) ReflectionUtils.getFieldValue(segmentsSizeField, trackingProcessor.get());
        Assertions.assertEquals(4, trackingSegments.intValue());

        Optional<TrackingEventProcessor> defaultProcessor =
                config.eventProcessingConfiguration().eventProcessor("default", TrackingEventProcessor.class);
        Assertions.assertTrue(defaultProcessor.isPresent());
        Integer defaultSegments = (Integer) ReflectionUtils.getFieldValue(segmentsSizeField, defaultProcessor.get());
        Assertions.assertEquals(1, defaultSegments.intValue());
    }

    /**
     * Verifies the tracking processor built for a saga when nothing is customised: the
     * {@code "ObjectProcessor"} must have a {@code segmentsSize} of 1, and its initial token builder must
     * call {@code createHeadToken()} and never {@code createTailToken()} on the source it is applied to.
     *
     * @throws NoSuchFieldException if one of the reflectively read fields does not exist
     */
    @Test
    void sagaTrackingProcessorConstructionUsesDefaultSagaProcessorConfigIfNoCustomizationIsPresent() throws NoSuchFieldException {
        configurer.eventProcessing()
                  .usingTrackingEventProcessors()
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .registerSaga(Object.class);

        Optional<TrackingEventProcessor> sagaProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("ObjectProcessor", TrackingEventProcessor.class);
        Assertions.assertTrue(sagaProcessor.isPresent());
        TrackingEventProcessor processor = sagaProcessor.get();

        Integer segmentsSize = (Integer) ReflectionUtils.getFieldValue(
                TrackingEventProcessor.class.getDeclaredField("segmentsSize"), processor);
        Assertions.assertEquals(1, segmentsSize.intValue());

        // Reflection yields an untyped value; only the erased Function contract is relied upon here.
        @SuppressWarnings("unchecked")
        Function<Object, TrackingToken> initialTokenBuilder = (Function<Object, TrackingToken>) ReflectionUtils.getFieldValue(
                TrackingEventProcessor.class.getDeclaredField("initialTrackingTokenBuilder"), processor);
        initialTokenBuilder.apply(eventStoreTwo);
        Mockito.verify(eventStoreTwo, Mockito.times(0)).createTailToken();
        Mockito.verify(eventStoreTwo).createHeadToken();
    }

    /**
     * Verifies the tracking processor used by a saga that is assigned to the explicitly registered
     * {@code "custom-processor"}: it must have a {@code segmentsSize} of 1, and its initial token builder
     * must yield a token at position 0 and call {@code createHeadToken()} on the source it is applied to.
     *
     * @throws NoSuchFieldException if one of the reflectively read fields does not exist
     */
    @Test
    void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomProcessor() throws NoSuchFieldException {
        configurer.eventProcessing()
                  .assignProcessingGroup(group -> "custom-processor")
                  .registerTrackingEventProcessor("custom-processor", c -> eventStoreOne)
                  .registerSaga(CustomSaga.class);

        Optional<TrackingEventProcessor> sagaProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("custom-processor", TrackingEventProcessor.class);
        Assertions.assertTrue(sagaProcessor.isPresent());
        TrackingEventProcessor processor = sagaProcessor.get();

        Integer segmentsSize = (Integer) ReflectionUtils.getFieldValue(
                TrackingEventProcessor.class.getDeclaredField("segmentsSize"), processor);
        Assertions.assertEquals(1, segmentsSize.intValue());

        // Reflection yields an untyped value; only the erased Function contract is relied upon here.
        @SuppressWarnings("unchecked")
        Function<Object, TrackingToken> initialTokenBuilder = (Function<Object, TrackingToken>) ReflectionUtils.getFieldValue(
                TrackingEventProcessor.class.getDeclaredField("initialTrackingTokenBuilder"), processor);
        TrackingToken initialToken = initialTokenBuilder.apply(eventStoreTwo);
        Assertions.assertEquals(0L, initialToken.position().orElse(-1L));
        Mockito.verify(eventStoreTwo).createHeadToken();
    }

    /**
     * Verifies the saga's {@code "ObjectProcessor"} when it is registered explicitly together with its own
     * {@link TrackingEventProcessorConfiguration}: it must have the configured {@code segmentsSize} of 3,
     * and its initial token builder must yield a token at position 0 and call {@code createHeadToken()} on
     * the source it is applied to.
     *
     * @throws NoSuchFieldException if one of the reflectively read fields does not exist
     */
    @Test
    void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomTrackingProcessorBuilder() throws NoSuchFieldException {
        TrackingEventProcessorConfiguration parallelConfig =
                TrackingEventProcessorConfiguration.forParallelProcessing(3);
        configurer.eventProcessing()
                  .registerTrackingEventProcessor("ObjectProcessor", c -> eventStoreOne, c -> parallelConfig)
                  .registerSaga(Object.class);

        Optional<TrackingEventProcessor> sagaProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("ObjectProcessor", TrackingEventProcessor.class);
        Assertions.assertTrue(sagaProcessor.isPresent());
        TrackingEventProcessor processor = sagaProcessor.get();

        Integer segmentsSize = (Integer) ReflectionUtils.getFieldValue(
                TrackingEventProcessor.class.getDeclaredField("segmentsSize"), processor);
        Assertions.assertEquals(3, segmentsSize.intValue());

        // Reflection yields an untyped value; only the erased Function contract is relied upon here.
        @SuppressWarnings("unchecked")
        Function<Object, TrackingToken> initialTokenBuilder = (Function<Object, TrackingToken>) ReflectionUtils.getFieldValue(
                TrackingEventProcessor.class.getDeclaredField("initialTrackingTokenBuilder"), processor);
        TrackingToken initialToken = initialTokenBuilder.apply(eventStoreTwo);
        Assertions.assertEquals(0L, initialToken.position().orElse(-1L));
        Mockito.verify(eventStoreTwo).createHeadToken();
    }

    /**
     * Verifies the saga's {@code "ObjectProcessor"} when a {@link TrackingEventProcessorConfiguration} is
     * registered under that processor name: it must have the configured {@code segmentsSize} of 4, and its
     * initial token builder must yield a token at position 0 and call {@code createHeadToken()} on the
     * source it is applied to.
     *
     * @throws NoSuchFieldException if one of the reflectively read fields does not exist
     */
    @Test
    void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomConfigInstance() throws NoSuchFieldException {
        TrackingEventProcessorConfiguration parallelConfig =
                TrackingEventProcessorConfiguration.forParallelProcessing(4);
        configurer.eventProcessing()
                  .usingTrackingEventProcessors()
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .registerSaga(Object.class)
                  .registerTrackingEventProcessorConfiguration("ObjectProcessor", c -> parallelConfig);

        Optional<TrackingEventProcessor> sagaProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("ObjectProcessor", TrackingEventProcessor.class);
        Assertions.assertTrue(sagaProcessor.isPresent());
        TrackingEventProcessor processor = sagaProcessor.get();

        Integer segmentsSize = (Integer) ReflectionUtils.getFieldValue(
                TrackingEventProcessor.class.getDeclaredField("segmentsSize"), processor);
        Assertions.assertEquals(4, segmentsSize.intValue());

        // Reflection yields an untyped value; only the erased Function contract is relied upon here.
        @SuppressWarnings("unchecked")
        Function<Object, TrackingToken> initialTokenBuilder = (Function<Object, TrackingToken>) ReflectionUtils.getFieldValue(
                TrackingEventProcessor.class.getDeclaredField("initialTrackingTokenBuilder"), processor);
        TrackingToken initialToken = initialTokenBuilder.apply(eventStoreTwo);
        Assertions.assertEquals(0L, initialToken.position().orElse(-1L));
        Mockito.verify(eventStoreTwo).createHeadToken();
    }

    /**
     * Verifies the saga's {@code "ObjectProcessor"} when a {@link TrackingEventProcessorConfiguration} is
     * registered without a processor name: it must have the configured {@code segmentsSize} of 4, and its
     * initial token builder must yield a token at position 0 and call {@code createHeadToken()} on the
     * source it is applied to.
     *
     * @throws NoSuchFieldException if one of the reflectively read fields does not exist
     */
    @Test
    void sagaTrackingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomDefaultConfig() throws NoSuchFieldException {
        TrackingEventProcessorConfiguration parallelConfig =
                TrackingEventProcessorConfiguration.forParallelProcessing(4);
        configurer.eventProcessing()
                  .usingTrackingEventProcessors()
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .registerSaga(Object.class)
                  .registerTrackingEventProcessorConfiguration(c -> parallelConfig);

        Optional<TrackingEventProcessor> sagaProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("ObjectProcessor", TrackingEventProcessor.class);
        Assertions.assertTrue(sagaProcessor.isPresent());
        TrackingEventProcessor processor = sagaProcessor.get();

        Integer segmentsSize = (Integer) ReflectionUtils.getFieldValue(
                TrackingEventProcessor.class.getDeclaredField("segmentsSize"), processor);
        Assertions.assertEquals(4, segmentsSize.intValue());

        // Reflection yields an untyped value; only the erased Function contract is relied upon here.
        @SuppressWarnings("unchecked")
        Function<Object, TrackingToken> initialTokenBuilder = (Function<Object, TrackingToken>) ReflectionUtils.getFieldValue(
                TrackingEventProcessor.class.getDeclaredField("initialTrackingTokenBuilder"), processor);
        TrackingToken initialToken = initialTokenBuilder.apply(eventStoreTwo);
        Assertions.assertEquals(0L, initialToken.position().orElse(-1L));
        Mockito.verify(eventStoreTwo).createHeadToken();
    }

    /**
     * Verifies the pooled streaming processor built for a saga when nothing is customised: the
     * {@code "ObjectProcessor"} must have a {@code tokenClaimInterval} of 5000, and its initial token
     * function must call {@code createHeadToken()} and never {@code createTailToken()} on the source it is
     * applied to.
     *
     * @throws NoSuchFieldException if one of the reflectively read fields does not exist
     */
    @Test
    void sagaPooledStreamingProcessorConstructionUsesDefaultSagaProcessorConfigIfNoCustomizationIsPresent() throws NoSuchFieldException {
        configurer.eventProcessing()
                  .usingPooledStreamingEventProcessors()
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .registerSaga(Object.class);

        Optional<PooledStreamingEventProcessor> sagaProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("ObjectProcessor", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(sagaProcessor.isPresent());
        PooledStreamingEventProcessor processor = sagaProcessor.get();

        Long tokenClaimInterval = (Long) ReflectionUtils.getFieldValue(
                PooledStreamingEventProcessor.class.getDeclaredField("tokenClaimInterval"), processor);
        Assertions.assertEquals(5000L, tokenClaimInterval.longValue());

        // Reflection yields an untyped value; only the erased Function contract is relied upon here.
        @SuppressWarnings("unchecked")
        Function<Object, TrackingToken> initialTokenFunction = (Function<Object, TrackingToken>) ReflectionUtils.getFieldValue(
                PooledStreamingEventProcessor.class.getDeclaredField("initialToken"), processor);
        initialTokenFunction.apply(eventStoreTwo);
        Mockito.verify(eventStoreTwo, Mockito.times(0)).createTailToken();
        Mockito.verify(eventStoreTwo).createHeadToken();
    }

    /**
     * Verifies the pooled streaming processor used by a saga that is assigned to the explicitly registered
     * {@code "custom-processor"}: it must have a {@code tokenClaimInterval} of 5000, and its initial token
     * function must yield a token at position 0 and call {@code createHeadToken()} on the source it is
     * applied to.
     *
     * @throws NoSuchFieldException if one of the reflectively read fields does not exist
     */
    @Test
    void sagaPooledStreamingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomProcessor() throws NoSuchFieldException {
        configurer.eventProcessing()
                  .assignProcessingGroup(group -> "custom-processor")
                  .registerPooledStreamingEventProcessor("custom-processor", c -> eventStoreOne)
                  .registerSaga(CustomSaga.class);

        Optional<PooledStreamingEventProcessor> sagaProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("custom-processor", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(sagaProcessor.isPresent());
        PooledStreamingEventProcessor processor = sagaProcessor.get();

        Long tokenClaimInterval = (Long) ReflectionUtils.getFieldValue(
                PooledStreamingEventProcessor.class.getDeclaredField("tokenClaimInterval"), processor);
        Assertions.assertEquals(5000L, tokenClaimInterval.longValue());

        // Reflection yields an untyped value; only the erased Function contract is relied upon here.
        @SuppressWarnings("unchecked")
        Function<Object, TrackingToken> initialTokenFunction = (Function<Object, TrackingToken>) ReflectionUtils.getFieldValue(
                PooledStreamingEventProcessor.class.getDeclaredField("initialToken"), processor);
        TrackingToken initialToken = initialTokenFunction.apply(eventStoreTwo);
        Assertions.assertEquals(0L, initialToken.position().orElse(-1L));
        Mockito.verify(eventStoreTwo).createHeadToken();
    }

    /**
     * Verifies the saga's {@code "ObjectProcessor"} when it is registered explicitly as a pooled streaming
     * processor with its own builder customisation: its initial token function must yield a token at
     * position 0 and call {@code createHeadToken()} on the source it is applied to.
     *
     * @throws NoSuchFieldException if {@code PooledStreamingEventProcessor} has no {@code initialToken} field
     */
    @Test
    void sagaPooledStreamingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomPooledStreamingProcessorBuilder() throws NoSuchFieldException {
        configurer.eventProcessing()
                  .registerPooledStreamingEventProcessor(
                          "ObjectProcessor",
                          c -> eventStoreOne,
                          (c, builder) -> builder.maxClaimedSegments(4)
                  )
                  .registerSaga(Object.class);

        Optional<PooledStreamingEventProcessor> sagaProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("ObjectProcessor", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(sagaProcessor.isPresent());

        // Reflection yields an untyped value; only the erased Function contract is relied upon here.
        @SuppressWarnings("unchecked")
        Function<Object, TrackingToken> initialTokenFunction = (Function<Object, TrackingToken>) ReflectionUtils.getFieldValue(
                PooledStreamingEventProcessor.class.getDeclaredField("initialToken"), sagaProcessor.get());
        TrackingToken initialToken = initialTokenFunction.apply(eventStoreTwo);
        Assertions.assertEquals(0L, initialToken.position().orElse(-1L));
        Mockito.verify(eventStoreTwo).createHeadToken();
    }

    /**
     * Verifies the saga's {@code "ObjectProcessor"} when a pooled streaming builder customisation is
     * registered under that processor name: its initial token function must yield a token at position 0
     * and call {@code createHeadToken()} on the source it is applied to.
     *
     * @throws NoSuchFieldException if {@code PooledStreamingEventProcessor} has no {@code initialToken} field
     */
    @Test
    void sagaPooledStreamingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomConfigInstance() throws NoSuchFieldException {
        configurer.eventProcessing()
                  .usingPooledStreamingEventProcessors()
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .registerSaga(Object.class)
                  .registerPooledStreamingEventProcessorConfiguration(
                          "ObjectProcessor", (c, builder) -> builder.maxClaimedSegments(4)
                  );

        Optional<PooledStreamingEventProcessor> sagaProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("ObjectProcessor", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(sagaProcessor.isPresent());

        // Reflection yields an untyped value; only the erased Function contract is relied upon here.
        @SuppressWarnings("unchecked")
        Function<Object, TrackingToken> initialTokenFunction = (Function<Object, TrackingToken>) ReflectionUtils.getFieldValue(
                PooledStreamingEventProcessor.class.getDeclaredField("initialToken"), sagaProcessor.get());
        TrackingToken initialToken = initialTokenFunction.apply(eventStoreTwo);
        Assertions.assertEquals(0L, initialToken.position().orElse(-1L));
        Mockito.verify(eventStoreTwo).createHeadToken();
    }

    /**
     * Verifies the saga's {@code "ObjectProcessor"} when pooled streaming processors are made the default
     * together with a builder customisation: its initial token function must yield a token at position 0
     * and call {@code createHeadToken()} on the source it is applied to.
     *
     * @throws NoSuchFieldException if {@code PooledStreamingEventProcessor} has no {@code initialToken} field
     */
    @Test
    void sagaPooledStreamingProcessorConstructionDoesNotPickDefaultSagaProcessorConfigForCustomDefaultConfig() throws NoSuchFieldException {
        configurer.eventProcessing()
                  .usingPooledStreamingEventProcessors((c, builder) -> builder.maxClaimedSegments(4))
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .registerSaga(Object.class);

        Optional<PooledStreamingEventProcessor> sagaProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("ObjectProcessor", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(sagaProcessor.isPresent());

        // Reflection yields an untyped value; only the erased Function contract is relied upon here.
        @SuppressWarnings("unchecked")
        Function<Object, TrackingToken> initialTokenFunction = (Function<Object, TrackingToken>) ReflectionUtils.getFieldValue(
                PooledStreamingEventProcessor.class.getDeclaredField("initialToken"), sagaProcessor.get());
        TrackingToken initialToken = initialTokenFunction.apply(eventStoreTwo);
        Assertions.assertEquals(0L, initialToken.position().orElse(-1L));
        Mockito.verify(eventStoreTwo).createHeadToken();
    }

    /**
     * Verifies that, with pooled streaming processors as the default processor type, both the
     * {@code "pooled-streaming"} and the {@code "default"} processor are available as
     * {@code PooledStreamingEventProcessor} instances.
     */
    @Test
    void defaultPooledStreamingEventProcessingConfiguration() {
        Object someHandler = new Object();
        configurer.eventProcessing()
                  .usingPooledStreamingEventProcessors()
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .byDefaultAssignTo("default")
                  .registerEventHandler(c -> someHandler)
                  .registerEventHandler(c -> new PooledStreamingEventHandler());
        Configuration config = configurer.start();

        boolean pooledStreamingPresent = config.eventProcessingConfiguration()
                                               .eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class)
                                               .isPresent();
        Assertions.assertTrue(pooledStreamingPresent);

        boolean defaultPresent = config.eventProcessingConfiguration()
                                       .eventProcessor("default", PooledStreamingEventProcessor.class)
                                       .isPresent();
        Assertions.assertTrue(defaultPresent);
    }

    /**
     * Verifies that starting the configurer throws a {@code LifecycleHandlerInvocationException} when a
     * pooled streaming processor is registered without any message source being configured.
     */
    @Test
    void configurePooledStreamingEventProcessorFailsInAbsenceOfStreamableMessageSource() {
        configurer.eventProcessing()
                  .registerPooledStreamingEventProcessor("pooled-streaming")
                  .registerEventHandler(c -> new PooledStreamingEventHandler());

        Assertions.assertThrows(LifecycleHandlerInvocationException.class, () -> configurer.start());
    }

    /**
     * Verifies that components registered for the {@code "pooled-streaming"} processor name end up in the
     * constructed {@code PooledStreamingEventProcessor}: rollback configuration, error handler, token store
     * and transaction manager must match what was registered, and the processor's span factory must be the
     * {@link EventProcessorSpanFactory} component of the started configuration.
     *
     * @throws NoSuchFieldException   if one of the reflectively read fields does not exist
     * @throws IllegalAccessException if one of the reflectively read fields cannot be accessed
     */
    @Test
    void configurePooledStreamingEventProcessor() throws NoSuchFieldException, IllegalAccessException {
        String processorName = "pooled-streaming";
        InMemoryTokenStore tokenStore = new InMemoryTokenStore();
        TestSpanFactory spanFactory = new TestSpanFactory();
        configurer.configureEmbeddedEventStore(c -> new InMemoryEventStorageEngine())
                  .configureSpanFactory(c -> spanFactory)
                  .eventProcessing()
                  .registerPooledStreamingEventProcessor(processorName)
                  .registerEventHandler(c -> new PooledStreamingEventHandler())
                  .registerRollbackConfiguration(processorName, c -> RollbackConfigurationType.ANY_THROWABLE)
                  .registerErrorHandler(processorName, c -> PropagatingErrorHandler.INSTANCE)
                  .registerTokenStore(processorName, c -> tokenStore)
                  .registerTransactionManager(processorName, c -> NoTransactionManager.INSTANCE);
        Configuration config = configurer.start();

        Optional<PooledStreamingEventProcessor> optionalProcessor =
                config.eventProcessingConfiguration().eventProcessor(processorName, PooledStreamingEventProcessor.class);
        Assertions.assertTrue(optionalProcessor.isPresent());
        PooledStreamingEventProcessor processor = optionalProcessor.get();

        Assertions.assertEquals(processorName, processor.getName());
        Assertions.assertEquals(
                RollbackConfigurationType.ANY_THROWABLE,
                getField(AbstractEventProcessor.class, "rollbackConfiguration", processor)
        );
        Assertions.assertEquals(
                PropagatingErrorHandler.INSTANCE, getField(AbstractEventProcessor.class, "errorHandler", processor)
        );
        Assertions.assertEquals(tokenStore, getField("tokenStore", processor));
        Assertions.assertEquals(NoTransactionManager.INSTANCE, getField("transactionManager", processor));
        Assertions.assertEquals(
                config.getComponent(EventProcessorSpanFactory.class),
                getField(AbstractEventProcessor.class, "spanFactory", processor)
        );
    }

    /**
     * Verifies that a pooled streaming processor registered together with an explicit message source uses
     * that source, alongside the rollback configuration, error handler, token store and transaction manager
     * registered for the {@code "pooled-streaming"} processor name.
     *
     * @throws NoSuchFieldException   if one of the reflectively read fields does not exist
     * @throws IllegalAccessException if one of the reflectively read fields cannot be accessed
     */
    @Test
    void configurePooledStreamingEventProcessorWithSource() throws NoSuchFieldException, IllegalAccessException {
        String processorName = "pooled-streaming";
        InMemoryTokenStore tokenStore = new InMemoryTokenStore();
        configurer.eventProcessing()
                  .registerPooledStreamingEventProcessor(processorName, c -> eventStoreOne)
                  .registerEventHandler(c -> new PooledStreamingEventHandler())
                  .registerRollbackConfiguration(processorName, c -> RollbackConfigurationType.ANY_THROWABLE)
                  .registerErrorHandler(processorName, c -> PropagatingErrorHandler.INSTANCE)
                  .registerTokenStore(processorName, c -> tokenStore)
                  .registerTransactionManager(processorName, c -> NoTransactionManager.INSTANCE);

        Optional<PooledStreamingEventProcessor> optionalProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor(processorName, PooledStreamingEventProcessor.class);
        Assertions.assertTrue(optionalProcessor.isPresent());
        PooledStreamingEventProcessor processor = optionalProcessor.get();

        Assertions.assertEquals(processorName, processor.getName());
        Assertions.assertEquals(
                RollbackConfigurationType.ANY_THROWABLE,
                getField(AbstractEventProcessor.class, "rollbackConfiguration", processor)
        );
        Assertions.assertEquals(
                PropagatingErrorHandler.INSTANCE, getField(AbstractEventProcessor.class, "errorHandler", processor)
        );
        Assertions.assertEquals(eventStoreOne, getField("messageSource", processor));
        Assertions.assertEquals(tokenStore, getField("tokenStore", processor));
        Assertions.assertEquals(NoTransactionManager.INSTANCE, getField("transactionManager", processor));
    }

    /**
     * Verifies that a builder customisation passed when registering the {@code "pooled-streaming"}
     * processor is applied: the processor's {@code maxCapacity()} must equal the configured number of
     * claimed segments, and its message source must be the one supplied at registration.
     *
     * @throws NoSuchFieldException   if the reflectively read field does not exist
     * @throws IllegalAccessException if the reflectively read field cannot be accessed
     */
    @Test
    void configurePooledStreamingEventProcessorWithConfiguration() throws NoSuchFieldException, IllegalAccessException {
        int maxClaimedSegments = 24;
        configurer.eventProcessing()
                  .registerPooledStreamingEventProcessor(
                          "pooled-streaming",
                          c -> eventStoreOne,
                          (c, builder) -> builder.maxClaimedSegments(maxClaimedSegments)
                  )
                  .registerEventHandler(c -> new PooledStreamingEventHandler());

        Optional<PooledStreamingEventProcessor> optionalProcessor = configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(optionalProcessor.isPresent());
        PooledStreamingEventProcessor processor = optionalProcessor.get();

        // Assert against the configured value rather than a second copy of the literal.
        Assertions.assertEquals(maxClaimedSegments, processor.maxCapacity());
        Assertions.assertEquals(eventStoreOne, getField("messageSource", processor));
    }

    /**
     * Verifies that a pooled streaming builder customisation registered without a processor name is applied
     * to every pooled streaming processor: both {@code "pooled-streaming"} and {@code "default"} must
     * report the configured {@code maxCapacity()} and use the default streamable message source.
     *
     * @throws NoSuchFieldException   if the reflectively read field does not exist
     * @throws IllegalAccessException if the reflectively read field cannot be accessed
     */
    @Test
    void registerPooledStreamingEventProcessorConfigurationIsUsedDuringAllPsepConstructions() throws NoSuchFieldException, IllegalAccessException {
        int maxClaimedSegments = 24;
        Object someHandler = new Object();
        configurer.eventProcessing()
                  .usingPooledStreamingEventProcessors()
                  .registerPooledStreamingEventProcessorConfiguration(
                          (c, builder) -> builder.maxClaimedSegments(maxClaimedSegments)
                  )
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .registerEventHandler(c -> new PooledStreamingEventHandler())
                  .byDefaultAssignTo("default")
                  .registerEventHandler(c -> someHandler);
        Configuration config = configurer.start();

        Optional<PooledStreamingEventProcessor> optionalPooled =
                config.eventProcessingConfiguration().eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(optionalPooled.isPresent());
        PooledStreamingEventProcessor pooledProcessor = optionalPooled.get();
        Assertions.assertEquals(maxClaimedSegments, pooledProcessor.maxCapacity());
        Assertions.assertEquals(eventStoreOne, getField("messageSource", pooledProcessor));

        Optional<PooledStreamingEventProcessor> optionalDefault =
                config.eventProcessingConfiguration().eventProcessor("default", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(optionalDefault.isPresent());
        PooledStreamingEventProcessor defaultProcessor = optionalDefault.get();
        Assertions.assertEquals(maxClaimedSegments, defaultProcessor.maxCapacity());
        Assertions.assertEquals(eventStoreOne, getField("messageSource", defaultProcessor));
    }

    /**
     * Verifies that a builder customisation passed to {@code usingPooledStreamingEventProcessors} is
     * applied to every pooled streaming processor: both {@code "pooled-streaming"} and {@code "default"}
     * must report the configured {@code maxCapacity()} and use the default streamable message source.
     *
     * @throws NoSuchFieldException   if the reflectively read field does not exist
     * @throws IllegalAccessException if the reflectively read field cannot be accessed
     */
    @Test
    void usingPooledStreamingEventProcessorWithConfigurationIsUsedDuringAllPsepConstructions() throws NoSuchFieldException, IllegalAccessException {
        int maxClaimedSegments = 24;
        Object someHandler = new Object();
        configurer.eventProcessing()
                  .usingPooledStreamingEventProcessors((c, builder) -> builder.maxClaimedSegments(maxClaimedSegments))
                  .configureDefaultStreamableMessageSource(c -> eventStoreOne)
                  .registerEventHandler(c -> new PooledStreamingEventHandler())
                  .byDefaultAssignTo("default")
                  .registerEventHandler(c -> someHandler);
        Configuration config = configurer.start();

        Optional<PooledStreamingEventProcessor> optionalPooled =
                config.eventProcessingConfiguration().eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(optionalPooled.isPresent());
        PooledStreamingEventProcessor pooledProcessor = optionalPooled.get();
        Assertions.assertEquals(maxClaimedSegments, pooledProcessor.maxCapacity());
        Assertions.assertEquals(eventStoreOne, getField("messageSource", pooledProcessor));

        Optional<PooledStreamingEventProcessor> optionalDefault =
                config.eventProcessingConfiguration().eventProcessor("default", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(optionalDefault.isPresent());
        PooledStreamingEventProcessor defaultProcessor = optionalDefault.get();
        Assertions.assertEquals(maxClaimedSegments, defaultProcessor.maxCapacity());
        Assertions.assertEquals(eventStoreOne, getField("messageSource", defaultProcessor));
    }

    /**
     * Verifies that a pooled streaming configuration registered for the name "pooled-streaming" only affects
     * that processor: it must report a maximum capacity of 24, whereas the "default" processor must report
     * 32767. Both processors must read from {@code eventStoreOne}.
     *
     * @throws NoSuchFieldException   if the reflectively inspected {@code messageSource} field does not exist
     * @throws IllegalAccessException if the reflectively inspected field cannot be read
     */
    @Test
    void registerPooledStreamingEventProcessorConfigurationForNameIsUsedDuringSpecificPsepConstruction() throws NoSuchFieldException, IllegalAccessException {
        int maxClaimedSegments = 24;
        Object plainHandler = new Object();
        this.configurer.eventProcessing()
                .usingPooledStreamingEventProcessors()
                .registerPooledStreamingEventProcessorConfiguration(
                        "pooled-streaming", (config, builder) -> builder.maxClaimedSegments(maxClaimedSegments)
                )
                .configureDefaultStreamableMessageSource(config -> this.eventStoreOne)
                .registerEventHandler(config -> new PooledStreamingEventHandler())
                .byDefaultAssignTo("default")
                .registerEventHandler(config -> plainHandler);
        Configuration started = this.configurer.start();

        Optional<PooledStreamingEventProcessor> pooledStreaming =
                started.eventProcessingConfiguration().eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(pooledStreaming.isPresent());
        PooledStreamingEventProcessor pooledStreamingProcessor = pooledStreaming.get();
        Assertions.assertEquals(maxClaimedSegments, pooledStreamingProcessor.maxCapacity());
        Assertions.assertEquals(this.eventStoreOne, getField("messageSource", pooledStreamingProcessor));

        Optional<PooledStreamingEventProcessor> byDefault =
                started.eventProcessingConfiguration().eventProcessor("default", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(byDefault.isPresent());
        PooledStreamingEventProcessor defaultProcessor = byDefault.get();
        // NOTE(review): 32767 is presumably the builder's own default capacity; the point of the assertion is
        // that the name-specific value of 24 did not leak into the "default" processor.
        Assertions.assertEquals(32767, defaultProcessor.maxCapacity());
        Assertions.assertEquals(this.eventStoreOne, getField("messageSource", defaultProcessor));
    }

    /**
     * Verifies that the configuration passed directly to {@code registerPooledStreamingEventProcessor} takes
     * precedence over the default pooled streaming configuration: the default asks for 1745 claimed segments,
     * the processor-specific one for 24, and the built processor must report 24.
     *
     * @throws NoSuchFieldException   if the reflectively inspected {@code messageSource} field does not exist
     * @throws IllegalAccessException if the reflectively inspected field cannot be read
     */
    @Test
    void registerPooledStreamingEventProcessorWithConfigurationOverridesDefaultPsepConfiguration() throws NoSuchFieldException, IllegalAccessException {
        int specificMaxClaimedSegments = 24;
        int defaultMaxClaimedSegments = 1745;
        this.configurer.eventProcessing()
                .registerPooledStreamingEventProcessorConfiguration(
                        (config, builder) -> builder.maxClaimedSegments(defaultMaxClaimedSegments)
                )
                .registerPooledStreamingEventProcessor(
                        "pooled-streaming",
                        config -> this.eventStoreOne,
                        (config, builder) -> builder.maxClaimedSegments(specificMaxClaimedSegments)
                )
                .registerEventHandler(config -> new PooledStreamingEventHandler());

        Optional<PooledStreamingEventProcessor> pooledStreaming = this.configurer.start()
                .eventProcessingConfiguration()
                .eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(pooledStreaming.isPresent());
        PooledStreamingEventProcessor processor = pooledStreaming.get();
        Assertions.assertEquals(specificMaxClaimedSegments, processor.maxCapacity());
        Assertions.assertEquals(this.eventStoreOne, getField("messageSource", processor));
    }

    /**
     * Verifies how three layers of pooled streaming configuration combine for the "pooled-streaming" processor:
     * a default configuration (batch size 100, 42 claimed segments), a name-specific configuration (1729 claimed
     * segments) and the configuration passed to {@code registerPooledStreamingEventProcessor} (24 claimed
     * segments). The built processor must report capacity 24 while still carrying the batch size of 100.
     *
     * @throws NoSuchFieldException   if a reflectively inspected field does not exist
     * @throws IllegalAccessException if a reflectively inspected field cannot be read
     */
    @Test
    void registerPooledStreamingEventProcessorWithConfigurationOverridesCustomPsepConfiguration() throws NoSuchFieldException, IllegalAccessException {
        int registrationMaxClaimedSegments = 24;
        int defaultMaxClaimedSegments = 42;
        int namedMaxClaimedSegments = 1729;
        this.configurer.eventProcessing()
                .registerPooledStreamingEventProcessorConfiguration(
                        (config, builder) -> builder.batchSize(100).maxClaimedSegments(defaultMaxClaimedSegments)
                )
                .registerPooledStreamingEventProcessorConfiguration(
                        "pooled-streaming", (config, builder) -> builder.maxClaimedSegments(namedMaxClaimedSegments)
                )
                .registerPooledStreamingEventProcessor(
                        "pooled-streaming",
                        config -> this.eventStoreOne,
                        (config, builder) -> builder.maxClaimedSegments(registrationMaxClaimedSegments)
                )
                .registerEventHandler(config -> new PooledStreamingEventHandler());

        // The configuration is only built here, not started, exactly as before.
        Optional<PooledStreamingEventProcessor> pooledStreaming = this.configurer.buildConfiguration()
                .eventProcessingConfiguration()
                .eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(pooledStreaming.isPresent());
        PooledStreamingEventProcessor processor = pooledStreaming.get();
        Assertions.assertEquals(registrationMaxClaimedSegments, processor.maxCapacity());
        Assertions.assertEquals(this.eventStoreOne, getField("messageSource", processor));
        Integer batchSize = getField("batchSize", processor);
        Assertions.assertEquals(100, batchSize.intValue());
    }

    /**
     * Verifies that the transaction manager registered through {@code registerDefaultTransactionManager} is the
     * one used by the "pooled-streaming" processor: after a single event is published, the latch handed to the
     * {@code StubTransactionManager} must reach zero within ten seconds.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting on the latch
     */
    @Test
    void defaultTransactionManagerIsUsedUponEventProcessorConstruction() throws InterruptedException {
        EventMessage<Integer> testEvent = new GenericEventMessage<>(1000);
        CountDownLatch transactionLatch = new CountDownLatch(1);
        StubTransactionManager defaultTransactionManager = new StubTransactionManager(transactionLatch);
        this.configurer.configureEmbeddedEventStore(config -> new InMemoryEventStorageEngine())
                .eventProcessing()
                .registerPooledStreamingEventProcessor("pooled-streaming")
                .registerEventHandler(config -> new PooledStreamingEventHandler())
                .registerDefaultTransactionManager(config -> defaultTransactionManager);
        Configuration started = this.configurer.start();
        try {
            started.eventBus().publish(testEvent);
            Assertions.assertTrue(transactionLatch.await(10L, TimeUnit.SECONDS));
        } finally {
            // Always stop the started configuration exactly once, whether or not the assertion held.
            started.shutdown();
        }
    }

    /**
     * Verifies that a transaction manager registered specifically for "pooled-streaming" replaces the default
     * one: after a single event is published, the latch handed to the processor-specific
     * {@code StubTransactionManager} must reach zero within ten seconds, and the spied default transaction
     * manager must not have been touched at all.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting on the latch
     */
    @Test
    void defaultTransactionManagerIsOverriddenByProcessorSpecificInstance() throws InterruptedException {
        EventMessage<Integer> testEvent = new GenericEventMessage<>(1000);
        TransactionManager defaultTransactionManager = Mockito.spy(TransactionManager.class);
        CountDownLatch transactionLatch = new CountDownLatch(1);
        StubTransactionManager processorTransactionManager = new StubTransactionManager(transactionLatch);
        this.configurer.configureEmbeddedEventStore(config -> new InMemoryEventStorageEngine())
                .eventProcessing()
                .registerPooledStreamingEventProcessor("pooled-streaming")
                .registerEventHandler(config -> new PooledStreamingEventHandler())
                .registerDefaultTransactionManager(config -> defaultTransactionManager)
                .registerTransactionManager("pooled-streaming", config -> processorTransactionManager);
        Configuration started = this.configurer.start();
        try {
            started.eventBus().publish(testEvent);
            Assertions.assertTrue(transactionLatch.await(10L, TimeUnit.SECONDS));
        } finally {
            // Always stop the started configuration exactly once, whether or not the assertion held.
            started.shutdown();
        }
        // Checked after shutdown, so a failing verification can no longer trigger a second shutdown call.
        Mockito.verifyNoInteractions(defaultTransactionManager);
    }

    /**
     * Verifies that registering a dead letter queue for "pooled-streaming" results in a
     * {@code DeadLetteringEventHandlerInvoker} as the first delegate of the processor's
     * {@code MultiEventHandlerInvoker}, and that this invoker holds the registered queue, the policy exposed by
     * the configuration and the registered {@code NoTransactionManager}.
     *
     * @param sequencedDeadLetterQueue the mocked queue registered for the "pooled-streaming" group
     * @throws NoSuchFieldException   if a reflectively inspected field does not exist
     * @throws IllegalAccessException if a reflectively inspected field cannot be read
     */
    @Test
    void registerDeadLetterQueueConstructsDeadLetteringEventHandlerInvoker(@Mock SequencedDeadLetterQueue<EventMessage<?>> sequencedDeadLetterQueue) throws NoSuchFieldException, IllegalAccessException {
        this.configurer.configureEmbeddedEventStore(config -> new InMemoryEventStorageEngine())
                .eventProcessing()
                .registerPooledStreamingEventProcessor("pooled-streaming")
                .registerEventHandler(config -> new PooledStreamingEventHandler())
                .registerDeadLetterQueue("pooled-streaming", config -> sequencedDeadLetterQueue)
                .registerTransactionManager("pooled-streaming", config -> NoTransactionManager.INSTANCE);
        Configuration started = this.configurer.start();

        Optional<EnqueuePolicy<EventMessage<?>>> policy =
                started.eventProcessingConfiguration().deadLetterPolicy("pooled-streaming");
        Assertions.assertTrue(policy.isPresent());
        EnqueuePolicy<EventMessage<?>> expectedPolicy = policy.get();

        Optional<SequencedDeadLetterQueue<EventMessage<?>>> queue =
                started.eventProcessingConfiguration().deadLetterQueue("pooled-streaming");
        Assertions.assertTrue(queue.isPresent());
        Assertions.assertEquals(sequencedDeadLetterQueue, queue.get());

        Optional<PooledStreamingEventProcessor> processor =
                started.eventProcessingConfiguration().eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(processor.isPresent());

        // The invoker field is declared on AbstractEventProcessor, hence the explicit declaring class.
        EventHandlerInvoker invoker = getField(AbstractEventProcessor.class, "eventHandlerInvoker", processor.get());
        Assertions.assertEquals(MultiEventHandlerInvoker.class, invoker.getClass());
        List<?> delegates = getField("delegates", invoker);
        Assertions.assertFalse(delegates.isEmpty());

        DeadLetteringEventHandlerInvoker deadLetteringInvoker = (DeadLetteringEventHandlerInvoker) delegates.get(0);
        Assertions.assertEquals(sequencedDeadLetterQueue, getField("queue", deadLetteringInvoker));
        Assertions.assertEquals(expectedPolicy, getField("enqueuePolicy", deadLetteringInvoker));
        Assertions.assertEquals(NoTransactionManager.INSTANCE, getField("transactionManager", deadLetteringInvoker));
    }

    /**
     * Verifies that a policy registered through {@code registerDefaultDeadLetterPolicy} is the one exposed for
     * "pooled-streaming" by the configuration and the one held by the first delegate of the processor's
     * {@code MultiEventHandlerInvoker}.
     *
     * @param sequencedDeadLetterQueue the mocked queue registered for the "pooled-streaming" group
     * @throws NoSuchFieldException   if a reflectively inspected field does not exist
     * @throws IllegalAccessException if a reflectively inspected field cannot be read
     */
    @Test
    void registerDefaultDeadLetterPolicyIsUsed(@Mock SequencedDeadLetterQueue<EventMessage<?>> sequencedDeadLetterQueue) throws NoSuchFieldException, IllegalAccessException {
        EnqueuePolicy<EventMessage<?>> defaultPolicy = (letter, cause) -> Decisions.ignore();
        this.configurer.configureEmbeddedEventStore(config -> new InMemoryEventStorageEngine())
                .eventProcessing()
                .registerPooledStreamingEventProcessor("pooled-streaming")
                .registerEventHandler(config -> new PooledStreamingEventHandler())
                .registerDeadLetterQueue("pooled-streaming", config -> sequencedDeadLetterQueue)
                .registerDefaultDeadLetterPolicy(config -> defaultPolicy)
                .registerTransactionManager("pooled-streaming", config -> NoTransactionManager.INSTANCE);
        Configuration started = this.configurer.start();

        Optional<EnqueuePolicy<EventMessage<?>>> policy =
                started.eventProcessingConfiguration().deadLetterPolicy("pooled-streaming");
        Assertions.assertTrue(policy.isPresent());
        Assertions.assertEquals(defaultPolicy, policy.get());

        Optional<SequencedDeadLetterQueue<EventMessage<?>>> queue =
                started.eventProcessingConfiguration().deadLetterQueue("pooled-streaming");
        Assertions.assertTrue(queue.isPresent());
        Assertions.assertEquals(sequencedDeadLetterQueue, queue.get());

        Optional<PooledStreamingEventProcessor> processor =
                started.eventProcessingConfiguration().eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(processor.isPresent());

        // The invoker field is declared on AbstractEventProcessor, hence the explicit declaring class.
        EventHandlerInvoker invoker = getField(AbstractEventProcessor.class, "eventHandlerInvoker", processor.get());
        Assertions.assertEquals(MultiEventHandlerInvoker.class, invoker.getClass());
        List<?> delegates = getField("delegates", invoker);
        Assertions.assertFalse(delegates.isEmpty());
        Assertions.assertEquals(defaultPolicy, getField("enqueuePolicy", (DeadLetteringEventHandlerInvoker) delegates.get(0)));
    }

    /**
     * Verifies that a dead letter policy registered for "pooled-streaming" is used for that group, and that a
     * policy registered for a different group ("unused-processing-group") is not: both the configuration and
     * the first delegate of the processor's {@code MultiEventHandlerInvoker} must hold the group's own policy.
     *
     * @param sequencedDeadLetterQueue the mocked queue registered for the "pooled-streaming" group
     * @throws NoSuchFieldException   if a reflectively inspected field does not exist
     * @throws IllegalAccessException if a reflectively inspected field cannot be read
     */
    @Test
    void registerDeadLetterPolicyIsUsed(@Mock SequencedDeadLetterQueue<EventMessage<?>> sequencedDeadLetterQueue) throws NoSuchFieldException, IllegalAccessException {
        EnqueuePolicy<EventMessage<?>> groupPolicy = (letter, cause) -> Decisions.ignore();
        EnqueuePolicy<EventMessage<?>> unusedPolicy = (letter, cause) -> Decisions.evict();
        this.configurer.configureEmbeddedEventStore(config -> new InMemoryEventStorageEngine())
                .eventProcessing()
                .registerPooledStreamingEventProcessor("pooled-streaming")
                .registerEventHandler(config -> new PooledStreamingEventHandler())
                .registerDeadLetterQueue("pooled-streaming", config -> sequencedDeadLetterQueue)
                .registerDeadLetterPolicy("pooled-streaming", config -> groupPolicy)
                .registerDeadLetterPolicy("unused-processing-group", config -> unusedPolicy)
                .registerTransactionManager("pooled-streaming", config -> NoTransactionManager.INSTANCE);
        Configuration started = this.configurer.start();

        Optional<EnqueuePolicy<EventMessage<?>>> policy =
                started.eventProcessingConfiguration().deadLetterPolicy("pooled-streaming");
        Assertions.assertTrue(policy.isPresent());
        EnqueuePolicy<EventMessage<?>> configuredPolicy = policy.get();
        Assertions.assertEquals(groupPolicy, configuredPolicy);
        Assertions.assertNotEquals(unusedPolicy, configuredPolicy);

        Optional<SequencedDeadLetterQueue<EventMessage<?>>> queue =
                started.eventProcessingConfiguration().deadLetterQueue("pooled-streaming");
        Assertions.assertTrue(queue.isPresent());
        Assertions.assertEquals(sequencedDeadLetterQueue, queue.get());

        Optional<PooledStreamingEventProcessor> processor =
                started.eventProcessingConfiguration().eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(processor.isPresent());

        // The invoker field is declared on AbstractEventProcessor, hence the explicit declaring class.
        EventHandlerInvoker invoker = getField(AbstractEventProcessor.class, "eventHandlerInvoker", processor.get());
        Assertions.assertEquals(MultiEventHandlerInvoker.class, invoker.getClass());
        List<?> delegates = getField("delegates", invoker);
        Assertions.assertFalse(delegates.isEmpty());

        DeadLetteringEventHandlerInvoker deadLetteringInvoker = (DeadLetteringEventHandlerInvoker) delegates.get(0);
        Assertions.assertEquals(groupPolicy, getField("enqueuePolicy", deadLetteringInvoker));
        Assertions.assertNotEquals(unusedPolicy, getField("enqueuePolicy", deadLetteringInvoker));
    }

    /**
     * Verifies that a {@code DeadLetteringEventHandlerInvoker} configuration registered for "pooled-streaming"
     * is applied: the configuration sets {@code allowReset(true)}, and the first delegate of the processor's
     * {@code MultiEventHandlerInvoker} must carry {@code allowReset == true}.
     *
     * @param sequencedDeadLetterQueue the mocked queue registered for the "pooled-streaming" group
     * @throws NoSuchFieldException   if a reflectively inspected field does not exist
     * @throws IllegalAccessException if a reflectively inspected field cannot be read
     */
    @Test
    void registeredDeadLetteringEventHandlerInvokerConfigurationIsUsed(@Mock SequencedDeadLetterQueue<EventMessage<?>> sequencedDeadLetterQueue) throws NoSuchFieldException, IllegalAccessException {
        this.configurer.configureEmbeddedEventStore(config -> new InMemoryEventStorageEngine())
                .eventProcessing()
                .registerPooledStreamingEventProcessor("pooled-streaming")
                .registerEventHandler(config -> new PooledStreamingEventHandler())
                .registerDeadLetterQueue("pooled-streaming", config -> sequencedDeadLetterQueue)
                .registerDeadLetteringEventHandlerInvokerConfiguration(
                        "pooled-streaming", (config, builder) -> builder.allowReset(true)
                )
                .registerTransactionManager("pooled-streaming", config -> NoTransactionManager.INSTANCE);
        Configuration started = this.configurer.start();

        Optional<SequencedDeadLetterQueue<EventMessage<?>>> queue =
                started.eventProcessingConfiguration().deadLetterQueue("pooled-streaming");
        Assertions.assertTrue(queue.isPresent());
        Assertions.assertEquals(sequencedDeadLetterQueue, queue.get());

        Optional<PooledStreamingEventProcessor> processor =
                started.eventProcessingConfiguration().eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(processor.isPresent());

        // The invoker field is declared on AbstractEventProcessor, hence the explicit declaring class.
        EventHandlerInvoker invoker = getField(AbstractEventProcessor.class, "eventHandlerInvoker", processor.get());
        Assertions.assertEquals(MultiEventHandlerInvoker.class, invoker.getClass());
        List<?> delegates = getField("delegates", invoker);
        Assertions.assertFalse(delegates.isEmpty());

        Boolean allowReset = getField("allowReset", (DeadLetteringEventHandlerInvoker) delegates.get(0));
        Assertions.assertTrue(allowReset.booleanValue());
    }

    /**
     * Verifies that a sequenced dead letter processor is only available for a processing group that has a dead
     * letter queue: with a queue registered for "pooled-streaming" only, that group must expose one, whereas
     * "tracking" (registered without a queue) and "non-existing-group" must not.
     *
     * @param sequencedDeadLetterQueue the mocked queue registered for the "pooled-streaming" group
     */
    @Test
    void sequencedDeadLetterProcessorReturnsForProcessingGroupWithDlq(@Mock SequencedDeadLetterQueue<EventMessage<?>> sequencedDeadLetterQueue) {
        this.configurer.configureEmbeddedEventStore(config -> new InMemoryEventStorageEngine())
                .eventProcessing()
                .registerPooledStreamingEventProcessor("pooled-streaming")
                .registerPooledStreamingEventProcessor("tracking")
                .registerEventHandler(config -> new PooledStreamingEventHandler())
                .registerEventHandler(config -> new TrackingEventHandler())
                .registerDeadLetterQueue("pooled-streaming", config -> sequencedDeadLetterQueue)
                .registerTransactionManager("pooled-streaming", config -> NoTransactionManager.INSTANCE);
        EventProcessingConfiguration processingConfig = this.configurer.start().eventProcessingConfiguration();

        Optional<SequencedDeadLetterQueue<EventMessage<?>>> queue = processingConfig.deadLetterQueue("pooled-streaming");
        Assertions.assertTrue(queue.isPresent());
        Assertions.assertEquals(sequencedDeadLetterQueue, queue.get());

        Assertions.assertTrue(processingConfig.eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class).isPresent());
        Assertions.assertTrue(processingConfig.sequencedDeadLetterProcessor("pooled-streaming").isPresent());
        Assertions.assertFalse(processingConfig.sequencedDeadLetterProcessor("tracking").isPresent());
        Assertions.assertFalse(processingConfig.sequencedDeadLetterProcessor("non-existing-group").isPresent());
    }

    /**
     * Verifies that handler interceptors end up on the sequenced dead letter processor of "pooled-streaming":
     * with one interceptor registered for the group and one registered as default, the processor's
     * {@code interceptors} list must contain three entries.
     *
     * @param sequencedDeadLetterQueue the mocked queue registered for the "pooled-streaming" group
     * @throws NoSuchFieldException   if the reflectively inspected {@code interceptors} field does not exist
     * @throws IllegalAccessException if the reflectively inspected field cannot be read
     */
    @Test
    void interceptorsOnDeadLetterProcessorShouldBePresent(@Mock SequencedDeadLetterQueue<EventMessage<?>> sequencedDeadLetterQueue) throws NoSuchFieldException, IllegalAccessException {
        StubInterceptor groupInterceptor = new StubInterceptor();
        StubInterceptor defaultInterceptor = new StubInterceptor();
        this.configurer.configureEmbeddedEventStore(config -> new InMemoryEventStorageEngine())
                .eventProcessing()
                .registerPooledStreamingEventProcessor("pooled-streaming")
                .registerEventHandler(config -> new PooledStreamingEventHandler())
                .registerDeadLetterQueue("pooled-streaming", config -> sequencedDeadLetterQueue)
                .registerTransactionManager("pooled-streaming", config -> NoTransactionManager.INSTANCE)
                .registerHandlerInterceptor("pooled-streaming", config -> groupInterceptor)
                .registerDefaultHandlerInterceptor((config, processorName) -> defaultInterceptor);

        Optional<?> deadLetterProcessor =
                this.configurer.start().eventProcessingConfiguration().sequencedDeadLetterProcessor("pooled-streaming");
        Assertions.assertTrue(deadLetterProcessor.isPresent());

        // NOTE(review): only two interceptors are registered in this test; the third is presumably contributed
        // by the framework itself - confirm against the event processing module.
        List<?> interceptors = getField("interceptors", deadLetterProcessor.get());
        Assertions.assertEquals(3, interceptors.size());
    }

    /**
     * Verifies that a dead letter queue supplied through {@code registerDeadLetterQueueProvider} is used for
     * "pooled-streaming": the configuration must expose the provided queue, and the first delegate of the
     * processor's {@code MultiEventHandlerInvoker} must be a {@code DeadLetteringEventHandlerInvoker} holding
     * that queue, the policy exposed by the configuration and the registered {@code NoTransactionManager}.
     *
     * @param sequencedDeadLetterQueue the mocked queue returned by the provider for every group name
     * @throws NoSuchFieldException   if a reflectively inspected field does not exist
     * @throws IllegalAccessException if a reflectively inspected field cannot be read
     */
    @Test
    void registerDeadLetterQueueProviderConstructsDeadLetteringEventHandlerInvoker(@Mock SequencedDeadLetterQueue<EventMessage<?>> sequencedDeadLetterQueue) throws NoSuchFieldException, IllegalAccessException {
        this.configurer.configureEmbeddedEventStore(config -> new InMemoryEventStorageEngine())
                .eventProcessing()
                .registerPooledStreamingEventProcessor("pooled-streaming")
                .registerEventHandler(config -> new PooledStreamingEventHandler())
                .registerDeadLetterQueueProvider(groupName -> config -> sequencedDeadLetterQueue)
                .registerTransactionManager("pooled-streaming", config -> NoTransactionManager.INSTANCE);
        Configuration started = this.configurer.start();

        Optional<EnqueuePolicy<EventMessage<?>>> policy =
                started.eventProcessingConfiguration().deadLetterPolicy("pooled-streaming");
        Assertions.assertTrue(policy.isPresent());
        EnqueuePolicy<EventMessage<?>> expectedPolicy = policy.get();

        Optional<SequencedDeadLetterQueue<EventMessage<?>>> queue =
                started.eventProcessingConfiguration().deadLetterQueue("pooled-streaming");
        Assertions.assertTrue(queue.isPresent());
        Assertions.assertEquals(sequencedDeadLetterQueue, queue.get());

        Optional<PooledStreamingEventProcessor> processor =
                started.eventProcessingConfiguration().eventProcessor("pooled-streaming", PooledStreamingEventProcessor.class);
        Assertions.assertTrue(processor.isPresent());

        // The invoker field is declared on AbstractEventProcessor, hence the explicit declaring class.
        EventHandlerInvoker invoker = getField(AbstractEventProcessor.class, "eventHandlerInvoker", processor.get());
        Assertions.assertEquals(MultiEventHandlerInvoker.class, invoker.getClass());
        List<?> delegates = getField("delegates", invoker);
        Assertions.assertFalse(delegates.isEmpty());

        DeadLetteringEventHandlerInvoker deadLetteringInvoker = (DeadLetteringEventHandlerInvoker) delegates.get(0);
        Assertions.assertEquals(sequencedDeadLetterQueue, getField("queue", deadLetteringInvoker));
        Assertions.assertEquals(expectedPolicy, getField("enqueuePolicy", deadLetteringInvoker));
        Assertions.assertEquals(NoTransactionManager.INSTANCE, getField("transactionManager", deadLetteringInvoker));
    }

    /**
     * Verifies that a dead letter queue registered explicitly for "pooled-streaming" wins over the queue
     * returned by a registered dead letter queue provider: the configuration must expose the explicitly
     * registered queue for that group.
     *
     * @param sequencedDeadLetterQueue  the mocked queue registered directly for "pooled-streaming"
     * @param sequencedDeadLetterQueue2 the mocked queue returned by the generic provider
     */
    @Test
    void whenADeadLetterHasBeenRegisteredForASpecificGroupItWillBeUsedInsteadOfTheGenericOne(@Mock SequencedDeadLetterQueue<EventMessage<?>> sequencedDeadLetterQueue, @Mock SequencedDeadLetterQueue<EventMessage<?>> sequencedDeadLetterQueue2) {
        this.configurer.configureEmbeddedEventStore(config -> new InMemoryEventStorageEngine())
                .eventProcessing()
                .registerPooledStreamingEventProcessor("pooled-streaming")
                .registerEventHandler(config -> new PooledStreamingEventHandler())
                .registerDeadLetterQueue("pooled-streaming", config -> sequencedDeadLetterQueue)
                .registerDeadLetterQueueProvider(groupName -> config -> sequencedDeadLetterQueue2)
                .registerTransactionManager("pooled-streaming", config -> NoTransactionManager.INSTANCE);

        Optional<SequencedDeadLetterQueue<EventMessage<?>>> queue =
                this.configurer.start().eventProcessingConfiguration().deadLetterQueue("pooled-streaming");
        Assertions.assertTrue(queue.isPresent());
        Assertions.assertEquals(sequencedDeadLetterQueue, queue.get());
    }

    /**
     * Reads the value of the field named {@code str} from {@code o}, resolving the field on the runtime class of
     * {@code o}. The lookup is delegated to the overload that takes an explicit declaring class.
     *
     * @param str the name of the field to read
     * @param o   the instance to read the field from
     * @param <O> the static type of the inspected instance
     * @param <R> the type the caller expects the field value to have (not checked here)
     * @return the current value of the field
     * @throws NoSuchFieldException   if the runtime class of {@code o} declares no field with that name
     * @throws IllegalAccessException if the field cannot be read
     */
    private <O, R> R getField(String str, O o) throws NoSuchFieldException, IllegalAccessException {
        Class<?> runtimeType = o.getClass();
        return getField(runtimeType, str, o);
    }

    /**
     * Reads the value of the field named {@code str}, as declared on {@code cls}, from the instance {@code o},
     * making the field accessible first so that non-public fields can be read as well.
     *
     * @param cls the class that declares the field
     * @param str the name of the field to read
     * @param o   the instance to read the field from
     * @param <C> the declaring class type
     * @param <O> the static type of the inspected instance
     * @param <R> the type the caller expects the field value to have (not checked here)
     * @return the current value of the field
     * @throws NoSuchFieldException   if {@code cls} declares no field with that name
     * @throws IllegalAccessException if the field cannot be read
     */
    private <C, O, R> R getField(Class<C> cls, String str, O o) throws NoSuchFieldException, IllegalAccessException {
        Field target = cls.getDeclaredField(str);
        target.setAccessible(true);
        Object value = target.get(o);
        return (R) value;
    }

    /**
     * Sets up an event handling configuration with an embedded in-memory event store, a subscribing processor
     * named "subscribing" and a tracking processor named "tracking", each with a matching handler and an
     * instance-based assignment rule. The token store registered for "tracking" counts down the given latch
     * every time its segments are fetched.
     *
     * @param countDownLatch the latch counted down whenever the "tracking" token store fetches segments
     */
    private void buildComplexEventHandlingConfiguration(CountDownLatch countDownLatch) {
        this.configurer.configureEmbeddedEventStore(config -> new InMemoryEventStorageEngine());
        this.configurer.eventProcessing()
                .registerSubscribingEventProcessor("subscribing")
                .registerTrackingEventProcessor("tracking")
                .assignHandlerInstancesMatching(
                        "subscribing", handler -> handler.getClass().isAssignableFrom(SubscribingEventHandler.class)
                )
                .assignHandlerInstancesMatching(
                        "tracking", handler -> handler.getClass().isAssignableFrom(TrackingEventHandler.class)
                )
                .registerEventHandler(config -> new SubscribingEventHandler())
                .registerEventHandler(config -> new TrackingEventHandler())
                .registerTokenStore("tracking", config -> {
                    // A fresh store per invocation: signal the latch, then defer to the in-memory behaviour.
                    return new InMemoryTokenStore() {
                        @Override
                        public int[] fetchSegments(@Nonnull String processorName) {
                            countDownLatch.countDown();
                            return super.fetchSegments(processorName);
                        }
                    };
                });
    }
}
