package org.axonframework.eventsourcing.eventstore;

import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.MetaData;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opentest4j.TestAbortedException;
import reactor.test.StepVerifier;

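/**
 * Abstract JUnit test suite exercising an {@link EventStorageEngine} through its append, source and stream
 * operations using aggregate-style tags and criteria.
 * <p>
 * Concrete subclasses supply the engine under test and the position/token/payload conversions through the
 * abstract methods declared in this class.
 *
 * @param <ESE> the type of {@link EventStorageEngine} under test
 */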
public abstract class AggregateBasedStorageEngineTestSuite<ESE extends EventStorageEngine> {
    /** Tag key (and expected aggregate type resource) used for the primary test aggregate. */
    private static final String TEST_AGGREGATE_TYPE = "TEST_AGGREGATE";
    /** Executor shared by all tests; assigned in {@code beforeAll()}, closed in {@code afterAll()}, used by {@code runAsync}. */
    private static ExecutorService executor;
    // The fixture fields below are (re)assigned in setUp() before every test.
    /** Random identifier of the primary test aggregate. */
    protected String TEST_AGGREGATE_ID;
    /** Random identifier of the secondary ("OTHER_AGGREGATE") aggregate. */
    protected String OTHER_AGGREGATE_ID;
    /** Single-tag set identifying the primary test aggregate. */
    protected Set<Tag> TEST_AGGREGATE_TAGS;
    /** Criteria built from the same key/value pair as {@link #TEST_AGGREGATE_TAGS}. */
    protected EventCriteria TEST_AGGREGATE_CRITERIA;
    /** Single-tag set identifying the secondary aggregate. */
    protected Set<Tag> OTHER_AGGREGATE_TAGS;
    /** Criteria built from the same key/value pair as {@link #OTHER_AGGREGATE_TAGS}. */
    protected EventCriteria OTHER_AGGREGATE_CRITERIA;
    /** The storage engine under test, obtained from {@link #buildStorageEngine()}. */
    protected ESE testSubject;

    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/AggregateBasedStorageEngineTestSuite$ComplexObject.class */
    public static final class ComplexObject extends Record {
        private final String value1;
        private final boolean value2;
        private final int value3;

        public ComplexObject(String str, boolean z, int i) {
            this.value1 = str;
            this.value2 = z;
            this.value3 = i;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, ComplexObject.class), ComplexObject.class, "value1;value2;value3", "FIELD:Lorg/axonframework/eventsourcing/eventstore/AggregateBasedStorageEngineTestSuite$ComplexObject;->value1:Ljava/lang/String;", "FIELD:Lorg/axonframework/eventsourcing/eventstore/AggregateBasedStorageEngineTestSuite$ComplexObject;->value2:Z", "FIELD:Lorg/axonframework/eventsourcing/eventstore/AggregateBasedStorageEngineTestSuite$ComplexObject;->value3:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, ComplexObject.class), ComplexObject.class, "value1;value2;value3", "FIELD:Lorg/axonframework/eventsourcing/eventstore/AggregateBasedStorageEngineTestSuite$ComplexObject;->value1:Ljava/lang/String;", "FIELD:Lorg/axonframework/eventsourcing/eventstore/AggregateBasedStorageEngineTestSuite$ComplexObject;->value2:Z", "FIELD:Lorg/axonframework/eventsourcing/eventstore/AggregateBasedStorageEngineTestSuite$ComplexObject;->value3:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, ComplexObject.class, Object.class), ComplexObject.class, "value1;value2;value3", "FIELD:Lorg/axonframework/eventsourcing/eventstore/AggregateBasedStorageEngineTestSuite$ComplexObject;->value1:Ljava/lang/String;", "FIELD:Lorg/axonframework/eventsourcing/eventstore/AggregateBasedStorageEngineTestSuite$ComplexObject;->value2:Z", "FIELD:Lorg/axonframework/eventsourcing/eventstore/AggregateBasedStorageEngineTestSuite$ComplexObject;->value3:I").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public String value1() {
            return this.value1;
        }

        public boolean value2() {
            return this.value2;
        }

        public int value3() {
            return this.value3;
        }
    }

    /**
     * Creates a fresh fixture before every test: two random aggregate identifiers, the matching tag sets
     * and criteria, and finally a new storage engine obtained from {@link #buildStorageEngine()}.
     *
     * @throws Exception if {@link #buildStorageEngine()} fails
     */
    @BeforeEach
    void setUp() throws Exception {
        String testId = UUID.randomUUID().toString();
        String otherId = UUID.randomUUID().toString();

        TEST_AGGREGATE_ID = testId;
        OTHER_AGGREGATE_ID = otherId;

        TEST_AGGREGATE_TAGS = Set.of(new Tag(TEST_AGGREGATE_TYPE, testId));
        TEST_AGGREGATE_CRITERIA = EventCriteria.havingTags(TEST_AGGREGATE_TYPE, testId);

        OTHER_AGGREGATE_TAGS = Set.of(new Tag("OTHER_AGGREGATE", otherId));
        OTHER_AGGREGATE_CRITERIA = EventCriteria.havingTags("OTHER_AGGREGATE", otherId);

        // Build the engine last so that all fixture fields are already populated.
        testSubject = buildStorageEngine();
    }

    /**
     * Creates the virtual-thread-per-task executor shared by all tests of the suite.
     */
    @BeforeAll
    static void beforeAll() {
        ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
        executor = virtualThreadExecutor;
    }

    /**
     * Closes the shared executor once all tests of the suite have run.
     */
    @AfterAll
    static void afterAll() {
        ExecutorService sharedExecutor = executor;
        sharedExecutor.close();
    }

    /**
     * Builds the storage engine under test. Invoked from {@code setUp()} before every test, after the
     * aggregate identifier, tag and criteria fields have been assigned.
     *
     * @return the engine instance to assign to {@link #testSubject}
     * @throws Exception if the engine cannot be constructed
     */
    protected abstract ESE buildStorageEngine() throws Exception;

    /**
     * Translates the number a test passes for an event into the position value that the entry's
     * {@link TrackingToken} is expected to report; used by {@code assertTrackedEntry}.
     * <p>
     * NOTE(review): the streaming tests pass 1-based ordinals of the appended events (1, 2, 5) — confirm
     * the intended base with the implementing subclasses.
     *
     * @param j the number of the event as used by the test
     * @return the expected tracking token position for that event
     */
    protected abstract long globalSequenceOfEvent(long j);

    /**
     * Creates the {@link TrackingToken} that the streaming tests pass to
     * {@code StreamingCondition.startingFrom(...)} for the given position.
     *
     * @param j the position to create a token for (tests use 0, 2 and 10)
     * @return a token representing that position for the engine under test
     */
    protected abstract TrackingToken trackingTokenAt(long j);

    /**
     * Appends seven events in four transactions (tagged and untagged interleaved) and streams from the
     * token at position 0, verifying the entries for event-0, event-1 and event-4 at global sequences
     * 1, 2 and 5 and the count of the entries in between and after.
     */
    @Test
    void streamingFromStartReturnsSelectedMessages() {
        TaggedEventMessage<?> event0 = taggedEventMessage("event-0", TEST_AGGREGATE_TAGS);
        TaggedEventMessage<?> event1 = taggedEventMessage("event-1", TEST_AGGREGATE_TAGS);
        TaggedEventMessage<?> event4 = taggedEventMessage("event-4", TEST_AGGREGATE_TAGS);

        ConsistencyMarker markerAfterFirstAppend =
                testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA), event0, event1)
                           .thenCompose(EventStorageEngine.AppendTransaction::commit)
                           .join();

        testSubject.appendEvents(AppendCondition.none(),
                                 taggedEventMessage("event-2", Collections.emptySet()),
                                 taggedEventMessage("event-3", Collections.emptySet()))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        // Continue the tagged stream from the marker returned by the first commit.
        testSubject.appendEvents(new DefaultAppendCondition(markerAfterFirstAppend, TEST_AGGREGATE_CRITERIA), event4)
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        testSubject.appendEvents(AppendCondition.none(),
                                 taggedEventMessage("event-5", Collections.emptySet()),
                                 taggedEventMessage("event-6", Collections.emptySet()))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        StepVerifier.create(testSubject.stream(StreamingCondition.startingFrom(trackingTokenAt(0L))).asFlux())
                    .assertNext(entry -> assertTrackedEntry(entry, event0.event(), 1L))
                    .assertNext(entry -> assertTrackedEntry(entry, event1.event(), 2L))
                    .expectNextCount(2L)
                    .assertNext(entry -> assertTrackedEntry(entry, event4.event(), 5L))
                    .expectNextCount(2L)
                    .thenCancel()
                    .verify();
    }

    /**
     * Appends seven events in one transaction and streams from the token at position 2, expecting two
     * entries, then event-4 at global sequence 5, then two more entries.
     */
    @Test
    void streamingFromSpecificPositionSkipsMessages() {
        TaggedEventMessage<?> event0 = taggedEventMessage("event-0", TEST_AGGREGATE_TAGS);
        TaggedEventMessage<?> event1 = taggedEventMessage("event-1", TEST_AGGREGATE_TAGS);
        TaggedEventMessage<?> event4 = taggedEventMessage("event-4", TEST_AGGREGATE_TAGS);

        testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA),
                                 event0,
                                 event1,
                                 taggedEventMessage("event-2", Set.of()),
                                 taggedEventMessage("event-3", TEST_AGGREGATE_TAGS),
                                 event4,
                                 taggedEventMessage("event-5", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-6", TEST_AGGREGATE_TAGS))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        StepVerifier.create(testSubject.stream(StreamingCondition.startingFrom(trackingTokenAt(2L))).asFlux())
                    .expectNextCount(2L)
                    .assertNext(entry -> assertTrackedEntry(entry, event4.event(), 5L))
                    .expectNextCount(2L)
                    .thenCancel()
                    .verify();
    }

    /**
     * Appends seven events and opens a stream starting from the token at position 10 (beyond anything
     * appended), asserting that the first {@code next()} call yields no entry. The stream is always closed.
     */
    @Test
    void streamingAfterLastPositionReturnsEmptyStream() {
        EventCriteria criteria = TEST_AGGREGATE_CRITERIA;
        TaggedEventMessage<?> event0 = taggedEventMessage("event-0", TEST_AGGREGATE_TAGS);
        TaggedEventMessage<?> event1 = taggedEventMessage("event-1", TEST_AGGREGATE_TAGS);
        TaggedEventMessage<?> event4 = taggedEventMessage("event-4", TEST_AGGREGATE_TAGS);

        ConsistencyMarker markerAfterFirstAppend =
                testSubject.appendEvents(AppendCondition.withCriteria(criteria), event0, event1)
                           .thenCompose(EventStorageEngine.AppendTransaction::commit)
                           .join();

        testSubject.appendEvents(AppendCondition.none(),
                                 taggedEventMessage("event-2", Set.of()),
                                 taggedEventMessage("event-3", Set.of()))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        testSubject.appendEvents(new DefaultAppendCondition(markerAfterFirstAppend, criteria), event4)
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        testSubject.appendEvents(AppendCondition.none(),
                                 taggedEventMessage("event-5", Set.of()),
                                 taggedEventMessage("event-6", Set.of()))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        MessageStream<EventMessage<?>> emptyStream =
                testSubject.stream(StreamingCondition.startingFrom(trackingTokenAt(10L)).or(criteria));
        try {
            Assertions.assertTrue(emptyStream.next().isEmpty());
        } finally {
            // Release the stream whether or not the assertion passed.
            emptyStream.close();
        }
    }

    /**
     * Appends a single tagged event and verifies that sourcing with the same criteria yields exactly that
     * event at aggregate sequence 0 before the stream completes.
     */
    @Test
    void sourcingEventsReturnsMatchingAggregateEvent() {
        AppendCondition condition = AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA);
        testSubject.appendEvents(condition, taggedEventMessage("event-0", TEST_AGGREGATE_TAGS))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        StepVerifier.create(testSubject.source(SourcingCondition.conditionFor(TEST_AGGREGATE_CRITERIA)).asFlux())
                    .expectNextMatches(entryWithAggregateEvent("event-0", 0))
                    .verifyComplete();
    }

    /**
     * Appends one tagged event carrying metadata with a String, a boolean and a {@link ComplexObject}
     * value, and verifies the event can be sourced back at aggregate sequence 0.
     */
    @Test
    void sourcingEventsWithMetadata() {
        MetaData metaData = MetaData.with("key1", "value1")
                                    .and("key2", true)
                                    .and("key3", new ComplexObject("value1", false, 44));
        TaggedEventMessage<?> eventWithMetaData = taggedEventMessage("event-0", TEST_AGGREGATE_TAGS, metaData);

        testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA), eventWithMetaData)
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        StepVerifier.create(testSubject.source(SourcingCondition.conditionFor(TEST_AGGREGATE_CRITERIA)).asFlux())
                    .expectNextMatches(entryWithAggregateEvent("event-0", 0))
                    .verifyComplete();
    }

    /**
     * Appends three events for the test aggregate and three for the other aggregate, then verifies that
     * sourcing with the test aggregate's criteria yields only event-0..event-2 at sequences 0..2.
     */
    @Test
    void sourcingEventsReturnsMatchingAggregateEvents() {
        AppendCondition testAggregateCondition = AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA);
        AppendCondition otherAggregateCondition = AppendCondition.withCriteria(OTHER_AGGREGATE_CRITERIA);

        testSubject.appendEvents(testAggregateCondition,
                                 taggedEventMessage("event-0", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-1", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-2", TEST_AGGREGATE_TAGS))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();
        testSubject.appendEvents(otherAggregateCondition,
                                 taggedEventMessage("event-4", OTHER_AGGREGATE_TAGS),
                                 taggedEventMessage("event-5", OTHER_AGGREGATE_TAGS),
                                 taggedEventMessage("event-6", OTHER_AGGREGATE_TAGS))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        StepVerifier.create(testSubject.source(SourcingCondition.conditionFor(TEST_AGGREGATE_CRITERIA)).asFlux())
                    .expectNextMatches(entryWithAggregateEvent("event-0", 0))
                    .expectNextMatches(entryWithAggregateEvent("event-1", 1))
                    .expectNextMatches(entryWithAggregateEvent("event-2", 2))
                    .verifyComplete();
    }

    /**
     * Appends five events of which only event-1 and event-3 carry the test aggregate's tags, and verifies
     * that sourcing yields just those two, at aggregate sequences 0 and 1.
     */
    @Test
    void eventsWithoutTagsAreNotSourcedAsAggregatedEvents() {
        testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA),
                                 taggedEventMessage("event-0", Set.of()),
                                 taggedEventMessage("event-1", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-2", Set.of()),
                                 taggedEventMessage("event-3", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-4", Set.of()))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        StepVerifier.create(testSubject.source(SourcingCondition.conditionFor(TEST_AGGREGATE_CRITERIA)).asFlux())
                    .expectNextMatches(entryWithAggregateEvent("event-1", 0))
                    .expectNextMatches(entryWithAggregateEvent("event-3", 1))
                    .verifyComplete();
    }

    /**
     * Appends three events tagged for the test aggregate while the append condition uses the other
     * aggregate's criteria, and verifies they are sourced for the test aggregate at sequences 0, 1 and 2.
     */
    @Test
    void eventsWithTagsNotMatchingCriteriaAreInsertedAtSequenceZero() {
        AppendCondition nonMatchingCondition = AppendCondition.withCriteria(OTHER_AGGREGATE_CRITERIA);
        testSubject.appendEvents(nonMatchingCondition,
                                 taggedEventMessage("event-4", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-5", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-6", TEST_AGGREGATE_TAGS))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        StepVerifier.create(testSubject.source(SourcingCondition.conditionFor(TEST_AGGREGATE_CRITERIA)).asFlux())
                    .expectNextMatches(entryWithAggregateEvent("event-4", 0))
                    .expectNextMatches(entryWithAggregateEvent("event-5", 1))
                    .expectNextMatches(entryWithAggregateEvent("event-6", 2))
                    .verifyComplete();
    }

    /**
     * Appends six events alternating between the two aggregates, sources with the combined criteria and
     * verifies that all six payloads are seen and that the consistency marker resource of the last entry
     * equals the marker returned by the commit.
     */
    @Test
    void sourcingFromTwoAggregateStreamsReturnsACombinedStream() {
        ConsistencyMarker markerAfterCommit =
                testSubject.appendEvents(AppendCondition.none(),
                                         taggedEventMessage("event-0", TEST_AGGREGATE_TAGS),
                                         taggedEventMessage("event-1", OTHER_AGGREGATE_TAGS),
                                         taggedEventMessage("event-2", TEST_AGGREGATE_TAGS),
                                         taggedEventMessage("event-3", OTHER_AGGREGATE_TAGS),
                                         taggedEventMessage("event-4", TEST_AGGREGATE_TAGS),
                                         taggedEventMessage("event-5", OTHER_AGGREGATE_TAGS))
                           .thenCompose(EventStorageEngine.AppendTransaction::commit)
                           .join();

        Set<String> seenPayloads = new HashSet<>();
        AtomicReference<ConsistencyMarker> lastSeenMarker = new AtomicReference<>();
        EventCriteria eitherAggregate = TEST_AGGREGATE_CRITERIA.or(OTHER_AGGREGATE_CRITERIA);

        StepVerifier.create(testSubject.source(SourcingCondition.conditionFor(eitherAggregate)).asFlux())
                    .thenConsumeWhile(entry -> {
                        // Overwritten on every entry, so the last entry's marker is what remains.
                        lastSeenMarker.set((ConsistencyMarker) entry.getResource(ConsistencyMarker.RESOURCE_KEY));
                        return seenPayloads.add(entry.map(this::convertPayload).message().getPayload());
                    })
                    .verifyComplete();

        Assertions.assertEquals(
                Set.of("event-0", "event-1", "event-2", "event-3", "event-4", "event-5"), seenPayloads
        );
        Assertions.assertEquals(markerAfterCommit, lastSeenMarker.get());
    }

    /**
     * Appends six events alternating between the two aggregates and sources the test aggregate with
     * both bounds set to 1, expecting exactly {@code "event-2"}.
     */
    @Test
    void sourcingWithStartAndEndReturnsEventsWithinBounds() {
        testSubject.appendEvents(AppendCondition.none(),
                                 taggedEventMessage("event-0", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-1", OTHER_AGGREGATE_TAGS),
                                 taggedEventMessage("event-2", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-3", OTHER_AGGREGATE_TAGS),
                                 taggedEventMessage("event-4", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-5", OTHER_AGGREGATE_TAGS))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        List<String> sourcedPayloads = new ArrayList<>();
        SourcingCondition boundedCondition = SourcingCondition.conditionFor(1L, 1L, TEST_AGGREGATE_CRITERIA);

        StepVerifier.create(testSubject.source(boundedCondition).asFlux())
                    .thenConsumeWhile(entry -> sourcedPayloads.add(
                            entry.map(this::convertPayload).message().getPayload()
                    ))
                    .verifyComplete();

        Assertions.assertEquals(List.of("event-2"), sourcedPayloads);
    }

    /**
     * Appends six events alternating between the two aggregates and sources both aggregates with bounds
     * 1 and 2, expecting event-2 through event-5. If the engine throws an
     * {@link IllegalArgumentException}, the test is aborted rather than failed.
     */
    @Test
    void sourcingFromTwoAggregatesWithStartAndEndRespectsBounds() {
        testSubject.appendEvents(AppendCondition.none(),
                                 taggedEventMessage("event-0", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-1", OTHER_AGGREGATE_TAGS),
                                 taggedEventMessage("event-2", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-3", OTHER_AGGREGATE_TAGS),
                                 taggedEventMessage("event-4", TEST_AGGREGATE_TAGS),
                                 taggedEventMessage("event-5", OTHER_AGGREGATE_TAGS))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        Set<String> sourcedPayloads = new HashSet<>();
        try {
            EventCriteria eitherAggregate = TEST_AGGREGATE_CRITERIA.or(OTHER_AGGREGATE_CRITERIA);
            StepVerifier.create(testSubject.source(SourcingCondition.conditionFor(1L, 2L, eitherAggregate)).asFlux())
                        .thenConsumeWhile(entry -> sourcedPayloads.add(
                                entry.map(this::convertPayload).message().getPayload()
                        ))
                        .verifyComplete();
            Assertions.assertEquals(Set.of("event-2", "event-3", "event-4", "event-5"), sourcedPayloads);
        } catch (IllegalArgumentException unsupported) {
            // Treated as "feature not available" for this engine: abort instead of fail.
            throw new TestAbortedException("Multi-aggregate streams not supported", unsupported);
        }
    }

    /**
     * Verifies that an append is rejected when the store already contains an event beyond the consistency
     * marker supplied in the append condition.
     * <p>
     * Two events are committed one after the other; a third append that reuses the marker from the first
     * commit must fail with an {@link AppendEventsTransactionRejectedException}.
     */
    @Test
    void transactionRejectedWithConflictingEventsInStore() {
        ConsistencyMarker markerAfterFirstCommit =
                testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA),
                                         taggedEventMessage("event-0", TEST_AGGREGATE_TAGS))
                           .thenCompose(EventStorageEngine.AppendTransaction::commit)
                           .join();

        // thenCompose (not thenApply) so that join() waits for the commit itself to finish; otherwise the
        // conflicting append below could race with this commit and a commit failure would go unnoticed.
        testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA)
                                                .withMarker(markerAfterFirstCommit),
                                 taggedEventMessage("event-1", TEST_AGGREGATE_TAGS))
                   .thenCompose(EventStorageEngine.AppendTransaction::commit)
                   .join();

        // Same (now stale) marker again: this commit must be rejected.
        CompletableFuture<ConsistencyMarker> conflictingCommit =
                testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA)
                                                        .withMarker(markerAfterFirstCommit),
                                         taggedEventMessage("event-2", TEST_AGGREGATE_TAGS))
                           .thenCompose(EventStorageEngine.AppendTransaction::commit);

        ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class, () -> conflictingCommit.get(1L, TimeUnit.SECONDS)
        );
        Assertions.assertInstanceOf(AppendEventsTransactionRejectedException.class, failure.getCause());
    }

    /**
     * Creates two transactions against the same criteria before committing either; the first commit must
     * succeed and the second must fail with an {@link AppendEventsTransactionRejectedException}.
     */
    @Test
    void transactionRejectedWhenConcurrentlyCreatedTransactionIsCommittedFirst() {
        CompletableFuture<EventStorageEngine.AppendTransaction> firstTransaction =
                testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA),
                                         taggedEventMessage("event-10", TEST_AGGREGATE_TAGS));
        CompletableFuture<EventStorageEngine.AppendTransaction> secondTransaction =
                testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA),
                                         taggedEventMessage("event-11", TEST_AGGREGATE_TAGS));

        CompletableFuture<ConsistencyMarker> firstCommit =
                firstTransaction.thenCompose(EventStorageEngine.AppendTransaction::commit);
        Assertions.assertDoesNotThrow(() -> firstCommit.get(1L, TimeUnit.SECONDS));

        CompletableFuture<ConsistencyMarker> secondCommit =
                secondTransaction.thenCompose(EventStorageEngine.AppendTransaction::commit);
        ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class, () -> secondCommit.get(1L, TimeUnit.SECONDS)
        );
        Assertions.assertInstanceOf(AppendEventsTransactionRejectedException.class, failure.getCause());
    }

    /**
     * Starts four conflicting appends (same criteria, no marker) on the shared executor, commits each of
     * them and verifies that exactly one commit succeeds while all others fail with an
     * {@link AppendEventsTransactionRejectedException}.
     */
    @Test
    void whenConflictingTransactionsRunOnDifferentThreadsConcurrentlyThenOnlyOneOfThemIsCommited() {
        // Launch every append first so the transactions are created concurrently, then chain the commits.
        List<CompletableFuture<EventStorageEngine.AppendTransaction>> transactions = new ArrayList<>();
        for (String payload : List.of("event-10", "event-11", "event-12", "event-13")) {
            transactions.add(runAsync(() -> testSubject.appendEvents(
                    AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA),
                    taggedEventMessage(payload, TEST_AGGREGATE_TAGS)
            )));
        }
        List<CompletableFuture<ConsistencyMarker>> commits =
                transactions.stream()
                            .map(transaction -> transaction.thenCompose(EventStorageEngine.AppendTransaction::commit))
                            .toList();

        // allOf completes exceptionally as soon as all are done and at least one commit failed.
        CompletableFuture<Void> allCommits = CompletableFuture.allOf(commits.toArray(new CompletableFuture<?>[0]));
        Exception failure = Assertions.assertThrows(Exception.class, allCommits::join);
        Assertions.assertInstanceOf(AppendEventsTransactionRejectedException.class, failure.getCause());

        long committed = commits.stream().filter(commit -> !commit.isCompletedExceptionally()).count();
        Assertions.assertEquals(1L, committed);

        long rejected = commits.stream().filter(CompletableFuture::isCompletedExceptionally).count();
        Assertions.assertEquals(commits.size() - 1L, rejected);
    }

    /**
     * Invokes the given supplier on the shared executor and flattens the future it returns, so the caller
     * gets a single future that completes with the supplied future's outcome.
     *
     * @param supplier produces the future to run; invoked on the shared executor
     * @param <T>      the result type of the supplied future
     * @return a future completing with the result (or failure) of the supplied future
     */
    private static <T> CompletableFuture<T> runAsync(Supplier<CompletableFuture<T>> supplier) {
        CompletableFuture<CompletableFuture<T>> nested = CompletableFuture.supplyAsync(supplier, executor);
        return nested.thenCompose(inner -> inner);
    }

    /**
     * Creates one transaction per aggregate before committing either, then verifies both commits succeed
     * and each returned marker reports position 0 for its own aggregate identifier.
     */
    @Test
    void concurrentTransactionsForNonOverlappingTagsBothCommit() throws ExecutionException, InterruptedException, TimeoutException {
        EventStorageEngine.AppendTransaction testAggregateTransaction =
                testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA),
                                         taggedEventMessage("event-0", TEST_AGGREGATE_TAGS))
                           .get(1L, TimeUnit.SECONDS);
        EventStorageEngine.AppendTransaction otherAggregateTransaction =
                testSubject.appendEvents(AppendCondition.withCriteria(OTHER_AGGREGATE_CRITERIA),
                                         taggedEventMessage("event-0", OTHER_AGGREGATE_TAGS))
                           .get(1L, TimeUnit.SECONDS);

        CompletableFuture<ConsistencyMarker> testAggregateCommit = testAggregateTransaction.commit();
        CompletableFuture<ConsistencyMarker> otherAggregateCommit = otherAggregateTransaction.commit();

        Assertions.assertDoesNotThrow(() -> testAggregateCommit.get(1L, TimeUnit.SECONDS));
        Assertions.assertDoesNotThrow(() -> otherAggregateCommit.get(1L, TimeUnit.SECONDS));

        ConsistencyMarker testAggregateMarker = testAggregateCommit.join();
        Assertions.assertTrue(validConsistencyMarker(testAggregateMarker, TEST_AGGREGATE_ID, 0));
        ConsistencyMarker otherAggregateMarker = otherAggregateCommit.join();
        Assertions.assertTrue(validConsistencyMarker(otherAggregateMarker, OTHER_AGGREGATE_ID, 0));
    }

    /**
     * Commits the same transaction twice: the first commit must succeed, the second must throw.
     */
    @Test
    void transactionCanBeCommitedOnlyOnce() {
        EventStorageEngine.AppendTransaction transaction =
                testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA),
                                         taggedEventMessage("event-0", TEST_AGGREGATE_TAGS))
                           .join();

        Assertions.assertDoesNotThrow(() -> transaction.commit().get(1L, TimeUnit.SECONDS));
        Assertions.assertThrows(Exception.class, () -> transaction.commit().get(1L, TimeUnit.SECONDS));
    }

    /**
     * Appends an empty list of events and verifies that the commit completes without throwing and yields
     * {@code ConsistencyMarker.ORIGIN}.
     */
    @Test
    void emptyTransactionAlwaysCommitSuccessfullyAndReturnsOriginConsistencyMarker() {
        CompletableFuture<ConsistencyMarker> emptyCommit =
                testSubject.appendEvents(AppendCondition.withCriteria(TEST_AGGREGATE_CRITERIA),
                                         Collections.emptyList())
                           .thenCompose(EventStorageEngine.AppendTransaction::commit);

        ConsistencyMarker resultingMarker = Assertions.assertDoesNotThrow(emptyCommit::join);
        Assertions.assertEquals(ConsistencyMarker.ORIGIN, resultingMarker);
    }

    /**
     * Appends three events of which the second carries two tags, and verifies that the returned future
     * is already completed exceptionally with a {@link TooManyTagsOnEventMessageException} that reports
     * the tags and the event message of the offending event.
     */
    @Test
    void eventWithMultipleTagsIsReportedAsPartOfException() {
        TaggedEventMessage<?> violatingEntry =
                taggedEventMessage("event2", Set.of(new Tag("key1", "value1"), new Tag("key2", "value2")));

        CompletableFuture<EventStorageEngine.AppendTransaction> result = testSubject.appendEvents(
                AppendCondition.none(),
                taggedEventMessage("event1", Set.of(new Tag("key1", "value1"))),
                violatingEntry,
                taggedEventMessage("event3", Set.of(new Tag("key1", "value1")))
        );

        // The failure is expected to be available immediately, without waiting.
        Assertions.assertTrue(result.isDone());
        Assertions.assertTrue(result.isCompletedExceptionally());

        ExecutionException thrown = Assertions.assertThrows(ExecutionException.class, result::get);
        if (thrown.getCause() instanceof TooManyTagsOnEventMessageException tooManyTags) {
            Assertions.assertEquals(violatingEntry.tags(), tooManyTags.tags());
            Assertions.assertEquals(violatingEntry.event(), tooManyTags.eventMessage());
        } else {
            // Attach the full exception so the unexpected cause shows up in the test report.
            Assertions.fail("Unexpected exception", thrown);
        }
    }

    /**
     * Asserts that the entry carries a tracking token whose position equals
     * {@code globalSequenceOfEvent(j)} and that its message matches the expected event.
     *
     * @param entry        the streamed entry to inspect
     * @param eventMessage the event the entry is expected to contain
     * @param j            the event number, translated through {@link #globalSequenceOfEvent(long)}
     */
    private void assertTrackedEntry(MessageStream.Entry<EventMessage<?>> entry, EventMessage<?> eventMessage, long j) {
        Optional<TrackingToken> token = TrackingToken.fromContext(entry);
        Assertions.assertTrue(token.isPresent());

        OptionalLong tokenPosition = token.get().position();
        Assertions.assertTrue(tokenPosition.isPresent());
        Assertions.assertEquals(globalSequenceOfEvent(j), tokenPosition.getAsLong());

        assertEvent(entry.message(), eventMessage);
    }

    /**
     * Asserts that the actual event equals the expected one in payload (after {@link #convertPayload}),
     * identifier, timestamp (compared at millisecond precision) and metadata.
     *
     * @param actual   the event read back from the engine
     * @param expected the event that was originally appended
     */
    private void assertEvent(EventMessage<?> actual, EventMessage<?> expected) {
        Object actualPayload = convertPayload(actual).getPayload();
        Assertions.assertEquals(expected.getPayload(), actualPayload);

        Assertions.assertEquals(expected.getIdentifier(), actual.getIdentifier());

        long expectedMillis = expected.getTimestamp().toEpochMilli();
        long actualMillis = actual.getTimestamp().toEpochMilli();
        Assertions.assertEquals(expectedMillis, actualMillis);

        Assertions.assertEquals(expected.getMetaData(), actual.getMetaData());
    }

    /**
     * Converts an event as returned by the engine under test into an event with a {@code String} payload,
     * so tests can compare it with the String payloads they appended.
     *
     * @param eventMessage the event read back from the engine
     * @return the event with its payload converted to {@code String}
     */
    protected abstract EventMessage<String> convertPayload(EventMessage<?> eventMessage);

    /**
     * Creates a tagged event with the given payload and tags and empty metadata.
     *
     * @param str the event payload
     * @param set the tags to attach to the event
     * @return the tagged event message
     */
    protected static TaggedEventMessage<?> taggedEventMessage(String str, Set<Tag> set) {
        MetaData noMetaData = MetaData.emptyInstance();
        return taggedEventMessage(str, set, noMetaData);
    }

    /**
     * Creates a tagged event of message type {@code "event"} with the given payload, tags and metadata.
     *
     * @param str      the event payload
     * @param set      the tags to attach to the event
     * @param metaData the metadata of the event
     * @return the tagged event message
     */
    protected static TaggedEventMessage<?> taggedEventMessage(String str, Set<Tag> set, MetaData metaData) {
        EventMessage<String> event = new GenericEventMessage<>(new MessageType("event"), str, metaData);
        return new GenericTaggedEventMessage<>(event, set);
    }

    /**
     * Builds a predicate that accepts an entry only when its converted payload, aggregate identifier,
     * aggregate sequence number, consistency marker and aggregate type all match the test aggregate and
     * the given expectations. The checks run in that order and stop at the first mismatch.
     *
     * @param expectedPayload  the payload the entry must carry
     * @param expectedSequence the aggregate sequence number (and marker position) the entry must report
     * @return the matching predicate
     */
    @Nonnull
    private Predicate<MessageStream.Entry<EventMessage<?>>> entryWithAggregateEvent(String expectedPayload,
                                                                                    int expectedSequence) {
        return entry -> {
            if (!expectedPayload.equals(convertPayload(entry.message()).getPayload())) {
                return false;
            }
            if (!this.TEST_AGGREGATE_ID.equals(entry.getResource(LegacyResources.AGGREGATE_IDENTIFIER_KEY))) {
                return false;
            }
            Long sequenceNumber = (Long) entry.getResource(LegacyResources.AGGREGATE_SEQUENCE_NUMBER_KEY);
            if (sequenceNumber.longValue() != (long) expectedSequence) {
                return false;
            }
            ConsistencyMarker marker = (ConsistencyMarker) entry.getResource(ConsistencyMarker.RESOURCE_KEY);
            if (!validConsistencyMarker(marker, this.TEST_AGGREGATE_ID, expectedSequence)) {
                return false;
            }
            return TEST_AGGREGATE_TYPE.equals(entry.getResource(LegacyResources.AGGREGATE_TYPE_KEY));
        };
    }

    /**
     * Checks that the marker is an {@link AggregateBasedConsistencyMarker} whose position for the given
     * aggregate identifier equals the expected value.
     *
     * @param consistencyMarker the marker to validate
     * @param str               the aggregate identifier to look up in the marker
     * @param i                 the expected position for that aggregate
     * @return {@code true} if the marker has the expected type and position, {@code false} otherwise
     */
    protected boolean validConsistencyMarker(ConsistencyMarker consistencyMarker, String str, int i) {
        if (consistencyMarker instanceof AggregateBasedConsistencyMarker aggregateMarker) {
            return aggregateMarker.positionOf(str) == (long) i;
        }
        return false;
    }
}
