package org.axonframework.eventsourcing;

import java.lang.Thread;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.axonframework.eventhandling.DomainEventMessage;
import org.axonframework.eventhandling.EventBus;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.eventstore.DomainEventStream;
import org.axonframework.eventsourcing.eventstore.LegacyEmbeddedEventStore;
import org.axonframework.eventsourcing.eventstore.LegacyEventStore;
import org.axonframework.eventsourcing.eventstore.inmemory.LegacyInMemoryEventStorageEngine;
import org.axonframework.eventsourcing.utils.StubDomainEvent;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.unitofwork.LegacyDefaultUnitOfWork;
import org.axonframework.modelling.command.Aggregate;
import org.axonframework.modelling.command.AggregateIdentifier;
import org.axonframework.modelling.command.AggregateLifecycle;
import org.axonframework.modelling.command.ConcurrencyException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.Mockito;

/* loaded from: input_file:org/axonframework/eventsourcing/EventSourcingRepositoryIntegrationTest.class */
public class EventSourcingRepositoryIntegrationTest implements Thread.UncaughtExceptionHandler {
    /** Number of threads that concurrently modify the same aggregate. */
    private static final int CONCURRENT_MODIFIERS = 10;
    /** Repository under test; created by {@code initializeRepository()}. */
    private LegacyEventSourcingRepository<SimpleAggregateRoot> repository;
    /** Identifier of the aggregate created by {@code initializeRepository()}. */
    private String aggregateIdentifier;
    /** Event store backing the repository; read back to verify the stored sequence numbers. */
    private LegacyEventStore eventStore;
    /** Failures reported through {@link #uncaughtException(Thread, Throwable)} by the modifier threads. */
    private final List<Throwable> uncaughtExceptions = new CopyOnWriteArrayList<>();
    /** Every modifier thread that was started, kept for diagnostic output. */
    private final List<Thread> startedThreads = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventsourcing/EventSourcingRepositoryIntegrationTest$SimpleAggregateFactory.class */
    public static class SimpleAggregateFactory extends AbstractAggregateFactory<SimpleAggregateRoot> {
        SimpleAggregateFactory() {
            super(SimpleAggregateRoot.class);
        }

        /* renamed from: doCreateAggregate, reason: merged with bridge method [inline-methods] */
        public SimpleAggregateRoot m8doCreateAggregate(String str, DomainEventMessage domainEventMessage) {
            return new SimpleAggregateRoot(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventsourcing/EventSourcingRepositoryIntegrationTest$SimpleAggregateRoot.class */
    /**
     * Minimal aggregate used by the locking test. Its only state is its identifier, which is
     * overwritten with the aggregate identifier of every event it handles.
     */
    public static class SimpleAggregateRoot {

        @AggregateIdentifier
        private String identifier;

        /** Creates a new aggregate with a random UUID identifier and applies a {@link StubDomainEvent}. */
        private SimpleAggregateRoot() {
            String generatedIdentifier = UUID.randomUUID().toString();
            identifier = generatedIdentifier;
            AggregateLifecycle.apply(new StubDomainEvent());
        }

        /** Creates an aggregate with the given identifier without applying any event. */
        private SimpleAggregateRoot(String existingIdentifier) {
            identifier = existingIdentifier;
        }

        /** @return the current identifier of this aggregate */
        public String getIdentifier() {
            return identifier;
        }

        /** Applies a single {@link StubDomainEvent}. */
        private void doOperation() {
            StubDomainEvent event = new StubDomainEvent();
            AggregateLifecycle.apply(event);
        }

        /**
         * Copies the aggregate identifier of the handled event into this instance.
         *
         * @param message the handled event; cast to {@link DomainEventMessage} unconditionally
         */
        @EventSourcingHandler
        protected void handle(EventMessage message) {
            DomainEventMessage domainEvent = (DomainEventMessage) message;
            identifier = domainEvent.getAggregateIdentifier();
        }
    }

    /**
     * Runs {@link #CONCURRENT_MODIFIERS} threads against a single aggregate and verifies that every
     * modification succeeded and that the events were stored with consecutive sequence numbers.
     *
     * @throws Throwable any failure raised by a modifier thread that is not a {@code ConcurrencyException}
     */
    // The timeout must exceed the 30 second latch wait in executeConcurrentModifications; otherwise
    // the test is interrupted before the hanging-thread diagnostics can be printed.
    @Timeout(60)
    @Test
    void pessimisticLocking() throws Throwable {
        initializeRepository();
        long lastSequenceNumber = executeConcurrentModifications(CONCURRENT_MODIFIERS);
        // Sequence numbers start at 0 with the creation event and every modifier applies two events,
        // so the last sequence number equals twice the number of modifiers.
        Assertions.assertEquals(2L * CONCURRENT_MODIFIERS, lastSequenceNumber);
        Assertions.assertEquals(CONCURRENT_MODIFIERS, getSuccessfulModifications());
    }

    /**
     * Counts the modifier threads that finished without an uncaught exception.
     *
     * @return {@link #CONCURRENT_MODIFIERS} minus the number of recorded uncaught exceptions
     */
    private int getSuccessfulModifications() {
        int failedModifications = uncaughtExceptions.size();
        return CONCURRENT_MODIFIERS - failedModifications;
    }

    /**
     * Builds an in-memory event store and a repository on top of it, then creates one aggregate
     * inside a unit of work and remembers its identifier for the concurrent modifiers.
     *
     * @throws Exception if creating the aggregate instance fails
     */
    private void initializeRepository() throws Exception {
        this.eventStore = LegacyEmbeddedEventStore.builder()
                .storageEngine(new LegacyInMemoryEventStorageEngine())
                .build();
        this.repository = LegacyEventSourcingRepository.builder(SimpleAggregateRoot.class)
                .aggregateFactory(new SimpleAggregateFactory())
                .eventStore(this.eventStore)
                .build();

        LegacyDefaultUnitOfWork unitOfWork = LegacyDefaultUnitOfWork.startAndGet((Message) null);
        // Typed as Aggregate<SimpleAggregateRoot> so the method references below resolve without casts.
        Aggregate<SimpleAggregateRoot> aggregate = this.repository.newInstance(SimpleAggregateRoot::new);
        unitOfWork.commit();

        this.aggregateIdentifier = aggregate.invoke(SimpleAggregateRoot::getIdentifier);
    }

    /**
     * Starts the given number of modifier threads, releases them simultaneously and waits up to
     * 30 seconds for all of them to report completion. Afterwards the aggregate's events are read
     * back and checked for gap-free, ascending sequence numbers.
     *
     * @param concurrentModifiers number of threads that modify the aggregate concurrently
     * @return the sequence number of the last stored event, or {@code -1} if no event was stored
     * @throws Throwable the first recorded uncaught exception that is not a {@link ConcurrencyException}
     */
    private long executeConcurrentModifications(int concurrentModifiers) throws Throwable {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch threadsDone = new CountDownLatch(concurrentModifiers);
        for (int t = 0; t < concurrentModifiers; t++) {
            prepareAggregateModifier(startSignal, threadsDone, this.repository, this.aggregateIdentifier);
        }
        // Release all modifiers at once to maximise contention on the aggregate.
        startSignal.countDown();
        if (!threadsDone.await(30L, TimeUnit.SECONDS)) {
            printDiagnosticInformation();
            Assertions.fail("Thread found to be alive after timeout. It might be hanging");
        }
        // ConcurrencyExceptions are tolerated here; callers account for them via the success count.
        for (Throwable failure : this.uncaughtExceptions) {
            if (!(failure instanceof ConcurrencyException)) {
                throw failure;
            }
        }
        DomainEventStream committedEvents = this.eventStore.readEvents(this.aggregateIdentifier);
        long lastSequenceNumber = -1;
        while (committedEvents.hasNext()) {
            long expectedSequenceNumber = lastSequenceNumber + 1;
            Assertions.assertEquals(expectedSequenceNumber, committedEvents.next().getSequenceNumber(),
                    "Events are not stored sequentially. Most likely due to unlocked concurrent access.");
            lastSequenceNumber = expectedSequenceNumber;
        }
        return lastSequenceNumber;
    }

    /**
     * Writes one report per started thread to standard output: a header naming the thread,
     * followed by its current stack trace unless the thread has already terminated.
     */
    private void printDiagnosticInformation() {
        for (Thread worker : startedThreads) {
            String header = "## Thread [" + worker.getName()
                    + "] did not properly shut down during Locking test. ##";
            // The header is deliberately printed without a line break, as before.
            System.out.print(header);
            boolean terminated = worker.getState() == Thread.State.TERMINATED;
            if (!terminated) {
                StackTraceElement[] frames = worker.getStackTrace();
                for (int index = 0; index < frames.length; index++) {
                    System.out.println(" - " + frames[index]);
                }
            }
            System.out.println();
        }
    }

    /**
     * Creates and starts a thread that waits for the start signal, loads the aggregate inside a
     * unit of work, performs two operations on it and commits. The thread always reports completion
     * on {@code reportDone}, whether it succeeded or failed; failures reach
     * {@link #uncaughtException(Thread, Throwable)} because this test is installed as the thread's
     * uncaught exception handler.
     *
     * @param awaitFor         latch the thread waits on before starting its work
     * @param reportDone       latch counted down exactly once when the thread's work ends
     * @param targetRepository repository to load the aggregate from
     * @param targetIdentifier identifier of the aggregate to modify
     * @return the started thread
     */
    private Thread prepareAggregateModifier(CountDownLatch awaitFor, CountDownLatch reportDone,
                                            LegacyEventSourcingRepository<SimpleAggregateRoot> targetRepository,
                                            String targetIdentifier) {
        Thread modifier = new Thread(() -> {
            try {
                awaitFor.await();
                LegacyDefaultUnitOfWork unitOfWork = LegacyDefaultUnitOfWork.startAndGet((Message) null);
                // Typed as Aggregate<SimpleAggregateRoot> so doOperation resolves without casts.
                Aggregate<SimpleAggregateRoot> aggregate = targetRepository.load(targetIdentifier);
                aggregate.execute(SimpleAggregateRoot::doOperation);
                aggregate.execute(SimpleAggregateRoot::doOperation);
                unitOfWork.commit();
            } catch (InterruptedException e) {
                // Restore the interrupt flag before surfacing the failure to the uncaught handler.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                reportDone.countDown();
            }
        });
        modifier.setUncaughtExceptionHandler(this);
        this.startedThreads.add(modifier);
        modifier.start();
        return modifier;
    }

    /**
     * Records a failure that escaped one of the modifier threads.
     *
     * @param source  the thread that terminated with the failure; not used
     * @param failure the uncaught throwable, appended to the recorded exceptions
     */
    @Override // java.lang.Thread.UncaughtExceptionHandler
    public void uncaughtException(Thread source, Throwable failure) {
        uncaughtExceptions.add(failure);
    }
}
