package org.axonframework.eventhandling.async;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.EventTestUtils;
import org.axonframework.serialization.upcasting.event.EventTypeUpcasterTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/axonframework/eventhandling/async/AsynchronousEventProcessorConcurrencyTest.class */
class AsynchronousEventProcessorConcurrencyTest {
    private ExecutorService executor;
    private AsynchronousEventProcessingStrategy testSubject;

    /* loaded from: input_file:org/axonframework/eventhandling/async/AsynchronousEventProcessorConcurrencyTest$EventsPublisher.class */
    private class EventsPublisher implements Runnable {
        private static final int ITERATIONS = 10000;
        private static final int EVENTS_COUNT = 30000;
        private final Consumer<List<? extends EventMessage<?>>> processor;

        public EventsPublisher(Consumer<List<? extends EventMessage<?>>> consumer) {
            this.processor = consumer;
        }

        @Override // java.lang.Runnable
        public void run() {
            for (int i = 0; i < ITERATIONS; i++) {
                AsynchronousEventProcessorConcurrencyTest.this.testSubject.handle(Collections.singletonList(EventTestUtils.asEventMessage(EventTypeUpcasterTest.EXPECTED_REVISION)), this.processor);
                AsynchronousEventProcessorConcurrencyTest.this.testSubject.handle(Collections.singletonList(EventTestUtils.asEventMessage(EventTypeUpcasterTest.UPCASTED_REVISION)), this.processor);
                AsynchronousEventProcessorConcurrencyTest.this.testSubject.handle(Collections.singletonList(EventTestUtils.asEventMessage("3")), this.processor);
            }
        }
    }

    AsynchronousEventProcessorConcurrencyTest() {
    }

    @BeforeEach
    void setUp() {
        this.executor = Executors.newCachedThreadPool();
        this.testSubject = new AsynchronousEventProcessingStrategy(this.executor, (v0) -> {
            return v0.getPayload();
        });
    }

    @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
    @Test
    void handleEvents() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        Consumer consumer = list -> {
            atomicInteger.addAndGet(list.size());
        };
        for (int i = 0; i < 50; i++) {
            this.executor.submit(new EventsPublisher(consumer));
        }
        while (atomicInteger.get() < 50 * 30000) {
            Thread.sleep(10L);
        }
        this.executor.shutdown();
        Assertions.assertTrue(this.executor.awaitTermination(10L, TimeUnit.SECONDS), "Executor not closed within a reasonable timeframe");
        Assertions.assertEquals(50 * 30000, atomicInteger.get());
    }
}
