package org.axonframework.eventsourcing.eventstore;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine;
import org.axonframework.eventstreaming.EventCriteria;
import org.axonframework.eventstreaming.EventTypeRestrictableEventCriteria;
import org.axonframework.eventstreaming.Tag;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import reactor.test.StepVerifier;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/axonframework/eventsourcing/eventstore/DefaultEventStoreTransactionTest.class */
public class DefaultEventStoreTransactionTest {
    private static final String TEST_AGGREGATE_ID = "someId";
    public static final Tag AGGREGATE_ID_TAG = new Tag("aggregateIdentifier", TEST_AGGREGATE_ID);
    private static final EventCriteria TEST_AGGREGATE_CRITERIA = EventCriteria.havingTags(new Tag[]{AGGREGATE_ID_TAG});
    private final Context.ResourceKey<EventStoreTransaction> testEventStoreTransactionKey = Context.ResourceKey.withLabel("eventStoreTransaction");
    private final InMemoryEventStorageEngine eventStorageEngine = new InMemoryEventStorageEngine();

    @Nested
    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/DefaultEventStoreTransactionTest$AppendEvent.class */
    class AppendEvent {
        AppendEvent() {
        }

        @Test
        void sourcingConditionIsMappedToAppendCondition() {
            EventCriteria eventCriteria = DefaultEventStoreTransactionTest.TEST_AGGREGATE_CRITERIA;
            EventMessage<?> eventMessage = DefaultEventStoreTransactionTest.eventMessage(0);
            EventMessage<?> eventMessage2 = DefaultEventStoreTransactionTest.eventMessage(1);
            EventMessage<?> eventMessage3 = DefaultEventStoreTransactionTest.eventMessage(2);
            SourcingCondition conditionFor = SourcingCondition.conditionFor(eventCriteria);
            AtomicReference atomicReference = new AtomicReference();
            AtomicReference atomicReference2 = new AtomicReference();
            AtomicReference atomicReference3 = new AtomicReference();
            UnitOfWork unitOfWork = new UnitOfWork();
            unitOfWork.runOnPreInvocation(processingContext -> {
                atomicReference.set(DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext).source(conditionFor));
            }).runOnPostInvocation(processingContext2 -> {
                EventStoreTransaction defaultEventStoreTransactionFor = DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext2);
                defaultEventStoreTransactionFor.appendEvent(eventMessage);
                defaultEventStoreTransactionFor.appendEvent(eventMessage2);
                defaultEventStoreTransactionFor.appendEvent(eventMessage3);
            }).runOnAfterCommit(processingContext3 -> {
                EventStoreTransaction defaultEventStoreTransactionFor = DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext3);
                atomicReference2.set(defaultEventStoreTransactionFor.source(conditionFor));
                atomicReference3.set(defaultEventStoreTransactionFor.appendPosition());
            });
            DefaultEventStoreTransactionTest.awaitCompletion(unitOfWork.execute());
            Assertions.assertNull(((MessageStream) atomicReference.get()).first().asCompletableFuture().join());
            StepVerifier.create(((MessageStream) atomicReference2.get()).asFlux()).assertNext(entry -> {
                DefaultEventStoreTransactionTest.assertTagsPositionAndEvent(entry, eventCriteria, 0, eventMessage);
            }).assertNext(entry2 -> {
                DefaultEventStoreTransactionTest.assertTagsPositionAndEvent(entry2, eventCriteria, 1, eventMessage2);
            }).assertNext(entry3 -> {
                DefaultEventStoreTransactionTest.assertTagsPositionAndEvent(entry3, eventCriteria, 2, eventMessage3);
            }).verifyComplete();
            Assertions.assertEquals(GlobalIndexConsistencyMarker.position(new GlobalIndexConsistencyMarker(2L)), GlobalIndexConsistencyMarker.position((ConsistencyMarker) atomicReference3.get()));
        }

        @Test
        void sourceReturnsOnlyCommitedEvents() {
            EventMessage<?> eventMessage = DefaultEventStoreTransactionTest.eventMessage(0);
            EventMessage<?> eventMessage2 = DefaultEventStoreTransactionTest.eventMessage(1);
            SourcingCondition conditionFor = SourcingCondition.conditionFor(DefaultEventStoreTransactionTest.TEST_AGGREGATE_CRITERIA);
            AtomicReference atomicReference = new AtomicReference();
            AtomicReference atomicReference2 = new AtomicReference();
            UnitOfWork unitOfWork = new UnitOfWork();
            unitOfWork.runOnPreInvocation(processingContext -> {
                EventStoreTransaction defaultEventStoreTransactionFor = DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext);
                defaultEventStoreTransactionFor.appendEvent(eventMessage);
                defaultEventStoreTransactionFor.appendEvent(eventMessage2);
                atomicReference.set(defaultEventStoreTransactionFor.source(conditionFor));
            }).runOnAfterCommit(processingContext2 -> {
                atomicReference2.set(DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext2).source(conditionFor));
            });
            DefaultEventStoreTransactionTest.awaitCompletion(unitOfWork.execute());
            StepVerifier.create(((MessageStream) atomicReference.get()).asFlux()).verifyComplete();
            StepVerifier.create(((MessageStream) atomicReference2.get()).asFlux()).assertNext(entry -> {
                DefaultEventStoreTransactionTest.assertTagsPositionAndEvent(entry, DefaultEventStoreTransactionTest.TEST_AGGREGATE_CRITERIA, 0, eventMessage);
            }).assertNext(entry2 -> {
                DefaultEventStoreTransactionTest.assertTagsPositionAndEvent(entry2, DefaultEventStoreTransactionTest.TEST_AGGREGATE_CRITERIA, 1, eventMessage2);
            }).verifyComplete();
        }

        @Test
        void appendCommitsOfNonExistentTagWhenOfTwoNonOverlappingTagsOneYieldedNoEvents() {
            Tag tag = new Tag("nonExistent", "tag");
            EventTypeRestrictableEventCriteria havingTags = EventCriteria.havingTags(new Tag[]{tag});
            Tag tag2 = new Tag("existent", "tag");
            EventTypeRestrictableEventCriteria havingTags2 = EventCriteria.havingTags(new Tag[]{tag2});
            appendEventForTag(tag2);
            testCanCommitTag(havingTags, havingTags2, tag);
        }

        @Test
        void appendCommitsOfExistentTagWhenOfTwoNonOverlappingTagsOneYieldedNoEvents() {
            EventTypeRestrictableEventCriteria havingTags = EventCriteria.havingTags(new Tag[]{new Tag("nonExistent", "tag")});
            Tag tag = new Tag("existent", "tag");
            EventTypeRestrictableEventCriteria havingTags2 = EventCriteria.havingTags(new Tag[]{tag});
            appendEventForTag(tag);
            testCanCommitTag(havingTags, havingTags2, tag);
        }

        private ConsistencyMarker appendEventForTag(Tag tag) {
            return (ConsistencyMarker) ((EventStorageEngine.AppendTransaction) DefaultEventStoreTransactionTest.this.eventStorageEngine.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{new GenericTaggedEventMessage(new GenericEventMessage(new MessageType(String.class), "my payload"), Set.of(tag))}).join()).commit().join();
        }

        private void testCanCommitTag(EventCriteria eventCriteria, EventCriteria eventCriteria2, Tag tag) {
            DefaultEventStoreTransactionTest.awaitCompletion(new UnitOfWork().executeWithResult(processingContext -> {
                EventStoreTransaction defaultEventStoreTransactionFor = DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext, eventMessage -> {
                    return Set.of(tag);
                });
                defaultEventStoreTransactionFor.source(SourcingCondition.conditionFor(eventCriteria)).asFlux().blockLast();
                defaultEventStoreTransactionFor.source(SourcingCondition.conditionFor(eventCriteria2)).asFlux().blockLast();
                defaultEventStoreTransactionFor.appendEvent(new GenericEventMessage(new MessageType(String.class), "my payload"));
                return MessageStream.empty().asCompletableFuture();
            }));
        }
    }

    @Nested
    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/DefaultEventStoreTransactionTest$AppendPosition.class */
    class AppendPosition {
        AppendPosition() {
        }

        @Test
        void appendPositionReturnsMinusOneWhenNoEventsAppended() {
            AtomicReference atomicReference = new AtomicReference();
            UnitOfWork unitOfWork = new UnitOfWork();
            unitOfWork.runOnAfterCommit(processingContext -> {
                atomicReference.set(DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext).appendPosition());
            });
            DefaultEventStoreTransactionTest.awaitCompletion(unitOfWork.execute());
            Assertions.assertEquals(ConsistencyMarker.ORIGIN, atomicReference.get());
        }

        @Test
        void appendPositionReturnsConsistencyMarkerOfTheResultWhenEventsAppended() {
            AtomicReference atomicReference = new AtomicReference();
            UnitOfWork unitOfWork = new UnitOfWork();
            unitOfWork.runOnPreInvocation(processingContext -> {
                EventStoreTransaction defaultEventStoreTransactionFor = DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext);
                defaultEventStoreTransactionFor.appendEvent(DefaultEventStoreTransactionTest.eventMessage(0));
                defaultEventStoreTransactionFor.appendEvent(DefaultEventStoreTransactionTest.eventMessage(1));
                defaultEventStoreTransactionFor.appendEvent(DefaultEventStoreTransactionTest.eventMessage(2));
                defaultEventStoreTransactionFor.appendEvent(DefaultEventStoreTransactionTest.eventMessage(3));
            }).runOnAfterCommit(processingContext2 -> {
                atomicReference.set(DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext2).appendPosition());
            });
            DefaultEventStoreTransactionTest.awaitCompletion(unitOfWork.execute());
            Assertions.assertEquals(GlobalIndexConsistencyMarker.position(new GlobalIndexConsistencyMarker(3L)), GlobalIndexConsistencyMarker.position((ConsistencyMarker) atomicReference.get()));
        }
    }

    @Nested
    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/DefaultEventStoreTransactionTest$OnAppendCallbacks.class */
    class OnAppendCallbacks {
        OnAppendCallbacks() {
        }

        @Test
        void appendEventNotifiesRegisteredCallbacks() {
            EventMessage<?> eventMessage = DefaultEventStoreTransactionTest.eventMessage(0);
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            UnitOfWork unitOfWork = new UnitOfWork();
            unitOfWork.runOnPreInvocation(processingContext -> {
                EventStoreTransaction defaultEventStoreTransactionFor = DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext);
                Objects.requireNonNull(arrayList);
                defaultEventStoreTransactionFor.onAppend((v1) -> {
                    r1.add(v1);
                });
                Objects.requireNonNull(arrayList2);
                defaultEventStoreTransactionFor.onAppend((v1) -> {
                    r1.add(v1);
                });
                defaultEventStoreTransactionFor.appendEvent(eventMessage);
            });
            DefaultEventStoreTransactionTest.awaitCompletion(unitOfWork.execute());
            Assertions.assertEquals(1, arrayList.size());
            Assertions.assertEquals(1, arrayList2.size());
            Assertions.assertEquals(eventMessage.getIdentifier(), ((EventMessage) arrayList.getFirst()).getIdentifier());
            Assertions.assertEquals(eventMessage.getIdentifier(), ((EventMessage) arrayList2.getFirst()).getIdentifier());
        }

        @Test
        void appendEventNotifierRegisteredCallbacksEvenWhenTransactionRollback() {
            EventMessage<?> eventMessage = DefaultEventStoreTransactionTest.eventMessage(0);
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            UnitOfWork unitOfWork = new UnitOfWork();
            unitOfWork.runOnPreInvocation(processingContext -> {
                EventStoreTransaction defaultEventStoreTransactionFor = DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext);
                defaultEventStoreTransactionFor.onAppend(eventMessage2 -> {
                    atomicBoolean.set(true);
                });
                defaultEventStoreTransactionFor.appendEvent(eventMessage);
            }).runOnPrepareCommit(processingContext2 -> {
                throw new RuntimeException("Simulated failure during prepare commit");
            });
            Assertions.assertThrows(RuntimeException.class, () -> {
                DefaultEventStoreTransactionTest.awaitCompletion(unitOfWork.execute());
            });
            Assertions.assertTrue(atomicBoolean.get());
        }
    }

    @Nested
    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/DefaultEventStoreTransactionTest$TransactionRollback.class */
    class TransactionRollback {
        TransactionRollback() {
        }

        @Test
        void eventsAreNotAppendedWhenTransactionFails() {
            EventMessage<?> eventMessage = DefaultEventStoreTransactionTest.eventMessage(0);
            EventMessage<?> eventMessage2 = DefaultEventStoreTransactionTest.eventMessage(1);
            SourcingCondition conditionFor = SourcingCondition.conditionFor(DefaultEventStoreTransactionTest.TEST_AGGREGATE_CRITERIA);
            UnitOfWork unitOfWork = new UnitOfWork();
            unitOfWork.runOnPreInvocation(processingContext -> {
                EventStoreTransaction defaultEventStoreTransactionFor = DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext);
                defaultEventStoreTransactionFor.appendEvent(eventMessage);
                defaultEventStoreTransactionFor.appendEvent(eventMessage2);
            }).runOnPrepareCommit(processingContext2 -> {
                throw new IllegalStateException("Simulated failure during prepare commit");
            });
            Assertions.assertThrows(CompletionException.class, () -> {
                DefaultEventStoreTransactionTest.awaitException(unitOfWork.execute());
            });
            UnitOfWork unitOfWork2 = new UnitOfWork();
            AtomicReference atomicReference = new AtomicReference();
            unitOfWork2.runOnPreInvocation(processingContext3 -> {
                atomicReference.set(DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext3).source(conditionFor));
            });
            DefaultEventStoreTransactionTest.awaitCompletion(unitOfWork2.execute());
            StepVerifier.create(((MessageStream) atomicReference.get()).asFlux()).verifyComplete();
        }

        @Test
        void errorPropagationIsHandledByOnErrorPhase() {
            EventMessage<?> eventMessage = DefaultEventStoreTransactionTest.eventMessage(0);
            AtomicReference atomicReference = new AtomicReference();
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
            AtomicBoolean atomicBoolean3 = new AtomicBoolean(false);
            UnitOfWork unitOfWork = new UnitOfWork();
            unitOfWork.onError((processingContext, phase, th) -> {
                atomicReference.set(th);
            }).runOnPreInvocation(processingContext2 -> {
                DefaultEventStoreTransactionTest.this.defaultEventStoreTransactionFor(processingContext2).appendEvent(eventMessage);
            }).runOnPrepareCommit(processingContext3 -> {
                throw new RuntimeException("Simulated failure during prepare commit");
            }).runOnCommit(processingContext4 -> {
                atomicBoolean.set(true);
            }).runOnAfterCommit(processingContext5 -> {
                atomicBoolean2.set(true);
            }).runOnPostInvocation(processingContext6 -> {
                atomicBoolean3.set(true);
            });
            RuntimeException runtimeException = (RuntimeException) Assertions.assertThrows(CompletionException.class, () -> {
                DefaultEventStoreTransactionTest.awaitException(unitOfWork.execute());
            });
            Assertions.assertNotNull(atomicReference.get());
            Assertions.assertEquals("Simulated failure during prepare commit", ((Throwable) atomicReference.get()).getMessage());
            Assertions.assertEquals(runtimeException.getCause(), atomicReference.get());
            Assertions.assertFalse(atomicBoolean.get(), "Commit step should not execute after an error");
            Assertions.assertFalse(atomicBoolean2.get(), "After commit step should not execute after an error");
            Assertions.assertTrue(atomicBoolean3.get(), "Post invocation step should be executed after an error");
        }
    }

    DefaultEventStoreTransactionTest() {
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <R> R awaitCompletion(CompletableFuture<R> completableFuture) {
        Awaitility.await().atMost(Duration.ofMillis(500L)).pollDelay(Duration.ofMillis(25L)).untilAsserted(() -> {
            Assertions.assertFalse(completableFuture.isCompletedExceptionally(), () -> {
                return completableFuture.exceptionNow().toString();
            });
        });
        return completableFuture.join();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <R> R awaitException(CompletableFuture<R> completableFuture) {
        Awaitility.await().atMost(Duration.ofMillis(500L)).pollDelay(Duration.ofMillis(25L)).untilAsserted(() -> {
            Assertions.assertTrue(completableFuture.isCompletedExceptionally(), "Expected exception but none occurred");
        });
        return completableFuture.join();
    }

    private EventStoreTransaction defaultEventStoreTransactionFor(ProcessingContext processingContext) {
        return defaultEventStoreTransactionFor(processingContext, eventMessage -> {
            return Set.of(AGGREGATE_ID_TAG);
        });
    }

    private EventStoreTransaction defaultEventStoreTransactionFor(ProcessingContext processingContext, TagResolver tagResolver) {
        return (EventStoreTransaction) processingContext.computeResourceIfAbsent(this.testEventStoreTransactionKey, () -> {
            return new DefaultEventStoreTransaction(this.eventStorageEngine, processingContext, tagResolver);
        });
    }

    protected static EventMessage<?> eventMessage(int i) {
        return new GenericEventMessage(new MessageType("test", "event", "0.0.1"), "event-" + i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void assertTagsPositionAndEvent(MessageStream.Entry<? extends EventMessage<?>> entry, EventCriteria eventCriteria, int i, EventMessage<?> eventMessage) {
        Optional fromContext = Tag.fromContext(entry);
        Assertions.assertTrue(fromContext.isPresent());
        Assertions.assertTrue(((Set) fromContext.get()).containsAll((Set) eventCriteria.flatten().stream().map(eventCriterion -> {
            return eventCriterion.tags();
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toSet())));
        assertPositionAndEvent(entry, i, eventMessage);
    }

    private static void assertPositionAndEvent(MessageStream.Entry<? extends EventMessage<?>> entry, long j, EventMessage<?> eventMessage) {
        Optional fromContext = TrackingToken.fromContext(entry);
        Assertions.assertTrue(fromContext.isPresent());
        OptionalLong position = ((TrackingToken) fromContext.get()).position();
        Assertions.assertTrue(position.isPresent());
        Assertions.assertEquals(j, position.getAsLong());
        assertEvent(entry.message(), eventMessage);
    }

    private static void assertEvent(EventMessage<?> eventMessage, EventMessage<?> eventMessage2) {
        Assertions.assertEquals(eventMessage2.getIdentifier(), eventMessage.getIdentifier());
        Assertions.assertEquals(eventMessage2.getPayload(), eventMessage.getPayload());
        Assertions.assertEquals(eventMessage2.getTimestamp(), eventMessage.getTimestamp());
        Assertions.assertEquals(eventMessage2.getMetaData(), eventMessage.getMetaData());
    }
}
