package org.axonframework.eventsourcing.eventstore.inmemory;

import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TerminalEventMessage;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.AppendCondition;
import org.axonframework.eventsourcing.eventstore.AppendEventsTransactionRejectedException;
import org.axonframework.eventsourcing.eventstore.ConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.GlobalIndexConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.SourcingCondition;
import org.axonframework.eventsourcing.eventstore.TaggedEventMessage;
import org.axonframework.eventstreaming.EventsCondition;
import org.axonframework.eventstreaming.StreamingCondition;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.SimpleEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/eventsourcing/eventstore/inmemory/InMemoryEventStorageEngine.class */
public class InMemoryEventStorageEngine implements EventStorageEngine {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final boolean WITH_MARKER = true;
    private static final boolean WITHOUT_MARKER = false;
    private final NavigableMap<Long, TaggedEventMessage<? extends EventMessage<?>>> eventStorage;
    private final long offset;
    private final ReentrantLock appendLock;
    private final Set<MapBackedMessageStream> openStreams;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/inmemory/InMemoryEventStorageEngine$MapBackedMessageStream.class */
    public abstract class MapBackedMessageStream implements MessageStream<EventMessage<?>> {
        private final AtomicLong position;
        protected final long end;
        private final EventsCondition condition;
        private final AtomicReference<Runnable> callback = new AtomicReference<>(() -> {
        });

        private MapBackedMessageStream(long j, long j2, EventsCondition eventsCondition) {
            this.position = new AtomicLong(j);
            this.end = j2;
            this.condition = eventsCondition;
        }

        public Optional<MessageStream.Entry<EventMessage<?>>> next() {
            long j = this.position.get();
            while (true) {
                long j2 = j;
                if (j2 > this.end || !InMemoryEventStorageEngine.this.eventStorage.containsKey(Long.valueOf(j2)) || !this.position.compareAndSet(j2, j2 + 1)) {
                    break;
                }
                TaggedEventMessage taggedEventMessage = (TaggedEventMessage) InMemoryEventStorageEngine.this.eventStorage.get(Long.valueOf(j2));
                if (InMemoryEventStorageEngine.match(taggedEventMessage, this.condition)) {
                    return Optional.of(new SimpleEntry(taggedEventMessage.event(), TrackingToken.addToContext(Context.empty(), new GlobalSequenceTrackingToken(j2 + 1))));
                }
                j = this.position.get();
            }
            return lastEntry();
        }

        abstract Optional<MessageStream.Entry<EventMessage<?>>> lastEntry();

        public void onAvailable(@Nonnull Runnable runnable) {
            this.callback.set(runnable);
            if (InMemoryEventStorageEngine.this.eventStorage.isEmpty() || InMemoryEventStorageEngine.this.eventStorage.containsKey(Long.valueOf(this.position.get()))) {
                runnable.run();
            }
        }

        public Optional<Throwable> error() {
            return Optional.empty();
        }

        public boolean isCompleted() {
            return this.position.get() > this.end;
        }

        public boolean hasNextAvailable() {
            long j = this.position.get();
            return j <= this.end && InMemoryEventStorageEngine.this.eventStorage.containsKey(Long.valueOf(j));
        }

        public void close() {
            this.position.set(this.end + 1);
        }

        Runnable callback() {
            return this.callback.get();
        }
    }

    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/inmemory/InMemoryEventStorageEngine$MapBackedSourcingEventMessageStream.class */
    private class MapBackedSourcingEventMessageStream extends MapBackedMessageStream {
        private final AtomicBoolean sharedLastEntry;

        private MapBackedSourcingEventMessageStream(InMemoryEventStorageEngine inMemoryEventStorageEngine, long j, long j2, EventsCondition eventsCondition) {
            super(j, j2, eventsCondition);
            this.sharedLastEntry = new AtomicBoolean(false);
        }

        @Override // org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine.MapBackedMessageStream
        Optional<MessageStream.Entry<EventMessage<?>>> lastEntry() {
            if (!this.sharedLastEntry.compareAndSet(false, true)) {
                return Optional.empty();
            }
            return Optional.of(new SimpleEntry(TerminalEventMessage.INSTANCE, Context.with(ConsistencyMarker.RESOURCE_KEY, new GlobalIndexConsistencyMarker(this.end))));
        }

        @Override // org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine.MapBackedMessageStream
        public boolean isCompleted() {
            return super.isCompleted() && this.sharedLastEntry.get();
        }

        @Override // org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine.MapBackedMessageStream
        public boolean hasNextAvailable() {
            return super.hasNextAvailable() || !this.sharedLastEntry.get();
        }
    }

    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/inmemory/InMemoryEventStorageEngine$MapBackedStreamingEventMessageStream.class */
    private class MapBackedStreamingEventMessageStream extends MapBackedMessageStream {
        private MapBackedStreamingEventMessageStream(InMemoryEventStorageEngine inMemoryEventStorageEngine, long j, EventsCondition eventsCondition) {
            super(j, Long.MAX_VALUE, eventsCondition);
        }

        @Override // org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine.MapBackedMessageStream
        Optional<MessageStream.Entry<EventMessage<?>>> lastEntry() {
            return Optional.empty();
        }
    }

    public InMemoryEventStorageEngine() {
        this(0L);
    }

    public InMemoryEventStorageEngine(long j) {
        this.eventStorage = new ConcurrentSkipListMap();
        this.appendLock = new ReentrantLock();
        this.openStreams = new CopyOnWriteArraySet();
        this.offset = j;
    }

    @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine
    public CompletableFuture<EventStorageEngine.AppendTransaction> appendEvents(@Nonnull final AppendCondition appendCondition, @Nonnull final List<TaggedEventMessage<?>> list) {
        return containsConflicts(appendCondition) ? CompletableFuture.failedFuture(AppendEventsTransactionRejectedException.conflictingEventsDetected(appendCondition.consistencyMarker())) : CompletableFuture.completedFuture(new EventStorageEngine.AppendTransaction() { // from class: org.axonframework.eventsourcing.eventstore.inmemory.InMemoryEventStorageEngine.1
            private final AtomicBoolean finished = new AtomicBoolean(false);

            @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine.AppendTransaction
            public CompletableFuture<ConsistencyMarker> commit() {
                if (this.finished.getAndSet(true)) {
                    return CompletableFuture.failedFuture(new IllegalStateException("Already committed or rolled back"));
                }
                InMemoryEventStorageEngine.this.appendLock.lock();
                try {
                    if (InMemoryEventStorageEngine.this.containsConflicts(appendCondition)) {
                        return CompletableFuture.failedFuture(AppendEventsTransactionRejectedException.conflictingEventsDetected(appendCondition.consistencyMarker()));
                    }
                    Optional reduce = list.stream().map(taggedEventMessage -> {
                        long nextIndex = InMemoryEventStorageEngine.this.nextIndex();
                        long j = nextIndex + 1;
                        InMemoryEventStorageEngine.this.eventStorage.put(Long.valueOf(nextIndex), taggedEventMessage);
                        if (InMemoryEventStorageEngine.logger.isDebugEnabled()) {
                            InMemoryEventStorageEngine.logger.debug("Appended event [{}] with position [{}] and timestamp [{}].", new Object[]{taggedEventMessage.event().getIdentifier(), Long.valueOf(nextIndex), taggedEventMessage.event().getTimestamp()});
                        }
                        return new GlobalIndexConsistencyMarker(j);
                    }).reduce((v0, v1) -> {
                        return v0.upperBound(v1);
                    });
                    InMemoryEventStorageEngine.this.openStreams.forEach(mapBackedMessageStream -> {
                        mapBackedMessageStream.callback().run();
                    });
                    return CompletableFuture.completedFuture((ConsistencyMarker) reduce.orElse(ConsistencyMarker.ORIGIN));
                } finally {
                    InMemoryEventStorageEngine.this.appendLock.unlock();
                }
            }

            @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine.AppendTransaction
            public void rollback() {
                this.finished.set(true);
            }
        });
    }

    private long nextIndex() {
        if (this.eventStorage.isEmpty()) {
            return 0L;
        }
        return this.eventStorage.lastKey().longValue() + 1;
    }

    private boolean containsConflicts(AppendCondition appendCondition) {
        if (Objects.equals(appendCondition.consistencyMarker(), ConsistencyMarker.INFINITY)) {
            return false;
        }
        return this.eventStorage.tailMap(Long.valueOf(GlobalIndexConsistencyMarker.position(appendCondition.consistencyMarker()) + 1)).values().stream().map(taggedEventMessage -> {
            return taggedEventMessage;
        }).anyMatch(taggedEventMessage2 -> {
            return appendCondition.matches(taggedEventMessage2.event().type().qualifiedName(), taggedEventMessage2.tags());
        });
    }

    @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine
    public MessageStream<EventMessage<?>> source(@Nonnull SourcingCondition sourcingCondition) {
        if (logger.isDebugEnabled()) {
            logger.debug("Start sourcing events with condition [{}].", sourcingCondition);
        }
        MapBackedSourcingEventMessageStream mapBackedSourcingEventMessageStream = new MapBackedSourcingEventMessageStream(this, sourcingCondition.start(), this.eventStorage.isEmpty() ? -1L : this.eventStorage.lastKey().longValue(), sourcingCondition);
        this.openStreams.add(mapBackedSourcingEventMessageStream);
        return mapBackedSourcingEventMessageStream;
    }

    @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine
    public MessageStream<EventMessage<?>> stream(@Nonnull StreamingCondition streamingCondition) {
        if (logger.isDebugEnabled()) {
            logger.debug("Start streaming events with condition [{}].", streamingCondition);
        }
        MapBackedStreamingEventMessageStream mapBackedStreamingEventMessageStream = new MapBackedStreamingEventMessageStream(this, streamingCondition.position().position().orElse(-1L), streamingCondition);
        this.openStreams.add(mapBackedStreamingEventMessageStream);
        return mapBackedStreamingEventMessageStream;
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [org.axonframework.eventhandling.EventMessage] */
    private static boolean match(TaggedEventMessage<?> taggedEventMessage, EventsCondition eventsCondition) {
        return eventsCondition.matches(taggedEventMessage.event().type().qualifiedName(), taggedEventMessage.tags());
    }

    @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine
    public CompletableFuture<TrackingToken> tailToken() {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation tailToken() is invoked.");
        }
        return CompletableFuture.completedFuture(this.eventStorage.isEmpty() ? new GlobalSequenceTrackingToken(this.offset - 1) : new GlobalSequenceTrackingToken(this.eventStorage.firstKey().longValue()));
    }

    @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine
    public CompletableFuture<TrackingToken> headToken() {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation headToken() is invoked.");
        }
        return CompletableFuture.completedFuture(this.eventStorage.isEmpty() ? new GlobalSequenceTrackingToken(this.offset - 1) : new GlobalSequenceTrackingToken(this.eventStorage.lastKey().longValue() + 1));
    }

    @Override // org.axonframework.eventsourcing.eventstore.EventStorageEngine
    public CompletableFuture<TrackingToken> tokenAt(@Nonnull Instant instant) {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation tokenAt() is invoked with Instant [{}].", instant);
        }
        return (CompletableFuture) this.eventStorage.entrySet().stream().filter(entry -> {
            Instant timestamp = ((TaggedEventMessage) entry.getValue()).event().getTimestamp();
            return timestamp.equals(instant) || timestamp.isAfter(instant);
        }).map((v0) -> {
            return v0.getKey();
        }).min(Comparator.comparingLong((v0) -> {
            return v0.longValue();
        })).map(l -> {
            return Long.valueOf(l.longValue() - 1);
        }).map((v1) -> {
            return new GlobalSequenceTrackingToken(v1);
        }).map(globalSequenceTrackingToken -> {
            return globalSequenceTrackingToken;
        }).map((v0) -> {
            return CompletableFuture.completedFuture(v0);
        }).orElseGet(this::headToken);
    }

    public void describeTo(@Nonnull ComponentDescriptor componentDescriptor) {
        componentDescriptor.describeProperty("offset", Long.valueOf(this.offset));
    }
}
