package org.axonframework.eventsourcing.eventstore;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TerminalEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventstreaming.EventCriteria;
import org.axonframework.eventstreaming.StreamingCondition;
import org.axonframework.eventstreaming.Tag;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.MetaData;
import org.axonframework.utils.AssertUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import reactor.test.StepVerifier;

/* loaded from: input_file:org/axonframework/eventsourcing/eventstore/StorageEngineTestSuite.class */
public abstract class StorageEngineTestSuite<ESE extends EventStorageEngine> {
    protected String TEST_DOMAIN_ID;
    protected String OTHER_DOMAIN_ID;
    protected Set<Tag> TEST_CRITERIA_TAGS;
    protected EventCriteria TEST_CRITERIA;
    protected Set<Tag> OTHER_CRITERIA_TAGS;
    protected EventCriteria OTHER_CRITERIA;
    protected ESE testSubject;

    @Nested
    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/StorageEngineTestSuite$Peek.class */
    class Peek {
        Peek() {
        }

        @Test
        void returnsMarkerWhenNoEventsMatchCriteria() {
            MessageStream source = StorageEngineTestSuite.this.testSubject.source(SourcingCondition.conditionFor(StorageEngineTestSuite.this.TEST_CRITERIA));
            StorageEngineTestSuite.waitUntilHasNextAvailable(source);
            Optional peek = source.peek();
            Assertions.assertTrue(peek.isPresent());
            StorageEngineTestSuite.assertMarkerEntry((MessageStream.Entry) peek.get());
        }

        @Test
        void returnsFirstEventWithoutAdvancing() throws Exception {
            TaggedEventMessage<EventMessage<String>> taggedEventMessage = StorageEngineTestSuite.taggedEventMessage("event-1", StorageEngineTestSuite.this.TEST_CRITERIA_TAGS);
            StorageEngineTestSuite.this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage, StorageEngineTestSuite.taggedEventMessage("event-2", StorageEngineTestSuite.this.TEST_CRITERIA_TAGS)}).thenCompose((v0) -> {
                return v0.commit();
            }).get(5L, TimeUnit.SECONDS);
            MessageStream source = StorageEngineTestSuite.this.testSubject.source(SourcingCondition.conditionFor(StorageEngineTestSuite.this.TEST_CRITERIA));
            StorageEngineTestSuite.waitUntilHasNextAvailable(source);
            Optional peek = source.peek();
            Optional peek2 = source.peek();
            Assertions.assertTrue(peek.isPresent());
            Assertions.assertTrue(peek2.isPresent());
            StorageEngineTestSuite.assertEvent(((MessageStream.Entry) peek.get()).message(), taggedEventMessage.event());
        }

        @Test
        void doesNotAdvanceStream() throws Exception {
            TaggedEventMessage<EventMessage<String>> taggedEventMessage = StorageEngineTestSuite.taggedEventMessage("event-1", StorageEngineTestSuite.this.TEST_CRITERIA_TAGS);
            StorageEngineTestSuite.this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage}).thenCompose((v0) -> {
                return v0.commit();
            }).get(5L, TimeUnit.SECONDS);
            MessageStream source = StorageEngineTestSuite.this.testSubject.source(SourcingCondition.conditionFor(StorageEngineTestSuite.this.TEST_CRITERIA));
            StorageEngineTestSuite.waitUntilHasNextAvailable(source);
            Optional peek = source.peek();
            Optional next = source.next();
            Assertions.assertTrue(peek.isPresent());
            Assertions.assertTrue(next.isPresent());
            StorageEngineTestSuite.assertEvent(((MessageStream.Entry) peek.get()).message(), taggedEventMessage.event());
            StorageEngineTestSuite.assertEvent(((MessageStream.Entry) next.get()).message(), taggedEventMessage.event());
        }

        @Test
        void returnsMarkerWhenNoEvents() {
            MessageStream source = StorageEngineTestSuite.this.testSubject.source(SourcingCondition.conditionFor(StorageEngineTestSuite.this.TEST_CRITERIA));
            StorageEngineTestSuite.waitUntilHasNextAvailable(source);
            Optional peek = source.peek();
            Assertions.assertTrue(peek.isPresent());
            StorageEngineTestSuite.assertMarkerEntry((MessageStream.Entry) peek.get());
        }

        @Test
        void returnsEmptyAfterConsumingAll() throws Exception {
            StorageEngineTestSuite.this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{StorageEngineTestSuite.taggedEventMessage("event-1", StorageEngineTestSuite.this.TEST_CRITERIA_TAGS)}).thenCompose((v0) -> {
                return v0.commit();
            }).get(5L, TimeUnit.SECONDS);
            MessageStream source = StorageEngineTestSuite.this.testSubject.source(SourcingCondition.conditionFor(StorageEngineTestSuite.this.TEST_CRITERIA));
            StorageEngineTestSuite.waitUntilHasNextAvailable(source);
            source.next();
            source.next();
            Assertions.assertTrue(source.peek().isEmpty());
        }
    }

    @BeforeEach
    void setUp() throws Exception {
        this.TEST_DOMAIN_ID = UUID.randomUUID().toString();
        this.OTHER_DOMAIN_ID = UUID.randomUUID().toString();
        this.TEST_CRITERIA_TAGS = Set.of(new Tag("TEST", this.TEST_DOMAIN_ID));
        this.TEST_CRITERIA = EventCriteria.havingTags(new Tag[]{new Tag("TEST", this.TEST_DOMAIN_ID)});
        this.OTHER_CRITERIA_TAGS = Set.of(new Tag("OTHER", this.OTHER_DOMAIN_ID));
        this.OTHER_CRITERIA = EventCriteria.havingTags(new Tag[]{new Tag("OTHER", this.OTHER_DOMAIN_ID)});
        this.testSubject = buildStorageEngine();
    }

    protected abstract ESE buildStorageEngine() throws Exception;

    @Test
    void sourcingEventsReturnsMatchingAggregateEvents() throws Exception {
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-2", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-3", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-4", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-5", this.TEST_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        StepVerifier.create(this.testSubject.source(SourcingCondition.conditionFor(this.TEST_CRITERIA)).asFlux()).expectNextCount(3 + 1).verifyComplete();
    }

    @Test
    void sourcingEventsReturnsConsistencyMarkerWithNoEventMessageAsFinalEntryInTheMessageStream() throws Exception {
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-2", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-3", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-4", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-5", this.OTHER_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        StepVerifier.create(this.testSubject.source(SourcingCondition.conditionFor(this.TEST_CRITERIA)).asFlux()).assertNext(entry -> {
            Assertions.assertNull(entry.getResource(ConsistencyMarker.RESOURCE_KEY));
        }).assertNext(entry2 -> {
            Assertions.assertNull(entry2.getResource(ConsistencyMarker.RESOURCE_KEY));
        }).assertNext(StorageEngineTestSuite::assertMarkerEntry).verifyComplete();
        StepVerifier.create(this.testSubject.source(SourcingCondition.conditionFor(this.OTHER_CRITERIA)).asFlux()).assertNext(entry3 -> {
            Assertions.assertNull(entry3.getResource(ConsistencyMarker.RESOURCE_KEY));
        }).assertNext(entry4 -> {
            Assertions.assertNull(entry4.getResource(ConsistencyMarker.RESOURCE_KEY));
        }).assertNext(entry5 -> {
            Assertions.assertNull(entry5.getResource(ConsistencyMarker.RESOURCE_KEY));
        }).assertNext(entry6 -> {
            Assertions.assertNull(entry6.getResource(ConsistencyMarker.RESOURCE_KEY));
        }).assertNext(StorageEngineTestSuite::assertMarkerEntry).verifyComplete();
    }

    @Test
    void usingConsistencyMarkerFromSourcingEventToAppendAfterAppendWithConditionIsNotAllowed() throws Exception {
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-2", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-3", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-4", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-5", this.OTHER_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        ConsistencyMarker consistencyMarker = (ConsistencyMarker) this.testSubject.source(SourcingCondition.conditionFor(this.OTHER_CRITERIA)).asFlux().collectList().map((v0) -> {
            return v0.getLast();
        }).map(entry -> {
            return (ConsistencyMarker) entry.getResource(ConsistencyMarker.RESOURCE_KEY);
        }).block();
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage("event-6", this.OTHER_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        CompletableFuture thenCompose = this.testSubject.appendEvents(AppendCondition.withCriteria(this.OTHER_CRITERIA).withMarker(consistencyMarker), new TaggedEventMessage[]{taggedEventMessage("event-7", this.OTHER_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        });
        ConditionFactory atMost = Awaitility.await("Await commit").pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofSeconds(5L));
        Objects.requireNonNull(thenCompose);
        atMost.untilAsserted(thenCompose::isDone);
        Assertions.assertTrue(thenCompose.isCompletedExceptionally());
        Assertions.assertInstanceOf(AppendEventsTransactionRejectedException.class, thenCompose.exceptionNow());
    }

    @Test
    void sourcingEventsReturnsConsistencyMarkerAsSoleMessageWhenNoEventsInTheStoreForFlux() {
        StepVerifier.create(this.testSubject.source(SourcingCondition.conditionFor(this.TEST_CRITERIA)).asFlux()).assertNext(StorageEngineTestSuite::assertMarkerEntry).verifyComplete();
    }

    @Test
    void sourcingEventsReturnsConsistencyMarkerAsSoleMessageAndCompletesWhenNoEventsInTheStore() {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        MessageStream whenComplete = this.testSubject.source(SourcingCondition.conditionFor(this.TEST_CRITERIA)).whenComplete(() -> {
            atomicBoolean.set(true);
        });
        ConditionFactory atMost = Awaitility.await("Await first entry availability").pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofMillis(500L));
        Objects.requireNonNull(whenComplete);
        atMost.until(whenComplete::hasNextAvailable);
        Optional next = whenComplete.next();
        Assertions.assertTrue(next.isPresent());
        assertMarkerEntry((MessageStream.Entry) next.get());
        Awaitility.await("Await end of stream").pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofMillis(500L)).until(() -> {
            return Boolean.valueOf(!whenComplete.hasNextAvailable());
        });
        Awaitility.await("Awaiting until sourcing completes").pollDelay(Duration.ofMillis(50L)).atMost(Duration.ofMillis(500L)).untilTrue(atomicBoolean);
    }

    private static void assertMarkerEntry(MessageStream.Entry<EventMessage<?>> entry) {
        Assertions.assertNotNull(entry.getResource(ConsistencyMarker.RESOURCE_KEY));
        Assertions.assertEquals(TerminalEventMessage.INSTANCE, entry.message());
    }

    @Test
    void transactionRejectedWithConflictingEventsInStore() throws Exception {
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS)}).thenApply((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        CompletableFuture thenCompose = this.testSubject.appendEvents(AppendCondition.withCriteria(this.TEST_CRITERIA), new TaggedEventMessage[]{taggedEventMessage("event-2", this.TEST_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        });
        Assertions.assertInstanceOf(AppendEventsTransactionRejectedException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            thenCompose.get(1L, TimeUnit.SECONDS);
        })).getCause());
    }

    @Test
    void transactionRejectedWhenConcurrentlyCreatedTransactionIsCommittedFirst() {
        AppendCondition withCriteria = AppendCondition.withCriteria(this.TEST_CRITERIA);
        CompletableFuture appendEvents = this.testSubject.appendEvents(withCriteria, new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS)});
        CompletableFuture appendEvents2 = this.testSubject.appendEvents(withCriteria, new TaggedEventMessage[]{taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS)});
        CompletableFuture thenCompose = appendEvents.thenCompose((v0) -> {
            return v0.commit();
        });
        Assertions.assertDoesNotThrow(() -> {
            return (ConsistencyMarker) thenCompose.get(1L, TimeUnit.SECONDS);
        });
        CompletableFuture thenCompose2 = appendEvents2.thenCompose((v0) -> {
            return v0.commit();
        });
        Assertions.assertInstanceOf(AppendEventsTransactionRejectedException.class, ((ExecutionException) Assertions.assertThrows(ExecutionException.class, () -> {
            thenCompose2.get(1L, TimeUnit.SECONDS);
        })).getCause());
    }

    @Test
    void concurrentTransactionsForNonOverlappingTagsBothCommitWithExpectedConsistencyMarkerResponse() throws Exception {
        Set of = Set.of(new GlobalIndexConsistencyMarker(1L), new GlobalIndexConsistencyMarker(2L));
        DefaultAppendCondition defaultAppendCondition = new DefaultAppendCondition(ConsistencyMarker.ORIGIN, this.TEST_CRITERIA);
        DefaultAppendCondition defaultAppendCondition2 = new DefaultAppendCondition(ConsistencyMarker.ORIGIN, this.OTHER_CRITERIA);
        EventStorageEngine.AppendTransaction appendTransaction = (EventStorageEngine.AppendTransaction) this.testSubject.appendEvents(defaultAppendCondition, new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS)}).get(1L, TimeUnit.SECONDS);
        EventStorageEngine.AppendTransaction appendTransaction2 = (EventStorageEngine.AppendTransaction) this.testSubject.appendEvents(defaultAppendCondition2, new TaggedEventMessage[]{taggedEventMessage("event-0", this.OTHER_CRITERIA_TAGS)}).get(1L, TimeUnit.SECONDS);
        CompletableFuture commit = appendTransaction.commit();
        CompletableFuture commit2 = appendTransaction2.commit();
        Assertions.assertDoesNotThrow(() -> {
            return (ConsistencyMarker) commit.get(5L, TimeUnit.SECONDS);
        });
        Assertions.assertDoesNotThrow(() -> {
            return (ConsistencyMarker) commit2.get(5L, TimeUnit.SECONDS);
        });
        ConsistencyMarker consistencyMarker = (ConsistencyMarker) commit.get(50L, TimeUnit.MILLISECONDS);
        ConsistencyMarker consistencyMarker2 = (ConsistencyMarker) commit2.get(50L, TimeUnit.MILLISECONDS);
        Assertions.assertNotNull(consistencyMarker);
        Assertions.assertNotNull(consistencyMarker2);
        Assertions.assertEquals(of, Set.of(consistencyMarker, consistencyMarker2));
    }

    @Test
    void concurrentTransactionsForOverlappingTagsThrowAnAppendEventsTransactionRejectedException() throws Exception {
        int i = 0;
        DefaultAppendCondition defaultAppendCondition = new DefaultAppendCondition(ConsistencyMarker.ORIGIN, this.TEST_CRITERIA);
        DefaultAppendCondition defaultAppendCondition2 = new DefaultAppendCondition(ConsistencyMarker.ORIGIN, this.TEST_CRITERIA);
        EventStorageEngine.AppendTransaction appendTransaction = (EventStorageEngine.AppendTransaction) this.testSubject.appendEvents(defaultAppendCondition, new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS)}).get(1L, TimeUnit.SECONDS);
        EventStorageEngine.AppendTransaction appendTransaction2 = (EventStorageEngine.AppendTransaction) this.testSubject.appendEvents(defaultAppendCondition2, new TaggedEventMessage[]{taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS)}).get(1L, TimeUnit.SECONDS);
        CompletableFuture commit = appendTransaction.commit();
        CompletableFuture commit2 = appendTransaction2.commit();
        try {
            commit.get(5L, TimeUnit.SECONDS);
        } catch (Exception e) {
            i = 0 + 1;
            Assertions.assertInstanceOf(AppendEventsTransactionRejectedException.class, e.getCause(), () -> {
                return "Exception [" + String.valueOf(e.getClass()) + "] is not expected. Message:" + e.getMessage();
            });
        }
        try {
            commit2.get(5L, TimeUnit.SECONDS);
        } catch (Exception e2) {
            i++;
            Assertions.assertInstanceOf(AppendEventsTransactionRejectedException.class, e2.getCause(), () -> {
                return "Exception [" + String.valueOf(e2.getClass()) + "] is not expected. Message:" + e2.getMessage();
            });
        }
        Assertions.assertEquals(1, i);
    }

    @Test
    void streamingFromStartReturnsSelectedMessages() throws Exception {
        TaggedEventMessage<EventMessage<String>> taggedEventMessage = taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS);
        TaggedEventMessage<EventMessage<String>> taggedEventMessage2 = taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS);
        TaggedEventMessage<EventMessage<String>> taggedEventMessage3 = taggedEventMessage("event-4", this.TEST_CRITERIA_TAGS);
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage, taggedEventMessage2, taggedEventMessage("event-2", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-3", this.OTHER_CRITERIA_TAGS), taggedEventMessage3, taggedEventMessage("event-5", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-6", this.OTHER_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        CompletableFuture thenApply = this.testSubject.firstToken().thenApply(trackingToken -> {
            return StreamingCondition.conditionFor(trackingToken, this.TEST_CRITERIA);
        });
        ESE ese = this.testSubject;
        Objects.requireNonNull(ese);
        StepVerifier.create(((MessageStream) thenApply.thenApply(ese::stream).get(5L, TimeUnit.SECONDS)).asFlux()).assertNext(entry -> {
            assertEvent(entry.message(), taggedEventMessage.event());
        }).assertNext(entry2 -> {
            assertEvent(entry2.message(), taggedEventMessage2.event());
        }).assertNext(entry3 -> {
            assertEvent(entry3.message(), taggedEventMessage3.event());
        }).thenCancel().verify();
    }

    @Test
    void streamingFromSpecificPositionReturnsSelectedMessages() throws Exception {
        TaggedEventMessage<EventMessage<String>> taggedEventMessage = taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS);
        TaggedEventMessage<EventMessage<String>> taggedEventMessage2 = taggedEventMessage("event-4", this.TEST_CRITERIA_TAGS);
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS), taggedEventMessage, taggedEventMessage("event-2", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-3", this.OTHER_CRITERIA_TAGS), taggedEventMessage2, taggedEventMessage("event-5", this.OTHER_CRITERIA_TAGS), taggedEventMessage("event-6", this.OTHER_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        CompletableFuture thenApply = this.testSubject.firstToken().thenApply(StreamingCondition::startingFrom);
        ESE ese = this.testSubject;
        Objects.requireNonNull(ese);
        StepVerifier.create(this.testSubject.stream(StreamingCondition.conditionFor((TrackingToken) thenApply.thenApply(ese::stream).thenApply((v0) -> {
            return v0.first();
        }).thenCompose((v0) -> {
            return v0.asCompletableFuture();
        }).thenApply(entry -> {
            return (TrackingToken) entry.getResource(TrackingToken.RESOURCE_KEY);
        }).get(5L, TimeUnit.SECONDS), this.TEST_CRITERIA)).asFlux()).assertNext(entry2 -> {
            assertEvent(entry2.message(), taggedEventMessage.event());
        }).assertNext(entry3 -> {
            assertEvent(entry3.message(), taggedEventMessage2.event());
        }).thenCancel().verify();
    }

    @Test
    void streamingAfterLastPositionReturnsEmptyStream() throws Exception {
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-2", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-3", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-4", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-5", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-6", this.TEST_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        MessageStream stream = this.testSubject.stream(StreamingCondition.conditionFor(new GlobalSequenceTrackingToken(10L), this.TEST_CRITERIA));
        try {
            Assertions.assertTrue(stream.next().isEmpty());
        } finally {
            stream.close();
        }
    }

    @Test
    void eventsPublishedAreIncludedInOpenStreams() throws Exception {
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        CompletableFuture thenApply = this.testSubject.firstToken().thenApply(StreamingCondition::startingFrom);
        ESE ese = this.testSubject;
        Objects.requireNonNull(ese);
        MessageStream messageStream = (MessageStream) thenApply.thenApply(ese::stream).get(5L, TimeUnit.SECONDS);
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertTrue(messageStream.next().isPresent());
        });
        AssertUtils.assertWithin(1, TimeUnit.SECONDS, () -> {
            Assertions.assertTrue(messageStream.next().isPresent());
        });
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage("event-3", this.TEST_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        Awaitility.await("Await until stream contains newly appended events").atMost(Duration.ofSeconds(1L)).pollDelay(Duration.ofMillis(100L)).untilAsserted(() -> {
            Assertions.assertTrue(messageStream.hasNextAvailable());
        });
        Assertions.assertEquals("event-3", messageStream.next().map(entry -> {
            Object payload = entry.message().getPayload();
            if (payload instanceof String) {
                return (String) payload;
            }
            Object payload2 = entry.message().getPayload();
            if (payload2 instanceof byte[]) {
                return new String((byte[]) payload2, StandardCharsets.UTF_8);
            }
            throw new AssertionError("Unexpected payload type: " + String.valueOf(entry.message().getPayload().getClass()));
        }).orElse("none"));
    }

    @Test
    void tailTokenReturnsHeadTokenForEmptyStore() throws Exception {
        TrackingToken trackingToken = (TrackingToken) this.testSubject.firstToken().get(5L, TimeUnit.SECONDS);
        TrackingToken trackingToken2 = (TrackingToken) this.testSubject.latestToken().get(5L, TimeUnit.SECONDS);
        Assertions.assertTrue(trackingToken2.covers(trackingToken));
        Assertions.assertTrue(trackingToken.covers(trackingToken2));
    }

    @Test
    void tailTokenReturnsFirstAppendedEvent() throws Exception {
        TaggedEventMessage<EventMessage<String>> taggedEventMessage = taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS);
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage, taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        CompletableFuture thenApply = this.testSubject.firstToken().thenApply(StreamingCondition::startingFrom);
        ESE ese = this.testSubject;
        Objects.requireNonNull(ese);
        assertEvent(((MessageStream.Entry) ((MessageStream) thenApply.thenApply(ese::stream).get(5L, TimeUnit.SECONDS)).first().asCompletableFuture().get(5L, TimeUnit.SECONDS)).message(), taggedEventMessage.event());
    }

    @Test
    void headTokenReturnsTokenBasedOnLastAppendedEvent() throws Exception {
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        CompletableFuture thenApply = this.testSubject.latestToken().thenApply(StreamingCondition::startingFrom);
        ESE ese = this.testSubject;
        Objects.requireNonNull(ese);
        MessageStream messageStream = (MessageStream) thenApply.thenApply(ese::stream).get(5L, TimeUnit.SECONDS);
        Awaitility.await("Await until the store has caught up").atLeast(Duration.ofMillis(50L)).atMost(Duration.ofMillis(500L)).pollDelay(Duration.ofMillis(100L)).untilAsserted(() -> {
            Assertions.assertFalse(messageStream.hasNextAvailable());
        });
    }

    @Test
    void tokenAtRetrievesTokenFromStorageEngineThatStreamsEventsSinceThatMoment() throws Exception {
        Instant now = Instant.now();
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessageAt("event-0", this.TEST_CRITERIA_TAGS, now.minusSeconds(10L)), taggedEventMessageAt("event-1", this.TEST_CRITERIA_TAGS, now), taggedEventMessageAt("event-2", this.TEST_CRITERIA_TAGS, now.plusSeconds(10L))}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        TrackingToken trackingToken = (TrackingToken) this.testSubject.tokenAt(now.minus(5L, (TemporalUnit) ChronoUnit.SECONDS)).get(5L, TimeUnit.SECONDS);
        Assertions.assertNotNull(trackingToken);
        StepVerifier.create(this.testSubject.stream(StreamingCondition.startingFrom(trackingToken)).asFlux()).expectNextCount(2L).thenCancel().verify();
    }

    @Test
    void tokenAtReturnsHeadTokenWhenThereAreNoEventsAfterTheGivenAt() throws Exception {
        this.testSubject.appendEvents(AppendCondition.none(), new TaggedEventMessage[]{taggedEventMessage("event-0", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-1", this.TEST_CRITERIA_TAGS), taggedEventMessage("event-2", this.TEST_CRITERIA_TAGS)}).thenCompose((v0) -> {
            return v0.commit();
        }).get(5L, TimeUnit.SECONDS);
        TrackingToken trackingToken = (TrackingToken) this.testSubject.tokenAt(Instant.now().plus(1L, (TemporalUnit) ChronoUnit.DAYS)).get(5L, TimeUnit.SECONDS);
        TrackingToken trackingToken2 = (TrackingToken) this.testSubject.latestToken().get(5L, TimeUnit.SECONDS);
        Assertions.assertNotNull(trackingToken);
        Assertions.assertNotNull(trackingToken2);
        Assertions.assertEquals(trackingToken2, trackingToken);
    }

    private static TaggedEventMessage<EventMessage<String>> taggedEventMessage(String str, Set<Tag> set) {
        return taggedEventMessageAt(str, set, Instant.now());
    }

    private static TaggedEventMessage<EventMessage<String>> taggedEventMessageAt(String str, Set<Tag> set, Instant instant) {
        return new GenericTaggedEventMessage(new GenericEventMessage(UUID.randomUUID().toString(), new MessageType("event"), str, MetaData.emptyInstance(), instant), set);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void assertEvent(EventMessage<?> eventMessage, EventMessage<String> eventMessage2) {
        Object payload = eventMessage.getPayload();
        if (payload instanceof byte[]) {
            Assertions.assertEquals(eventMessage2.getPayload(), new String((byte[]) payload, StandardCharsets.UTF_8));
        } else {
            Object payload2 = eventMessage.getPayload();
            if (!(payload2 instanceof String)) {
                throw new AssertionError("Unexpected payload type: " + String.valueOf(eventMessage.getPayload().getClass()));
            }
            Assertions.assertEquals(eventMessage2.getPayload(), (String) payload2);
        }
        Assertions.assertEquals(eventMessage2.getIdentifier(), eventMessage.getIdentifier());
        Assertions.assertEquals(eventMessage2.getTimestamp().toEpochMilli(), eventMessage.getTimestamp().toEpochMilli());
        Assertions.assertEquals(eventMessage2.getMetaData(), eventMessage.getMetaData());
    }

    private static void waitUntilHasNextAvailable(MessageStream<EventMessage<?>> messageStream) {
        ConditionFactory pollInterval = Awaitility.await("Await event availability in stream").atMost(Duration.ofSeconds(2L)).pollInterval(Duration.ofMillis(100L));
        Objects.requireNonNull(messageStream);
        pollInterval.until(messageStream::hasNextAvailable);
    }
}
