package org.axonframework.eventsourcing.eventstore;

import jakarta.annotation.Nonnull;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.axonframework.common.ObjectUtils;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.messaging.Context;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.unitofwork.ProcessingContext;

/* loaded from: input_file:org/axonframework/eventsourcing/eventstore/DefaultEventStoreTransaction.class */
public class DefaultEventStoreTransaction implements EventStoreTransaction {
    private final EventStorageEngine eventStorageEngine;
    private final ProcessingContext processingContext;
    private final TagResolver tagResolver;
    private final List<Consumer<EventMessage<?>>> callbacks = new CopyOnWriteArrayList();
    private final Context.ResourceKey<AppendCondition> appendConditionKey = Context.ResourceKey.withLabel("appendCondition");
    private final Context.ResourceKey<List<TaggedEventMessage<?>>> eventQueueKey = Context.ResourceKey.withLabel("eventQueue");
    private final Context.ResourceKey<ConsistencyMarker> appendPositionKey = Context.ResourceKey.withLabel("appendPosition");

    public DefaultEventStoreTransaction(@Nonnull EventStorageEngine eventStorageEngine, @Nonnull ProcessingContext processingContext, @Nonnull TagResolver tagResolver) {
        this.eventStorageEngine = eventStorageEngine;
        this.processingContext = processingContext;
        this.tagResolver = tagResolver;
    }

    @Override // org.axonframework.eventsourcing.eventstore.EventStoreTransaction
    public MessageStream<? extends EventMessage<?>> source(@Nonnull SourcingCondition sourcingCondition) {
        AppendCondition appendCondition = (AppendCondition) this.processingContext.updateResource(this.appendConditionKey, appendCondition2 -> {
            return appendCondition2 == null ? AppendCondition.withCriteria(sourcingCondition.criteria()) : appendCondition2.orCriteria(sourcingCondition.criteria());
        });
        MessageStream<EventMessage<?>> source = this.eventStorageEngine.source(sourcingCondition);
        if (appendCondition.consistencyMarker() != ConsistencyMarker.ORIGIN) {
            return source;
        }
        AtomicReference atomicReference = new AtomicReference(appendCondition.consistencyMarker());
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        return source.onNext(entry -> {
            ConsistencyMarker consistencyMarker = (ConsistencyMarker) entry.getResource(ConsistencyMarker.RESOURCE_KEY);
            if (consistencyMarker != null) {
                atomicReference.set(consistencyMarker);
            }
            atomicBoolean.set(true);
        }).whenComplete(() -> {
            this.processingContext.updateResource(this.appendPositionKey, consistencyMarker -> {
                return !atomicBoolean.get() ? consistencyMarker : (consistencyMarker == null || consistencyMarker == ConsistencyMarker.ORIGIN) ? (ConsistencyMarker) atomicReference.get() : consistencyMarker.lowerBound((ConsistencyMarker) atomicReference.get());
            });
        });
    }

    @Override // org.axonframework.eventsourcing.eventstore.EventStoreTransaction
    public void appendEvent(@Nonnull EventMessage<?> eventMessage) {
        ((List) this.processingContext.computeResourceIfAbsent(this.eventQueueKey, () -> {
            attachAppendEventsStep();
            return new CopyOnWriteArrayList();
        })).add(new GenericTaggedEventMessage(eventMessage, this.tagResolver.resolve(eventMessage)));
        this.callbacks.forEach(consumer -> {
            consumer.accept(eventMessage);
        });
    }

    private void attachAppendEventsStep() {
        this.processingContext.onPrepareCommit(processingContext -> {
            return this.eventStorageEngine.appendEvents((AppendCondition) processingContext.updateResource(this.appendConditionKey, appendCondition -> {
                return (appendCondition == null || AppendCondition.none().equals(appendCondition)) ? AppendCondition.none() : appendCondition.withMarker((ConsistencyMarker) ObjectUtils.getOrDefault((ConsistencyMarker) processingContext.getResource(this.appendPositionKey), appendCondition.consistencyMarker()));
            }), (List<TaggedEventMessage<?>>) processingContext.getResource(this.eventQueueKey)).thenAccept(appendTransaction -> {
                this.processingContext.onCommit(processingContext -> {
                    return doCommit(processingContext, appendTransaction);
                });
                this.processingContext.onError((processingContext2, phase, th) -> {
                    appendTransaction.rollback();
                });
            });
        });
    }

    private CompletableFuture<ConsistencyMarker> doCommit(ProcessingContext processingContext, EventStorageEngine.AppendTransaction appendTransaction) {
        return appendTransaction.commit().whenComplete((consistencyMarker, th) -> {
            if (consistencyMarker != null) {
                processingContext.updateResource(this.appendPositionKey, consistencyMarker -> {
                    return consistencyMarker.upperBound((ConsistencyMarker) Objects.requireNonNullElse(consistencyMarker, ConsistencyMarker.ORIGIN));
                });
            }
        });
    }

    @Override // org.axonframework.eventsourcing.eventstore.EventStoreTransaction
    public void onAppend(@Nonnull Consumer<EventMessage<?>> consumer) {
        this.callbacks.add(consumer);
    }

    @Override // org.axonframework.eventsourcing.eventstore.EventStoreTransaction
    public ConsistencyMarker appendPosition() {
        return (ConsistencyMarker) ObjectUtils.getOrDefault((ConsistencyMarker) this.processingContext.getResource(this.appendPositionKey), ConsistencyMarker.ORIGIN);
    }
}
