package org.axonframework.eventhandling.deadletter;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.axonframework.common.AxonException;
import org.axonframework.common.transaction.NoOpTransactionManager;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventTestUtils;
import org.axonframework.eventhandling.EventTrackerStatus;
import org.axonframework.eventhandling.StreamingEventProcessor;
import org.axonframework.eventhandling.annotation.EventHandler;
import org.axonframework.eventhandling.deadletter.DeadLetteringEventHandlerInvoker;
import org.axonframework.eventhandling.pooled.PooledStreamingEventProcessor;
import org.axonframework.eventhandling.tokenstore.inmemory.InMemoryTokenStore;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.messaging.MetaData;
import org.axonframework.messaging.annotation.MessageIdentifier;
import org.axonframework.messaging.deadletter.Cause;
import org.axonframework.messaging.deadletter.DeadLetter;
import org.axonframework.messaging.deadletter.Decisions;
import org.axonframework.messaging.deadletter.SequencedDeadLetterQueue;
import org.axonframework.messaging.deadletter.ThrowableCause;
import org.axonframework.utils.AssertUtils;
import org.axonframework.utils.InMemoryStreamableEventSource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/axonframework/eventhandling/deadletter/DeadLetteringEventIntegrationTest.class */
public abstract class DeadLetteringEventIntegrationTest {
    /** Name given to the {@link PooledStreamingEventProcessor} that is built in {@code setUp()}. */
    protected static final String PROCESSING_GROUP = "problematicProcessingGroup";
    // The four boolean flags below are not referenced in the visible part of this class.
    // NOTE(review): they look like readable aliases for the DeadLetterableEvent constructor flags - confirm.
    private static final boolean SUCCEED = true;
    private static final boolean SUCCEED_RETRY = true;
    private static final boolean FAIL = false;
    private static final boolean FAIL_RETRY = false;
    // Same value as the initial / reset value of maxRetries further down.
    private static final int DEFAULT_RETRIES = 1;
    // Large filler text; not referenced in the visible part of this class.
    private static final String BLOB_OF_TEXT = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Gravida quis blandit turpis cursus in. Nulla facilisi etiam dignissim diam quis enim lobortis scelerisque fermentum. Egestas maecenas pharetra convallis posuere morbi leo urna. Dictumst quisque sagittis purus sit amet volutpat consequat. At volutpat diam ut venenatis tellus in metus vulputate eu. Imperdiet dui accumsan sit amet nulla facilisi. Eget est lorem ipsum dolor sit amet. Vestibulum morbi blandit cursus risus at ultrices mi tempus imperdiet. Sed tempus urna et pharetra pharetra massa massa. Dolor magna eget est lorem. Purus semper eget duis at tellus. Tincidunt augue interdum velit euismod in pellentesque massa placerat duis.\n\nQuis ipsum suspendisse ultrices gravida dictum fusce ut. Nascetur ridiculus mus mauris vitae ultricies leo integer malesuada. Sit amet purus gravida quis blandit turpis cursus in. Gravida rutrum quisque non tellus. Eros donec ac odio tempor orci dapibus. Dictum varius duis at consectetur lorem donec massa sapien.Tincidunt arcu non sodales neque sodales ut etiam sit amet. Sagittis aliquam malesuada bibendum arcu vitae. Vel turpis nunc eget lorem dolor sed viverra. In egestas erat imperdiet sed euismod nisi. Lorem ipsum dolor sit amet consectetur.";
    // Annotated event handler whose per-sequence success/failure counters the tests assert on.
    private ProblematicEventHandlingComponent eventHandlingComponent;
    // Queue under test, supplied by the concrete subclass through buildDeadLetterQueue().
    private SequencedDeadLetterQueue<EventMessage<?>> deadLetterQueue;
    // Invoker wired to eventHandlingComponent and deadLetterQueue in setUp().
    private DeadLetteringEventHandlerInvoker deadLetteringInvoker;
    // In-memory event source the tests publish to and the processor reads from.
    private InMemoryStreamableEventSource eventSource;
    // Processor that feeds events from eventSource to deadLetteringInvoker.
    private StreamingEventProcessor streamingProcessor;
    // Obtained from getTransactionManager() in setUp(); handed to both the invoker and the processor.
    protected TransactionManager transactionManager;
    // Scheduler used by processAnyDeadLettersPeriodically(); stopped in tearDown().
    private ScheduledExecutorService executor;
    // Retry limit read by the enqueue policy configured in setUp(); reset to 1 in tearDown().
    private final AtomicInteger maxRetries = new AtomicInteger(1);
    // When true, the enqueue policy replaces the failure cause with a ReferenceException; reset to false in tearDown().
    private final AtomicBoolean returnReferenceErrorFromPolicy = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventhandling/deadletter/DeadLetteringEventIntegrationTest$DeadLetterableEvent.class */
    /**
     * Event payload used by the tests in this class. Its two flags tell
     * {@code ProblematicEventHandlingComponent} whether handling should succeed on the first attempt and whether
     * it should succeed when the event is evaluated again.
     */
    public static class DeadLetterableEvent {
        private final String aggregateIdentifier;
        private final boolean shouldSucceed;
        private final boolean shouldSucceedOnEvaluation;
        private final String causeMessage;

        /** Creates an event that succeeds on evaluation and carries an empty cause message. */
        private DeadLetterableEvent(String aggregateIdentifier, boolean shouldSucceed) {
            this(aggregateIdentifier, shouldSucceed, true);
        }

        /** Creates an event with an empty cause message. */
        private DeadLetterableEvent(String aggregateIdentifier,
                                    boolean shouldSucceed,
                                    boolean shouldSucceedOnEvaluation) {
            this(aggregateIdentifier, shouldSucceed, shouldSucceedOnEvaluation, "");
        }

        /**
         * Creates a fully specified event; also the Jackson creator.
         *
         * @param aggregateIdentifier       identifier the sequencing policy in {@code setUp()} uses as sequence key
         * @param shouldSucceed             whether initial handling should succeed
         * @param shouldSucceedOnEvaluation whether handling should succeed when the event is evaluated again
         * @param causeMessage              text appended to the message of the exception thrown on failure
         */
        @JsonCreator
        public DeadLetterableEvent(@JsonProperty("aggregateIdentifier") String aggregateIdentifier,
                                   @JsonProperty("shouldSucceed") boolean shouldSucceed,
                                   @JsonProperty("shouldSucceedOnEvaluation") boolean shouldSucceedOnEvaluation,
                                   @JsonProperty("causeMessage") String causeMessage) {
            this.aggregateIdentifier = aggregateIdentifier;
            this.shouldSucceed = shouldSucceed;
            this.shouldSucceedOnEvaluation = shouldSucceedOnEvaluation;
            this.causeMessage = causeMessage;
        }

        public String getAggregateIdentifier() {
            return aggregateIdentifier;
        }

        public boolean isShouldSucceed() {
            return shouldSucceed;
        }

        public boolean isShouldSucceedOnEvaluation() {
            return shouldSucceedOnEvaluation;
        }

        public String getCauseMessage() {
            return causeMessage;
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (other == null || getClass() != other.getClass()) {
                return false;
            }
            DeadLetterableEvent that = (DeadLetterableEvent) other;
            if (shouldSucceed != that.shouldSucceed) {
                return false;
            }
            if (shouldSucceedOnEvaluation != that.shouldSucceedOnEvaluation) {
                return false;
            }
            return Objects.equals(aggregateIdentifier, that.aggregateIdentifier)
                    && Objects.equals(causeMessage, that.causeMessage);
        }

        @Override
        public int hashCode() {
            return Objects.hash(aggregateIdentifier, shouldSucceed, shouldSucceedOnEvaluation, causeMessage);
        }

        @Override
        public String toString() {
            return "DeadLetterableEvent{"
                    + "aggregateIdentifier='" + aggregateIdentifier + "'"
                    + ", shouldSucceed=" + shouldSucceed
                    + ", shouldSucceedOnEvaluation=" + shouldSucceedOnEvaluation
                    + ", causeMessage='" + causeMessage + "'"
                    + "}";
        }
    }

    /* loaded from: input_file:org/axonframework/eventhandling/deadletter/DeadLetteringEventIntegrationTest$ProblematicEventHandlingComponent.class */
    /**
     * Event handling component that succeeds or fails depending on the flags of the handled
     * {@link DeadLetterableEvent}, and keeps per-sequence counters of every outcome so tests can assert on them.
     * <p>
     * All count accessors return {@code 0} for a sequence identifier that has not been recorded.
     */
    private static class ProblematicEventHandlingComponent {

        // Message identifiers that reached the handler at least once.
        private final Set<String> handledEvent = new ConcurrentSkipListSet<>();
        // Outcome counters, keyed by the event's aggregate identifier.
        private final Map<String, Integer> firstTrySuccesses = new ConcurrentSkipListMap<>();
        private final Map<String, Integer> evaluationSuccesses = new ConcurrentSkipListMap<>();
        private final Map<String, Integer> firstTryFailures = new ConcurrentSkipListMap<>();
        private final Map<String, Integer> evaluationFailures = new ConcurrentSkipListMap<>();
        // Number of invocations per aggregate identifier in which a non-null DeadLetter parameter was received.
        private final Map<String, Integer> hasResolvedDeadLetterParameter = new ConcurrentSkipListMap<>();

        /**
         * Handles the given event as an initial attempt or as an evaluation.
         * <p>
         * It counts as an evaluation when the message identifier was seen before, or when an earlier event of the
         * same aggregate identifier already failed its initial handling. Otherwise it is an initial attempt.
         *
         * @param event      the payload deciding whether handling succeeds
         * @param messageId  identifier of the event message being handled
         * @param deadLetter the dead letter wrapping the message, or {@code null} when none was resolved
         * @throws RuntimeException when the event's flag for the current kind of attempt is {@code false}
         */
        @EventHandler
        public void on(DeadLetterableEvent event,
                       @MessageIdentifier String messageId,
                       DeadLetter<EventMessage<DeadLetterableEvent>> deadLetter) {
            String sequenceId = event.getAggregateIdentifier();
            // Set.add returns false when the identifier was already present, i.e. the message was handled before.
            boolean handledBefore = !handledEvent.add(messageId);
            if (handledBefore || initialHandlingWasUnsuccessful(sequenceId)) {
                processEvaluationOf(event, sequenceId);
            } else {
                processInitialHandlingOf(event, sequenceId);
            }
            // Only reached when handling did not throw.
            if (deadLetter != null) {
                increment(hasResolvedDeadLetterParameter, sequenceId);
            }
        }

        private void processInitialHandlingOf(DeadLetterableEvent event, String sequenceId) {
            if (event.isShouldSucceed()) {
                increment(firstTrySuccesses, sequenceId);
                return;
            }
            increment(firstTryFailures, sequenceId);
            throw new RuntimeException("Initial handling failed. Let's dead letter event [" + sequenceId + "].\n"
                                               + event.getCauseMessage());
        }

        private void processEvaluationOf(DeadLetterableEvent event, String sequenceId) {
            if (event.isShouldSucceedOnEvaluation()) {
                increment(evaluationSuccesses, sequenceId);
                return;
            }
            increment(evaluationFailures, sequenceId);
            throw new RuntimeException("Evaluation failed. Let's dead letter event [" + sequenceId + "].\n"
                                               + event.getCauseMessage());
        }

        /** Adds one to the counter stored under {@code key}, starting at one when absent. */
        private static void increment(Map<String, Integer> counters, String key) {
            counters.merge(key, 1, Integer::sum);
        }

        /** Returns the counter stored under {@code key}, or {@code 0} when absent. */
        private static int countOf(Map<String, Integer> counters, String key) {
            return counters.getOrDefault(key, 0);
        }

        public boolean initialHandlingWasSuccessful(String str) {
            return firstTrySuccesses.containsKey(str);
        }

        public int successfulInitialHandlingCount(String str) {
            return countOf(firstTrySuccesses, str);
        }

        public boolean evaluationWasSuccessful(String str) {
            return evaluationSuccesses.containsKey(str);
        }

        public int successfulEvaluationCount(String str) {
            return countOf(evaluationSuccesses, str);
        }

        public boolean initialHandlingWasUnsuccessful(String str) {
            return firstTryFailures.containsKey(str);
        }

        public int unsuccessfulInitialHandlingCount(String str) {
            return countOf(firstTryFailures, str);
        }

        public boolean evaluationWasUnsuccessful(String str) {
            return evaluationFailures.containsKey(str);
        }

        public int unsuccessfulEvaluationCount(String str) {
            return countOf(evaluationFailures, str);
        }

        /**
         * Returns the sum of successful initial attempts and successful evaluations for the given identifier.
         * A missing counter contributes {@code 0} instead of raising a {@link NullPointerException}.
         */
        public int overallSuccessfulHandlingCount(String str) {
            return countOf(firstTrySuccesses, str) + countOf(evaluationSuccesses, str);
        }

        /**
         * Returns how often a non-null dead letter parameter was received for the given identifier, or {@code 0}
         * when that never happened.
         */
        public int resolvedDeadLetterParameterCount(String str) {
            return countOf(hasResolvedDeadLetterParameter, str);
        }
    }

    /* loaded from: input_file:org/axonframework/eventhandling/deadletter/DeadLetteringEventIntegrationTest$ReferenceException.class */
    /**
     * Exception whose message is the string form of a {@link UUID}. The enqueue policy configured in
     * {@code setUp()} substitutes it for the real failure when {@code returnReferenceErrorFromPolicy} is set.
     */
    private static class ReferenceException extends AxonException {

        /**
         * @param reference identifier used, in string form, as the exception message
         */
        ReferenceException(UUID reference) {
            super(reference.toString());
        }
    }

    /**
     * Supplies the dead-letter queue implementation under test. Called from {@code setUp()}, so once per test.
     *
     * @return the queue that is handed to the {@link DeadLetteringEventHandlerInvoker} builder
     */
    protected abstract SequencedDeadLetterQueue<EventMessage<?>> buildDeadLetterQueue();

    /**
     * Supplies the transaction manager used by both the dead-lettering invoker and the event processor.
     * This default returns a fresh {@link NoOpTransactionManager}; being protected, subclasses may override it.
     *
     * @return the transaction manager to configure in {@code setUp()}
     */
    protected TransactionManager getTransactionManager() {
        return new NoOpTransactionManager();
    }

    /**
     * Tells {@code setUp()} whether to call {@code enableSequenceIdentifierCache()} on the invoker builder.
     *
     * @return {@code false} by default; subclasses may override to switch the cache on
     */
    protected boolean identifierCacheEnabled() {
        return false;
    }

    /**
     * Builds a fresh fixture for every test: the handling component, the queue under test, the dead-lettering
     * invoker, an in-memory event source, a pooled streaming processor and a scheduler for periodic dead-letter
     * processing.
     * <p>
     * The enqueue policy evicts a letter once its {@code "retries"} diagnostic reaches {@code maxRetries}.
     * Otherwise it enqueues the letter and increments that diagnostic. When
     * {@code returnReferenceErrorFromPolicy} is set, the recorded cause is a new {@link ReferenceException}
     * instead of the actual failure.
     */
    @BeforeEach
    void setUp() {
        transactionManager = getTransactionManager();
        eventHandlingComponent = new ProblematicEventHandlingComponent();
        deadLetterQueue = buildDeadLetterQueue();

        DeadLetteringEventHandlerInvoker.Builder invokerBuilder = DeadLetteringEventHandlerInvoker
                .builder()
                .eventHandlers(eventHandlingComponent)
                // Events of the same aggregate identifier form one dead-letter sequence.
                .sequencingPolicy(event -> ((DeadLetterableEvent) event.getPayload()).getAggregateIdentifier())
                .enqueuePolicy((letter, cause) -> {
                    int retries = (Integer) letter.diagnostics().getOrDefault("retries", 0);
                    if (retries >= maxRetries.get()) {
                        return Decisions.evict();
                    }
                    Throwable decisionCause = cause;
                    if (returnReferenceErrorFromPolicy.get()) {
                        decisionCause = new ReferenceException(UUID.randomUUID());
                    }
                    return Decisions.enqueue(
                            ThrowableCause.truncated(decisionCause),
                            // The retry count is read from the letter passed to this function.
                            enqueued -> MetaData.with(
                                    "retries",
                                    ((Integer) enqueued.diagnostics().getOrDefault("retries", 0)) + 1
                            )
                    );
                })
                .queue(deadLetterQueue)
                .transactionManager(transactionManager);
        if (identifierCacheEnabled()) {
            invokerBuilder.enableSequenceIdentifierCache();
        }
        deadLetteringInvoker = invokerBuilder.build();

        eventSource = new InMemoryStreamableEventSource();
        streamingProcessor = PooledStreamingEventProcessor
                .builder()
                .name(PROCESSING_GROUP)
                .eventHandlerInvoker(deadLetteringInvoker)
                .messageSource(eventSource)
                .tokenStore(new InMemoryTokenStore())
                .transactionManager(transactionManager)
                .coordinatorExecutor(Executors.newSingleThreadScheduledExecutor())
                .workerExecutor(Executors.newSingleThreadScheduledExecutor())
                .initialSegmentCount(1)
                .claimExtensionThreshold(1000L)
                .build();
        executor = Executors.newScheduledThreadPool(2);
    }

    /**
     * Stops the event processor and the dead-letter scheduler, then restores the policy switches to their
     * defaults.
     * <p>
     * The scheduler gets an orderly {@code shutdown()} before {@code awaitTermination}, so it has a chance to
     * terminate within the grace period. If it does not, or the processor shutdown failed, it is stopped with
     * {@code shutdownNow()}. The interrupt flag is restored only for an actual {@link InterruptedException}.
     */
    @AfterEach
    void tearDown() {
        boolean executorTerminated = false;
        try {
            streamingProcessor.shutdownAsync().get(15L, TimeUnit.SECONDS);
            // awaitTermination can only report true after a shutdown request has been made.
            executor.shutdown();
            executorTerminated = executor.awaitTermination(50L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException | TimeoutException e) {
            // Not an interruption, so the interrupt flag must stay untouched.
            e.printStackTrace();
        } finally {
            if (!executorTerminated) {
                executor.shutdownNow();
            }
            maxRetries.set(DEFAULT_RETRIES);
            returnReferenceErrorFromPolicy.set(false);
        }
    }

    /** Starts the streaming event processor built in {@code setUp()}. */
    protected void startProcessingEvent() {
        streamingProcessor.start();
    }

    /** Asks the dead-lettering invoker to process any dead letter via {@code processAny()}. */
    protected void processAnyDeadLetter() {
        deadLetteringInvoker.processAny();
    }

    /**
     * Schedules {@link #processAnyDeadLetter()} on the test scheduler: first run after 5 ms, then again 5 ms
     * after each run completes.
     */
    protected void processAnyDeadLettersPeriodically() {
        long delayMillis = 5L;
        executor.scheduleWithFixedDelay(
                this::processAnyDeadLetter, delayMillis, delayMillis, TimeUnit.MILLISECONDS
        );
    }

    /**
     * Publishes one succeeding and one failing event and verifies that only the failing event ends up in the
     * dead-letter queue, as the single entry of its sequence.
     */
    @Test
    void failedEventHandlingEnqueuesTheEvent() {
        EventMessage<?> failingEvent = EventTestUtils.asEventMessage(new DeadLetterableEvent("failure", false));
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent("success", true)));
        eventSource.publishMessage(failingEvent);

        startProcessingEvent();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(1, streamingProcessor.processingStatus().size())
        );
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            OptionalLong position = streamingProcessor.processingStatus().get(0).getCurrentPosition();
            // Assert presence first so an absent position is an assertion failure, not a NoSuchElementException.
            Assertions.assertTrue(position.isPresent());
            Assertions.assertTrue(position.getAsLong() >= 2);
        });

        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasSuccessful("success"));
        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasUnsuccessful("failure"));

        Assertions.assertTrue(deadLetterQueue.contains("failure"));
        Assertions.assertFalse(deadLetterQueue.contains("success"));

        Iterator<DeadLetter<? extends EventMessage<?>>> letters =
                deadLetterQueue.deadLetterSequence("failure").iterator();
        Assertions.assertTrue(letters.hasNext());
        Assertions.assertEquals(failingEvent.getPayload(), letters.next().message().getPayload());
        Assertions.assertFalse(letters.hasNext());
    }

    /**
     * Publishes three succeeding events, one failing event and two more succeeding events for one sequence.
     * Verifies that the failing event and the two events after it form the dead-letter sequence, in publication
     * order.
     */
    @Test
    void eventsInTheSameSequenceAreAllEnqueuedIfOneOfThemFails() {
        String sequenceId = UUID.randomUUID().toString();
        for (int n = 0; n < 3; n++) {
            eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true)));
        }
        DeadLetterableEvent firstDeadLetter = new DeadLetterableEvent(sequenceId, false);
        eventSource.publishMessage(EventTestUtils.asEventMessage(firstDeadLetter));
        DeadLetterableEvent secondDeadLetter = new DeadLetterableEvent(sequenceId, true);
        eventSource.publishMessage(EventTestUtils.asEventMessage(secondDeadLetter));
        DeadLetterableEvent thirdDeadLetter = new DeadLetterableEvent(sequenceId, true);
        eventSource.publishMessage(EventTestUtils.asEventMessage(thirdDeadLetter));

        startProcessingEvent();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(1, streamingProcessor.processingStatus().size())
        );
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            OptionalLong position = streamingProcessor.processingStatus().get(0).getCurrentPosition();
            // Assert presence first so an absent position is an assertion failure, not a NoSuchElementException.
            Assertions.assertTrue(position.isPresent());
            Assertions.assertTrue(position.getAsLong() >= 6);
        });

        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasSuccessful(sequenceId));
        Assertions.assertEquals(3, eventHandlingComponent.successfulInitialHandlingCount(sequenceId));
        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasUnsuccessful(sequenceId));
        Assertions.assertEquals(1, eventHandlingComponent.unsuccessfulInitialHandlingCount(sequenceId));

        Assertions.assertTrue(deadLetterQueue.contains(sequenceId));
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            Iterator<DeadLetter<? extends EventMessage<?>>> letters =
                    deadLetterQueue.deadLetterSequence(sequenceId).iterator();
            Assertions.assertTrue(letters.hasNext());
            Assertions.assertEquals(firstDeadLetter, letters.next().message().getPayload());
            Assertions.assertTrue(letters.hasNext());
            Assertions.assertEquals(secondDeadLetter, letters.next().message().getPayload());
            Assertions.assertTrue(letters.hasNext());
            Assertions.assertEquals(thirdDeadLetter, letters.next().message().getPayload());
            Assertions.assertFalse(letters.hasNext());
        });
    }

    /**
     * Dead-letters a sequence of three events that all succeed on evaluation, then processes the sequence and
     * verifies three successful evaluations, no failed evaluations and an emptied sequence.
     */
    @Test
    void successfulRetryingLettersEvictsTheLettersFromTheQueue() {
        int expectedSuccessfulEvaluations = 3;
        int expectedFailedEvaluations = 0;
        String sequenceId = UUID.randomUUID().toString();

        // Three events that succeed right away.
        for (int n = 0; n < 3; n++) {
            eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true)));
        }
        // One event that fails initially, followed by two more; all three succeed on evaluation.
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, false, true)));
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true, true)));
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true, true)));

        startProcessingEvent();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(1, streamingProcessor.processingStatus().size())
        );
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            OptionalLong position = streamingProcessor.processingStatus().get(0).getCurrentPosition();
            // Assert presence first so an absent position is an assertion failure, not a NoSuchElementException.
            Assertions.assertTrue(position.isPresent());
            Assertions.assertTrue(position.getAsLong() >= 6);
        });

        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasSuccessful(sequenceId));
        Assertions.assertEquals(3, eventHandlingComponent.successfulInitialHandlingCount(sequenceId));
        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasUnsuccessful(sequenceId));
        Assertions.assertEquals(1, eventHandlingComponent.unsuccessfulInitialHandlingCount(sequenceId));
        Assertions.assertTrue(deadLetterQueue.contains(sequenceId));

        // Process a dead-letter sequence, accepting any letter.
        deadLetteringInvoker.process(letter -> true);

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertTrue(eventHandlingComponent.evaluationWasSuccessful(sequenceId))
        );
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(expectedSuccessfulEvaluations,
                                              eventHandlingComponent.successfulEvaluationCount(sequenceId))
        );
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertFalse(eventHandlingComponent.evaluationWasUnsuccessful(sequenceId))
        );
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(expectedFailedEvaluations,
                                              eventHandlingComponent.unsuccessfulEvaluationCount(sequenceId))
        );
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertFalse(deadLetterQueue.contains(sequenceId))
        );
    }

    /**
     * Dead-letters a sequence whose last event also fails on evaluation, then processes the sequence and
     * verifies two successful evaluations, one failed evaluation and that the sequence is still in the queue.
     */
    @Test
    void unsuccessfulProcessingLettersRequeuesTheLettersInTheQueue() {
        int expectedSuccessfulEvaluations = 2;
        int expectedFailedEvaluations = 1;
        String sequenceId = UUID.randomUUID().toString();

        // Three events that succeed right away.
        for (int n = 0; n < 3; n++) {
            eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true)));
        }
        // One event that fails initially, then two more; the last one fails on evaluation.
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, false, true)));
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true, true)));
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true, false)));

        startProcessingEvent();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(1, streamingProcessor.processingStatus().size())
        );
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            OptionalLong position = streamingProcessor.processingStatus().get(0).getCurrentPosition();
            Assertions.assertTrue(position.isPresent());
            Assertions.assertTrue(position.getAsLong() >= 6);
        });

        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasSuccessful(sequenceId));
        Assertions.assertEquals(3, eventHandlingComponent.successfulInitialHandlingCount(sequenceId));
        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasUnsuccessful(sequenceId));
        Assertions.assertEquals(1, eventHandlingComponent.unsuccessfulInitialHandlingCount(sequenceId));
        Assertions.assertTrue(deadLetterQueue.contains(sequenceId));

        // Process a dead-letter sequence, accepting any letter.
        deadLetteringInvoker.process(letter -> true);

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertTrue(eventHandlingComponent.evaluationWasSuccessful(sequenceId))
        );
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(expectedSuccessfulEvaluations,
                                              eventHandlingComponent.successfulEvaluationCount(sequenceId))
        );
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertTrue(eventHandlingComponent.evaluationWasUnsuccessful(sequenceId))
        );
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(expectedFailedEvaluations,
                                              eventHandlingComponent.unsuccessfulEvaluationCount(sequenceId))
        );
        Assertions.assertTrue(deadLetterQueue.contains(sequenceId));
    }

    /**
     * Starts event processing and periodic dead-letter processing before publishing, then verifies the same
     * outcome counts as the sequential variant: two successful evaluations, one failed evaluation and a sequence
     * that remains in the queue.
     */
    @Test
    void publishEventsAndProcessDeadLettersConcurrentlyShouldWorkFine() {
        int expectedSuccessfulEvaluations = 2;
        int expectedFailedEvaluations = 1;
        String sequenceId = UUID.randomUUID().toString();

        // Both processing paths are running before the first event is published.
        startProcessingEvent();
        processAnyDeadLettersPeriodically();

        // Three events that succeed right away.
        for (int n = 0; n < 3; n++) {
            eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true)));
        }
        // One event that fails initially, then two more; the last one fails on evaluation.
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, false, true)));
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true, true)));
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true, false)));

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(1, streamingProcessor.processingStatus().size())
        );
        AssertUtils.assertWithin(2, TimeUnit.SECONDS, () -> {
            OptionalLong position = streamingProcessor.processingStatus().get(0).getCurrentPosition();
            Assertions.assertTrue(position.isPresent());
            Assertions.assertTrue(position.getAsLong() >= 6);
        });

        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasSuccessful(sequenceId));
        Assertions.assertEquals(3, eventHandlingComponent.successfulInitialHandlingCount(sequenceId));
        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasUnsuccessful(sequenceId));
        Assertions.assertEquals(1, eventHandlingComponent.unsuccessfulInitialHandlingCount(sequenceId));
        Assertions.assertTrue(deadLetterQueue.contains(sequenceId));

        AssertUtils.assertWithin(
                2, TimeUnit.SECONDS,
                () -> Assertions.assertTrue(eventHandlingComponent.evaluationWasSuccessful(sequenceId))
        );
        AssertUtils.assertWithin(
                500, TimeUnit.MILLISECONDS,
                () -> Assertions.assertEquals(expectedSuccessfulEvaluations,
                                              eventHandlingComponent.successfulEvaluationCount(sequenceId))
        );
        AssertUtils.assertWithin(
                500, TimeUnit.MILLISECONDS,
                () -> Assertions.assertTrue(eventHandlingComponent.evaluationWasUnsuccessful(sequenceId))
        );
        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(expectedFailedEvaluations,
                                              eventHandlingComponent.unsuccessfulEvaluationCount(sequenceId))
        );
        Assertions.assertTrue(deadLetterQueue.contains(sequenceId));
    }

    /**
     * Publishes events for 40 sequences from a separate thread while event processing and periodic dead-letter
     * processing are running, then validates the handling counters of every published sequence.
     * <p>
     * The publisher thread is joined before its identifier set is read. The set is a plain {@link HashSet}, so
     * the join is what makes all of its writes visible to the validating thread.
     *
     * @throws InterruptedException if the test thread is interrupted while joining the publisher thread
     */
    @Timeout(20)
    @Test
    void publishEventsAndProcessDeadLettersConcurrentlyInBulkShouldWorkFine() throws InterruptedException {
        // NOTE(review): publishEventsFor is outside this view; the three per-sequence arguments are presumably
        // the number of immediately succeeding events, of dead-lettered events and of events that keep
        // failing on evaluation - confirm against its declaration.
        int firstTrySuccessesPerSequence = 5;
        int deadLetteredPerSequence = 4;
        int persistentFailuresPerSequence = 1;
        int evaluationSuccessesPerSequence = deadLetteredPerSequence - persistentFailuresPerSequence;
        int overallSuccessesPerSequence = firstTrySuccessesPerSequence + evaluationSuccessesPerSequence;
        int sequenceCount = 40;
        int expectedPosition = (firstTrySuccessesPerSequence + deadLetteredPerSequence) * sequenceCount;

        Set<String> publishedSequenceIds = new HashSet<>();
        Thread publisher = new Thread(() -> {
            for (int index = 0; index < sequenceCount; index++) {
                String sequenceId = Integer.toString(index);
                publishEventsFor(sequenceId,
                                 firstTrySuccessesPerSequence,
                                 deadLetteredPerSequence,
                                 persistentFailuresPerSequence);
                publishedSequenceIds.add(sequenceId);
            }
        });

        startProcessingEvent();
        processAnyDeadLettersPeriodically();
        publisher.start();

        AssertUtils.assertWithin(
                1, TimeUnit.SECONDS,
                () -> Assertions.assertEquals(1, streamingProcessor.processingStatus().size())
        );
        AssertUtils.assertWithin(15, TimeUnit.SECONDS, () -> {
            OptionalLong position = streamingProcessor.processingStatus().get(0).getCurrentPosition();
            Assertions.assertTrue(position.isPresent());
            Assertions.assertEquals(expectedPosition, position.getAsLong());
        });

        // Wait for the publisher to finish before reading the set it writes to.
        publisher.join();

        for (String sequenceId : publishedSequenceIds) {
            AssertUtils.assertWithin(
                    500, TimeUnit.MILLISECONDS,
                    () -> Assertions.assertTrue(eventHandlingComponent.initialHandlingWasSuccessful(sequenceId))
            );
            AssertUtils.assertWithin(
                    500, TimeUnit.MILLISECONDS,
                    () -> Assertions.assertEquals(firstTrySuccessesPerSequence,
                                                  eventHandlingComponent.successfulInitialHandlingCount(sequenceId))
            );
            AssertUtils.assertWithin(
                    500, TimeUnit.MILLISECONDS,
                    () -> Assertions.assertTrue(eventHandlingComponent.initialHandlingWasUnsuccessful(sequenceId))
            );
            Assertions.assertEquals(1, eventHandlingComponent.unsuccessfulInitialHandlingCount(sequenceId));

            AssertUtils.assertWithin(
                    15, TimeUnit.SECONDS,
                    () -> Assertions.assertTrue(eventHandlingComponent.evaluationWasSuccessful(sequenceId))
            );
            AssertUtils.assertWithin(
                    500, TimeUnit.MILLISECONDS,
                    () -> Assertions.assertEquals(evaluationSuccessesPerSequence,
                                                  eventHandlingComponent.successfulEvaluationCount(sequenceId))
            );
            AssertUtils.assertWithin(
                    500, TimeUnit.MILLISECONDS,
                    () -> Assertions.assertTrue(eventHandlingComponent.evaluationWasUnsuccessful(sequenceId))
            );
            // A persistently failing letter may be evaluated more than once, hence a lower bound.
            Assertions.assertTrue(eventHandlingComponent.unsuccessfulEvaluationCount(sequenceId) >= 1);

            Assertions.assertEquals(overallSuccessesPerSequence,
                                    eventHandlingComponent.overallSuccessfulHandlingCount(sequenceId));
        }
    }

    /**
     * Dead-letters a sequence of three events, processes it, and verifies that the event handler received a
     * non-null dead letter parameter for each of the three evaluated events.
     */
    @Test
    void processedDeadLetterIsResolvedAsParameterToEventHandlers() {
        String sequenceId = UUID.randomUUID().toString();

        // Three events that succeed right away.
        for (int n = 0; n < 3; n++) {
            eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true)));
        }
        // One event that fails initially, followed by two more; all three succeed on evaluation.
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, false, true)));
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true, true)));
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, true, true)));

        startProcessingEvent();

        Awaitility.await()
                  .pollDelay(Duration.ofMillis(25L))
                  .atMost(Duration.ofSeconds(1L))
                  .until(() -> streamingProcessor.processingStatus().size() == 1);
        Awaitility.await()
                  .pollDelay(Duration.ofMillis(25L))
                  .atMost(Duration.ofSeconds(1L))
                  .until(() -> {
                      OptionalLong position =
                              streamingProcessor.processingStatus().get(0).getCurrentPosition();
                      // An absent position means "not there yet"; keep polling instead of throwing.
                      return position.isPresent() && position.getAsLong() >= 6;
                  });

        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasSuccessful(sequenceId));
        Assertions.assertEquals(3, eventHandlingComponent.successfulInitialHandlingCount(sequenceId));
        Assertions.assertTrue(eventHandlingComponent.initialHandlingWasUnsuccessful(sequenceId));
        Assertions.assertEquals(1, eventHandlingComponent.unsuccessfulInitialHandlingCount(sequenceId));
        Assertions.assertTrue(deadLetterQueue.contains(sequenceId));

        // Process a dead-letter sequence, accepting any letter.
        deadLetteringInvoker.process(letter -> true);

        Assertions.assertEquals(3, eventHandlingComponent.resolvedDeadLetterParameterCount(sequenceId));
    }

    /**
     * Registers an error-catching interceptor on the dead-lettering invoker, publishes one event constructed with
     * flag {@code false}, waits until the dead-letter queue holds exactly one entry, and then processes the queue.
     * Afterwards the interceptor's flag must be set and the queue must be empty.
     */
    @Test
    void deadLetterEventProcessingTaskIsUsingInterceptor() {
        maxRetries.set(3);
        AtomicBoolean interceptorInvoked = new AtomicBoolean(false);
        deadLetteringInvoker.registerHandlerInterceptor(errorCatchingInterceptor(interceptorInvoked));

        String sequenceId = UUID.randomUUID().toString();
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, false)));
        startProcessingEvent();

        Awaitility.await()
                  .pollDelay(Duration.ofMillis(25L))
                  .atMost(Duration.ofSeconds(10L))
                  .until(() -> deadLetterQueue.size() == 1);

        // Clear the handling component's recorded state before the dead letter is processed.
        eventHandlingComponent.handledEvent.clear();
        eventHandlingComponent.firstTryFailures.clear();

        deadLetteringInvoker.process(deadLetter -> true);

        Assertions.assertTrue(interceptorInvoked.get());
        Assertions.assertEquals(0L, deadLetterQueue.size());
    }

    /**
     * Sets the {@code returnReferenceErrorFromPolicy} switch, publishes one event constructed with flag
     * {@code false}, and waits until the dead-letter queue reports exactly one sequence. The first dead letter of
     * the first sequence must then carry a cause whose type equals the class name of {@code ReferenceException}.
     */
    @Test
    void causeFromDecisionShouldBeStored() {
        returnReferenceErrorFromPolicy.set(true);

        String sequenceId = UUID.randomUUID().toString();
        eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, false)));
        startProcessingEvent();

        Awaitility.await()
                  .pollDelay(Duration.ofMillis(25L))
                  .atMost(Duration.ofSeconds(1L))
                  .until(() -> deadLetterQueue.amountOfSequences() == 1);

        // Take the first sequence from the queue, then the first letter within that sequence.
        Iterable firstSequence = (Iterable) deadLetterQueue.deadLetters().iterator().next();
        DeadLetter storedLetter = (DeadLetter) firstSequence.iterator().next();

        Assertions.assertTrue(storedLetter.cause().isPresent());
        Cause storedCause = (Cause) storedLetter.cause().get();
        Assertions.assertEquals(ReferenceException.class.getName(), storedCause.type());
    }

    /**
     * Publishes one event carrying a very long text that ends in a recognisable marker, and waits until the
     * dead-letter queue reports exactly one sequence. The message of the stored cause must differ from the full
     * text, must not contain the trailing marker, and must contain the first ten characters of
     * {@code BLOB_OF_TEXT}.
     */
    @Test
    void largeThrowableMessageIsTruncatedUponCauseCreation() {
        String trailingMarker = "truncated-text";
        String oversizedMessage = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Gravida quis blandit turpis cursus in. Nulla facilisi etiam dignissim diam quis enim lobortis scelerisque fermentum. Egestas maecenas pharetra convallis posuere morbi leo urna. Dictumst quisque sagittis purus sit amet volutpat consequat. At volutpat diam ut venenatis tellus in metus vulputate eu. Imperdiet dui accumsan sit amet nulla facilisi. Eget est lorem ipsum dolor sit amet. Vestibulum morbi blandit cursus risus at ultrices mi tempus imperdiet. Sed tempus urna et pharetra pharetra massa massa. Dolor magna eget est lorem. Purus semper eget duis at tellus. Tincidunt augue interdum velit euismod in pellentesque massa placerat duis.\n\nQuis ipsum suspendisse ultrices gravida dictum fusce ut. Nascetur ridiculus mus mauris vitae ultricies leo integer malesuada. Sit amet purus gravida quis blandit turpis cursus in. Gravida rutrum quisque non tellus. Eros donec ac odio tempor orci dapibus. Dictum varius duis at consectetur lorem donec massa sapien.Tincidunt arcu non sodales neque sodales ut etiam sit amet. Sagittis aliquam malesuada bibendum arcu vitae. Vel turpis nunc eget lorem dolor sed viverra. In egestas erat imperdiet sed euismod nisi. Lorem ipsum dolor sit amet consectetur." + trailingMarker;

        String sequenceId = UUID.randomUUID().toString();
        eventSource.publishMessage(
                EventTestUtils.asEventMessage(new DeadLetterableEvent(sequenceId, false, false, oversizedMessage))
        );
        startProcessingEvent();

        Awaitility.await()
                  .pollDelay(Duration.ofMillis(25L))
                  .atMost(Duration.ofSeconds(1L))
                  .until(() -> deadLetterQueue.amountOfSequences() == 1);

        // Take the first sequence from the queue, then the first letter within that sequence.
        Iterable firstSequence = (Iterable) deadLetterQueue.deadLetters().iterator().next();
        DeadLetter storedLetter = (DeadLetter) firstSequence.iterator().next();
        Optional cause = storedLetter.cause();
        Assertions.assertTrue(cause.isPresent());

        String storedMessage = ((Cause) cause.get()).message();
        Assertions.assertNotEquals(oversizedMessage, storedMessage);
        Assertions.assertFalse(storedMessage.contains(trailingMarker));
        Assertions.assertTrue(storedMessage.contains(BLOB_OF_TEXT.substring(0, 10)));
    }

    /**
     * Publishes two consecutive batches of {@code DeadLetterableEvent}s to the event source.
     * <p>
     * The first batch holds {@code i} events built as {@code (str, true)}. The second batch holds {@code i2} events:
     * the event at index 0 is built as {@code (str, false, true)}, the event at index {@code i2 - i3} as
     * {@code (str, true, false)}, and all others as {@code (str, true, true)}. If {@code i2 - i3} is 0, the index-0
     * variant takes precedence.
     * <p>
     * NOTE(review): the meaning of the boolean flags is defined by {@code DeadLetterableEvent}, which is not
     * visible here - confirm before relying on any interpretation.
     *
     * @param str first constructor argument of every published event
     * @param i   number of events in the first batch
     * @param i2  number of events in the second batch
     * @param i3  distance from {@code i2} that selects the index of the {@code (true, false)} event
     */
    private void publishEventsFor(String str, int i, int i2, int i3) {
        int firstBatchIndex = 0;
        while (firstBatchIndex < i) {
            eventSource.publishMessage(EventTestUtils.asEventMessage(new DeadLetterableEvent(str, true)));
            firstBatchIndex++;
        }

        for (int secondBatchIndex = 0; secondBatchIndex < i2; secondBatchIndex++) {
            DeadLetterableEvent event;
            if (secondBatchIndex == 0) {
                event = new DeadLetterableEvent(str, false, true);
            } else if (secondBatchIndex == i2 - i3) {
                event = new DeadLetterableEvent(str, true, false);
            } else {
                event = new DeadLetterableEvent(str, true, true);
            }
            eventSource.publishMessage(EventTestUtils.asEventMessage(event));
        }
    }

    /**
     * Builds a handler interceptor that records its own invocation and suppresses runtime failures from the rest of
     * the chain.
     * <p>
     * On every invocation the interceptor sets {@code atomicBoolean} to {@code true}, calls
     * {@code proceedSync()} on the chain (discarding its result), and returns the unit-of-work argument it was
     * given, whether or not a {@link RuntimeException} was thrown. Other exception types are not caught.
     *
     * @param atomicBoolean flag that is set to {@code true} as soon as the interceptor is invoked
     * @return the interceptor described above
     */
    private MessageHandlerInterceptor<? super EventMessage<?>> errorCatchingInterceptor(AtomicBoolean atomicBoolean) {
        return (unitOfWork, chain) -> {
            atomicBoolean.set(true);
            try {
                chain.proceedSync();
            } catch (RuntimeException ignored) {
                // Deliberately swallowed: this interceptor's purpose is to catch handling errors.
            }
            return unitOfWork;
        };
    }
}
