package org.axonframework.eventsourcing.eventstore;

import jakarta.annotation.Nonnull;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.AxonThreadFactory;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.io.IOUtils;
import org.axonframework.configuration.LifecycleRegistry;
import org.axonframework.eventhandling.AbstractEventBus;
import org.axonframework.eventhandling.EventBusSpanFactory;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.TrackedEventMessage;
import org.axonframework.eventhandling.TrackingEventStream;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.AbstractLegacyEventStore;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.monitoring.MessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated(since = "5.0.0")
/* loaded from: input_file:org/axonframework/eventsourcing/eventstore/LegacyEmbeddedEventStore.class */
public class LegacyEmbeddedEventStore extends AbstractLegacyEventStore implements Lifecycle {
    /** Logger shared by this store and its nested classes. */
    private static final Logger logger = LoggerFactory.getLogger(LegacyEmbeddedEventStore.class);
    /** Thread group, named after this class, handed to the builder's default thread factory. */
    private static final ThreadGroup THREAD_GROUP = new ThreadGroup(LegacyEmbeddedEventStore.class.getSimpleName());
    /** Name of the system property the builder reads to pick the default for {@code optimizeEventConsumption}. */
    private static final String OPTIMIZE_EVENT_CONSUMPTION_SYSTEM_PROPERTY = "optimize-event-consumption";
    /** Lock guarding {@link #consumableEventsCondition}. */
    private final Lock consumerLock;
    /** Condition consumers await on; the producer signals it after caching a new event. */
    private final Condition consumableEventsCondition;
    /** Consumers that currently read from the shared node cache rather than from a private stream. */
    private final Set<EventConsumer> tailingConsumers;
    /** Producer that fills the shared node cache from the storage engine. */
    private final EventProducer producer;
    /** Initial delay and interval, in milliseconds, between two {@code Cleaner} runs. */
    private final long cleanupDelayMillis;
    /** Factory for the producer thread and for the clean-up executor's thread. */
    private final ThreadFactory threadFactory;
    /** Whether consumers may read from the shared node cache. */
    private final boolean optimizeEventConsumption;
    /** Single-threaded scheduler on which the {@code Cleaner} is run. */
    private final ScheduledExecutorService cleanupService;
    /** Flipped to {@code true} by the first caller that starts the producer thread. */
    private final AtomicBoolean producerStarted;
    /** Oldest node still kept in the shared cache; {@code null} until the producer caches its first event. */
    private volatile Node oldest;

    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/LegacyEmbeddedEventStore$Builder.class */
    /**
     * Builder for a {@link LegacyEmbeddedEventStore}.
     * <p>
     * Defaults: {@code cachedEvents} is 10000, {@code fetchDelay} is 1000, {@code cleanupDelay} is 10000, the
     * {@code timeUnit} in which both delays are expressed is {@link TimeUnit#MILLISECONDS}, and the
     * {@code threadFactory} is an {@link AxonThreadFactory} on this store's thread group. The
     * {@code optimizeEventConsumption} default is derived from the {@code optimize-event-consumption} system
     * property.
     */
    public static class Builder extends AbstractLegacyEventStore.Builder {
        private int cachedEvents = 10000;
        private long fetchDelay = 1000;
        private long cleanupDelay = 10000;
        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
        private ThreadFactory threadFactory = new AxonThreadFactory(LegacyEmbeddedEventStore.THREAD_GROUP);
        private boolean optimizeEventConsumption = fetchEventConsumptionSystemPropertyOrDefault();

        /**
         * Reads the {@code optimize-event-consumption} system property.
         *
         * @return {@code true} when the property is absent or equals "true" ignoring case, {@code false} otherwise
         */
        private static boolean fetchEventConsumptionSystemPropertyOrDefault() {
            String configured = System.getProperty(LegacyEmbeddedEventStore.OPTIMIZE_EVENT_CONSUMPTION_SYSTEM_PROPERTY);
            if (configured == null) {
                // Optimization is on unless explicitly configured otherwise.
                return true;
            }
            return Boolean.parseBoolean(configured);
        }

        /**
         * Passes the storage engine on to the parent builder.
         *
         * @param legacyEventStorageEngine the storage engine to use
         * @return this builder, for chaining
         */
        @Override
        public Builder storageEngine(LegacyEventStorageEngine legacyEventStorageEngine) {
            super.storageEngine(legacyEventStorageEngine);
            return this;
        }

        /**
         * Passes the message monitor on to the parent builder.
         *
         * @param messageMonitor the monitor to use
         * @return this builder, for chaining
         */
        @Override
        public Builder messageMonitor(@Nonnull MessageMonitor<? super EventMessage<?>> messageMonitor) {
            super.messageMonitor(messageMonitor);
            return this;
        }

        /**
         * Passes the span factory on to the parent builder. The decompiler renamed this method from
         * {@code spanFactory} when it merged it with its bridge method.
         *
         * @param eventBusSpanFactory the span factory to use
         * @return this builder, for chaining
         */
        public Builder m33spanFactory(@Nonnull EventBusSpanFactory eventBusSpanFactory) {
            super.spanFactory(eventBusSpanFactory);
            return this;
        }

        /**
         * Sets the number of events the shared cache may hold before its oldest entries are trimmed.
         *
         * @param i the cache size; checked with {@code BuilderUtils.assertPositive}
         * @return this builder, for chaining
         */
        public Builder cachedEvents(int i) {
            BuilderUtils.assertPositive(i, "The cached events count should be a positive number");
            cachedEvents = i;
            return this;
        }

        /**
         * Sets how long the producer waits for new data, expressed in this builder's {@code timeUnit}.
         *
         * @param j the fetch delay; checked with {@code BuilderUtils.assertPositive}
         * @return this builder, for chaining
         */
        public Builder fetchDelay(long j) {
            BuilderUtils.assertPositive(j, "The fetch delay should be a positive number");
            fetchDelay = j;
            return this;
        }

        /**
         * Sets the delay between clean-up runs, expressed in this builder's {@code timeUnit}.
         *
         * @param j the clean-up delay; checked with {@code BuilderUtils.assertPositive}
         * @return this builder, for chaining
         */
        public Builder cleanupDelay(long j) {
            BuilderUtils.assertPositive(j, "The clean-up delay should be a positive number");
            cleanupDelay = j;
            return this;
        }

        /**
         * Sets the unit in which the fetch delay and clean-up delay are expressed.
         *
         * @param timeUnit the unit; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for chaining
         */
        public Builder timeUnit(TimeUnit timeUnit) {
            BuilderUtils.assertNonNull(timeUnit, "TimeUnit may not be null");
            this.timeUnit = timeUnit;
            return this;
        }

        /**
         * Sets the factory used for the producer thread and the clean-up executor.
         *
         * @param threadFactory the factory; checked with {@code BuilderUtils.assertNonNull}
         * @return this builder, for chaining
         */
        public Builder threadFactory(ThreadFactory threadFactory) {
            BuilderUtils.assertNonNull(threadFactory, "ThreadFactory may not be null");
            this.threadFactory = threadFactory;
            return this;
        }

        /**
         * Overrides the system-property-derived default for reading from the shared cache.
         *
         * @param z {@code true} to let consumers read from the shared cache
         * @return this builder, for chaining
         */
        public Builder optimizeEventConsumption(boolean z) {
            optimizeEventConsumption = z;
            return this;
        }

        /**
         * Creates the event store from this builder's current settings.
         *
         * @return a new {@link LegacyEmbeddedEventStore}
         */
        public LegacyEmbeddedEventStore build() {
            return new LegacyEmbeddedEventStore(this);
        }

        /**
         * Delegates validation to the parent builder; this builder adds no checks of its own.
         *
         * @throws AxonConfigurationException as declared by the parent builder's validation
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override
        public void validate() throws AxonConfigurationException {
            super.validate();
        }

        // Decompiled bridge method: forwards to the covariant messageMonitor(...) above.
        @Override // org.axonframework.eventsourcing.eventstore.AbstractLegacyEventStore.Builder
        public /* bridge */ /* synthetic */ AbstractLegacyEventStore.Builder messageMonitor(@Nonnull MessageMonitor messageMonitor) {
            return messageMonitor((MessageMonitor<? super EventMessage<?>>) messageMonitor);
        }

        // Decompiled bridge method (renamed by the decompiler): forwards to the covariant messageMonitor(...) above.
        @Override // org.axonframework.eventsourcing.eventstore.AbstractLegacyEventStore.Builder
        /* renamed from: messageMonitor */
        public /* bridge */ /* synthetic */ AbstractEventBus.Builder mo24messageMonitor(@Nonnull MessageMonitor messageMonitor) {
            return messageMonitor((MessageMonitor<? super EventMessage<?>>) messageMonitor);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/LegacyEmbeddedEventStore$Cleaner.class */
    /**
     * Periodic task that detaches consumers from the shared cache once they can no longer be served from it.
     */
    public class Cleaner implements Runnable {
        private Cleaner() {
        }

        /**
         * Does nothing while the cache is empty or while its oldest node has no previous token. Otherwise every
         * tailing consumer that is behind the cache stops tailing, so its next read uses a private stream.
         */
        @Override
        public void run() {
            Node oldestCached = oldest;
            boolean nothingToCheck = oldestCached == null || oldestCached.previousToken == null;
            if (nothingToCheck) {
                return;
            }
            for (EventConsumer consumer : tailingConsumers) {
                if (consumer.behindGlobalCache()) {
                    logger.debug("An event stream cannot read from the local cache. It either runs behind, or its current token cannot be found in the cache. Opening a dedicated stream.");
                    consumer.stopTailingGlobalStream();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/LegacyEmbeddedEventStore$EventConsumer.class */
    /**
     * A {@link TrackingEventStream} that reads from one of two sources. While registered in the store's set of
     * tailing consumers it walks the shared node cache; otherwise it reads from a private stream opened on the
     * storage engine at its last token.
     */
    public class EventConsumer implements TrackingEventStream {
        /** Private stream on the storage engine; {@code null} while none is open. */
        private Stream<? extends TrackedEventMessage<?>> privateStream;
        /** Iterator over {@link #privateStream}; {@code null} while none is open. */
        private Iterator<? extends TrackedEventMessage<?>> privateIterator;
        /** Token of the last event handed out by either source, or the start token if none was read yet. */
        private volatile TrackingToken lastToken;
        /** Last node read from the shared cache; {@code null} when this consumer has no position in it. */
        private volatile Node lastNode;
        /** Event fetched ahead by a peek or availability check and not yet consumed. */
        private TrackedEventMessage<?> peekedEvent;

        /**
         * Creates a consumer positioned on a node of the shared cache.
         *
         * @param legacyEmbeddedEventStore unused in the body; decompiled form of the enclosing-instance argument
         * @param node                     the cached node holding the last event this consumer has seen
         */
        private EventConsumer(LegacyEmbeddedEventStore legacyEmbeddedEventStore, Node node) {
            this(node.event.trackingToken());
            this.lastNode = node;
        }

        /**
         * Creates a consumer that has no position in the shared cache.
         *
         * @param trackingToken the token to start reading after; may be {@code null}
         */
        private EventConsumer(TrackingToken trackingToken) {
            this.lastToken = trackingToken;
        }

        /**
         * Returns the next event without consuming it.
         *
         * @return the buffered event, or an empty optional when none is buffered and {@code hasNextAvailable()}
         *         reports nothing available
         */
        public Optional<TrackedEventMessage<?>> peek() {
            if (peekedEvent == null && !hasNextAvailable()) {
                return Optional.empty();
            }
            return Optional.ofNullable(peekedEvent);
        }

        /**
         * Checks whether an event is available, buffering it when found.
         *
         * @param i        the maximum time to wait
         * @param timeUnit the unit of {@code i}
         * @return {@code true} when an event is buffered after the call
         * @throws InterruptedException when interrupted while waiting for an event
         */
        public boolean hasNextAvailable(int i, TimeUnit timeUnit) throws InterruptedException {
            if (peekedEvent == null) {
                peekedEvent = peek(i, timeUnit);
            }
            return peekedEvent != null;
        }

        /**
         * Returns the next event and clears the buffer, retrying until an event arrives. The decompiler renamed
         * this method from {@code nextAvailable} when it merged it with its bridge method.
         *
         * @return the next event, never {@code null}
         * @throws InterruptedException when interrupted while waiting for an event
         */
        public TrackedEventMessage<?> m34nextAvailable() throws InterruptedException {
            while (peekedEvent == null) {
                peekedEvent = peek(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
            }
            TrackedEventMessage<?> result = peekedEvent;
            peekedEvent = null;
            return result;
        }

        /**
         * Fetches the next event from the shared cache or from the private stream, whichever applies.
         *
         * @param i        the maximum time to wait
         * @param timeUnit the unit of {@code i}
         * @return the next event, or {@code null} when none became available
         * @throws InterruptedException when interrupted while waiting
         */
        private TrackedEventMessage<?> peek(int i, TimeUnit timeUnit) throws InterruptedException {
            boolean allowSwitchToTailing = optimizeEventConsumption;
            if (tailingConsumers.contains(this)) {
                if (!behindGlobalCache()) {
                    return peekGlobalStream(i, timeUnit);
                }
                // Fell behind the cache: detach, and do not re-attach within this same call.
                stopTailingGlobalStream();
                allowSwitchToTailing = false;
            }
            return peekPrivateStream(allowSwitchToTailing, i, timeUnit);
        }

        /**
         * Tells whether this consumer can no longer be served from the shared cache.
         *
         * @return {@code true} when the cache is non-empty and either this consumer's last node is older than the
         *         cache's oldest node or, lacking a last node, no cached node follows this consumer's last token
         */
        private boolean behindGlobalCache() {
            if (oldest == null) {
                return false;
            }
            if (lastNode == null) {
                return nextNode() == null;
            }
            return lastNode.index < oldest.index;
        }

        /** Unregisters this consumer from the tailing set and drops its reference into the shared cache. */
        private void stopTailingGlobalStream() {
            tailingConsumers.remove(this);
            lastNode = null;
        }

        /**
         * Reads the next node from the shared cache. When none is present and the timeout is positive, it waits
         * for the producer's signal and looks once more.
         *
         * @param i        the maximum time to wait
         * @param timeUnit the unit of {@code i}
         * @return the next cached event, or {@code null} when none became available
         * @throws InterruptedException when interrupted while waiting
         */
        private TrackedEventMessage<?> peekGlobalStream(int i, TimeUnit timeUnit) throws InterruptedException {
            Node next = nextNode();
            if (next == null && i > 0) {
                consumerLock.lock();
                try {
                    if (consumableEventsCondition.await(i, timeUnit)) {
                        next = nextNode();
                    }
                } finally {
                    consumerLock.unlock();
                }
            }
            if (next == null) {
                return null;
            }
            // Only keep a position in the cache while still registered as a tailing consumer.
            if (tailingConsumers.contains(this)) {
                lastNode = next;
            }
            lastToken = next.event.trackingToken();
            return next.event;
        }

        /**
         * Reads the next event from the private stream, opening the stream at the last token when none is open.
         * When the stream is exhausted the consumer either switches to the shared cache (if allowed) or waits
         * for the producer's signal and checks the private stream once more.
         *
         * @param z        whether this consumer may switch to tailing the shared cache
         * @param i        the maximum time to wait
         * @param timeUnit the unit of {@code i}
         * @return the next event, or {@code null} when none became available
         * @throws InterruptedException when interrupted while waiting
         */
        private TrackedEventMessage<?> peekPrivateStream(boolean z, int i, TimeUnit timeUnit) throws InterruptedException {
            if (privateIterator == null) {
                privateStream = storageEngine().readEvents(lastToken, false);
                privateIterator = privateStream.iterator();
            }
            if (privateIterator.hasNext()) {
                TrackedEventMessage<?> next = privateIterator.next();
                lastToken = next.trackingToken();
                return next;
            }
            if (z) {
                // Caught up with the store: hand over to the shared cache and make sure it is being filled.
                closePrivateStream();
                lastNode = findNode(lastToken);
                tailingConsumers.add(this);
                ensureProducerStarted();
                return i > 0 ? peek(i, timeUnit) : null;
            }
            consumerLock.lock();
            try {
                if (consumableEventsCondition.await(i, timeUnit) && privateIterator.hasNext()) {
                    TrackedEventMessage<?> next = privateIterator.next();
                    lastToken = next.trackingToken();
                    return next;
                }
                return null;
            } finally {
                // Release exactly once: a second unlock() on a lock this thread no longer holds would throw
                // IllegalMonitorStateException and lose the event being returned.
                consumerLock.unlock();
            }
        }

        /**
         * Finds the cached node that follows this consumer's position.
         *
         * @return the successor of the last node when one is known; otherwise the first cached node whose
         *         previous token equals this consumer's last token; {@code null} when neither exists
         */
        private Node nextNode() {
            Node current = lastNode;
            if (current != null) {
                return current.next;
            }
            Node candidate = oldest;
            while (candidate != null && !Objects.equals(candidate.previousToken, lastToken)) {
                candidate = candidate.next;
            }
            return candidate;
        }

        /**
         * @return the token of the last event handed out, or the start token when none was read yet
         */
        private TrackingToken lastToken() {
            return lastToken;
        }

        /** Closes the private stream, if open, and stops tailing the shared cache. */
        public void close() {
            closePrivateStream();
            stopTailingGlobalStream();
        }

        /** Closes and forgets the private stream and its iterator; does nothing when none is open. */
        private void closePrivateStream() {
            Stream<? extends TrackedEventMessage<?>> stream = privateStream;
            if (stream != null) {
                privateStream = null;
                privateIterator = null;
                stream.close();
            }
        }
    }

    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/LegacyEmbeddedEventStore$EventProducer.class */
    private class EventProducer implements AutoCloseable {
        private final Lock lock = new ReentrantLock();
        private final Condition dataAvailableCondition = this.lock.newCondition();
        private final long fetchDelayNanos;
        private final int cachedEvents;
        private volatile boolean shouldFetch;
        private volatile boolean closed;
        private Stream<? extends TrackedEventMessage<?>> eventStream;
        private Node newest;

        private EventProducer(long j, int i) {
            this.fetchDelayNanos = j;
            this.cachedEvents = i;
        }

        private void run() throws InterruptedException {
            boolean z = false;
            while (!this.closed) {
                this.shouldFetch = true;
                while (this.shouldFetch) {
                    this.shouldFetch = false;
                    z = fetchData();
                }
                if (!z) {
                    waitForData();
                }
            }
        }

        private void waitForData() throws InterruptedException {
            this.lock.lock();
            try {
                if (!this.shouldFetch) {
                    this.dataAvailableCondition.awaitNanos(this.fetchDelayNanos);
                }
            } finally {
                this.lock.unlock();
            }
        }

        private void fetchIfWaiting() {
            this.shouldFetch = true;
            this.lock.lock();
            try {
                this.dataAvailableCondition.signalAll();
            } finally {
                this.lock.unlock();
            }
        }

        private boolean fetchData() {
            Node node = this.newest;
            if (!LegacyEmbeddedEventStore.this.tailingConsumers.isEmpty()) {
                try {
                    this.eventStream = LegacyEmbeddedEventStore.this.storageEngine().readEvents(lastToken(), true);
                    this.eventStream.forEach(trackedEventMessage -> {
                        Node node2 = new Node(nextIndex(), lastToken(), trackedEventMessage);
                        if (this.newest != null) {
                            this.newest.next = node2;
                        }
                        this.newest = node2;
                        if (LegacyEmbeddedEventStore.this.oldest == null) {
                            LegacyEmbeddedEventStore.this.oldest = node2;
                        }
                        notifyConsumers();
                        trimCache();
                    });
                } catch (Exception e) {
                    LegacyEmbeddedEventStore.logger.error("Failed to read events from the underlying event storage", e);
                }
            }
            return !Objects.equals(this.newest, node);
        }

        private TrackingToken lastToken() {
            if (this.newest != null) {
                return this.newest.event.trackingToken();
            }
            List list = (List) LegacyEmbeddedEventStore.this.tailingConsumers.stream().map((v0) -> {
                return v0.lastToken();
            }).collect(Collectors.toList());
            if (list.isEmpty() || list.contains(null)) {
                return null;
            }
            return (TrackingToken) list.get(0);
        }

        private long nextIndex() {
            if (this.newest == null) {
                return 0L;
            }
            return this.newest.index + 1;
        }

        private void notifyConsumers() {
            LegacyEmbeddedEventStore.this.consumerLock.lock();
            try {
                LegacyEmbeddedEventStore.this.consumableEventsCondition.signalAll();
            } finally {
                LegacyEmbeddedEventStore.this.consumerLock.unlock();
            }
        }

        private void trimCache() {
            Node node;
            Node node2 = LegacyEmbeddedEventStore.this.oldest;
            while (true) {
                node = node2;
                if (this.newest == null || node == null || this.newest.index - node.index < this.cachedEvents) {
                    break;
                } else {
                    node2 = node.next;
                }
            }
            LegacyEmbeddedEventStore.this.oldest = node;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.closed = true;
            if (this.eventStream != null) {
                this.eventStream.close();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/eventsourcing/eventstore/LegacyEmbeddedEventStore$Node.class */
    /**
     * One entry of the shared event cache, singly linked to the entry cached after it.
     */
    public static class Node {
        /** Position of this node in the cache; the producer assigns consecutive values starting at zero. */
        private final long index;
        /** Token the producer held just before caching this node's event; may be {@code null}. */
        private final TrackingToken previousToken;
        /** The cached event. */
        private final TrackedEventMessage<?> event;
        /** The node cached after this one, or {@code null} while this is the newest node. */
        private volatile Node next;

        /**
         * @param index         position of the node in the cache
         * @param previousToken token preceding this node's event
         * @param event         the event to cache
         */
        private Node(long index, TrackingToken previousToken, TrackedEventMessage<?> event) {
            this.index = index;
            this.previousToken = previousToken;
            this.event = event;
        }
    }

    /**
     * Creates the store from the given builder's settings. The builder's fetch delay is converted to
     * nanoseconds and its clean-up delay to milliseconds, both from the builder's time unit. The producer
     * thread is not started here.
     *
     * @param builder the builder holding the configuration for this store
     */
    protected LegacyEmbeddedEventStore(Builder builder) {
        super(builder);

        // Settings copied straight from the builder.
        this.threadFactory = builder.threadFactory;
        this.optimizeEventConsumption = builder.optimizeEventConsumption;

        // Coordination state shared between consumers and the producer.
        this.consumerLock = new ReentrantLock();
        this.consumableEventsCondition = consumerLock.newCondition();
        this.tailingConsumers = new CopyOnWriteArraySet<>();
        this.producerStarted = new AtomicBoolean();

        // Both delays are expressed in the builder's time unit.
        TimeUnit unit = builder.timeUnit;
        this.cleanupDelayMillis = unit.toMillis(builder.cleanupDelay);
        this.producer = new EventProducer(unit.toNanos(builder.fetchDelay), builder.cachedEvents);
        this.cleanupService = Executors.newScheduledThreadPool(1, threadFactory);
    }

    /**
     * Creates a new {@link Builder} with its default settings.
     *
     * @return a fresh builder for a {@link LegacyEmbeddedEventStore}
     */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Shuts this store down: quietly closes every tailing consumer, then the producer, and finally stops the
     * clean-up executor.
     */
    public void shutDown() {
        for (EventConsumer consumer : this.tailingConsumers) {
            IOUtils.closeQuietly(consumer);
        }
        IOUtils.closeQuietly(this.producer);
        this.cleanupService.shutdownNow();
    }

    /**
     * Starts the producer thread and schedules the periodic {@code Cleaner}. Only the first caller does so;
     * later calls return immediately.
     */
    private void ensureProducerStarted() {
        if (!this.producerStarted.compareAndSet(false, true)) {
            // Another caller already started the producer.
            return;
        }
        Runnable producerTask = () -> {
            try {
                this.producer.run();
            } catch (InterruptedException interrupted) {
                logger.warn("Producer thread was interrupted. Shutting down event store.", interrupted);
                // Restore the interrupt flag for whoever owns the thread.
                Thread.currentThread().interrupt();
            }
        };
        Thread producerThread = this.threadFactory.newThread(producerTask);
        producerThread.start();
        this.cleanupService.scheduleWithFixedDelay(
                new Cleaner(), this.cleanupDelayMillis, this.cleanupDelayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Asks the producer to fetch again, waking it if it is waiting for data. The given events are not inspected.
     * NOTE(review): presumably a hook the parent class invokes after events are committed; confirm against the
     * parent class.
     *
     * @param list the events involved; unused by this implementation
     */
    protected void afterCommit(List<? extends EventMessage<?>> list) {
        this.producer.fetchIfWaiting();
    }

    /**
     * Opens a stream of events following the given token. The decompiler renamed this method from
     * {@code openStream} when it merged it with its bridge method.
     * <p>
     * When event consumption is optimized and the token's event is found in the shared cache, the returned
     * consumer starts on that cached node and is registered as a tailing consumer. Otherwise it starts from the
     * token alone, outside the tailing set.
     *
     * @param trackingToken the token of the last event already seen; may be {@code null}
     * @return a stream positioned after the given token
     */
    public TrackingEventStream m32openStream(TrackingToken trackingToken) {
        Node cachedNode = findNode(trackingToken);
        if (this.optimizeEventConsumption && cachedNode != null) {
            EventConsumer tailing = new EventConsumer(this, cachedNode);
            this.tailingConsumers.add(tailing);
            return tailing;
        }
        return new EventConsumer(trackingToken);
    }

    /**
     * Searches the shared cache, oldest node first, for the node whose event carries the given token.
     *
     * @param trackingToken the token to look for; may be {@code null}, in which case no node matches
     * @return the matching node, or {@code null} when the cache is empty or holds no such event
     */
    private Node findNode(TrackingToken trackingToken) {
        for (Node candidate = this.oldest; candidate != null; candidate = candidate.next) {
            if (candidate.event.trackingToken().equals(trackingToken)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Registers {@link #shutDown()} as a shutdown handler on the given registry.
     *
     * @param lifecycleRegistry the registry to register the shutdown handler with
     */
    public void registerLifecycleHandlers(@Nonnull LifecycleRegistry lifecycleRegistry) {
        // 1073741813 == Integer.MAX_VALUE / 2 - 10.
        // NOTE(review): presumably a named lifecycle phase constant inlined by the compiler; confirm which one.
        lifecycleRegistry.onShutdown(1073741813, this::shutDown);
    }
}
