package org.axonframework.axonserver.connector.event;

import io.axoniq.axonserver.connector.AxonServerConnection;
import io.axoniq.axonserver.connector.event.DcbEventChannel;
import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GlobalSequenceTrackingToken;
import org.axonframework.eventhandling.TrackingToken;
import org.axonframework.eventsourcing.eventstore.AppendCondition;
import org.axonframework.eventsourcing.eventstore.AppendEventsTransactionRejectedException;
import org.axonframework.eventsourcing.eventstore.ConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.EmptyAppendTransaction;
import org.axonframework.eventsourcing.eventstore.EventStorageEngine;
import org.axonframework.eventsourcing.eventstore.GlobalIndexConsistencyMarker;
import org.axonframework.eventsourcing.eventstore.SourcingCondition;
import org.axonframework.eventsourcing.eventstore.TaggedEventMessage;
import org.axonframework.eventstreaming.StreamingCondition;
import org.axonframework.messaging.MessageStream;
import org.axonframework.serialization.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/axonserver/connector/event/AxonServerEventStorageEngine.class */
public class AxonServerEventStorageEngine implements EventStorageEngine {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private final AxonServerConnection connection;
    private final EventConverter converter;

    /* loaded from: input_file:org/axonframework/axonserver/connector/event/AxonServerEventStorageEngine$AxonServerAppendTransaction.class */
    private static final class AxonServerAppendTransaction extends Record implements EventStorageEngine.AppendTransaction {
        private final DcbEventChannel.AppendEventsTransaction appendTransaction;

        private AxonServerAppendTransaction(DcbEventChannel.AppendEventsTransaction appendEventsTransaction) {
            this.appendTransaction = appendEventsTransaction;
        }

        public CompletableFuture<ConsistencyMarker> commit() {
            AxonServerEventStorageEngine.logger.debug("Committing append event transaction...");
            return this.appendTransaction.commit().exceptionallyCompose(th -> {
                AxonServerEventStorageEngine.logger.warn("Committing append transaction failed.", th);
                return CompletableFuture.failedFuture(new AppendEventsTransactionRejectedException(th.getMessage()));
            }).thenApply(appendEventsResponse -> {
                long consistencyMarker = appendEventsResponse.getConsistencyMarker();
                AxonServerEventStorageEngine.logger.debug("Committing append transaction succeeded with marker [{}].", Long.valueOf(consistencyMarker));
                return new GlobalIndexConsistencyMarker(consistencyMarker);
            });
        }

        public void rollback() {
            this.appendTransaction.rollback();
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, AxonServerAppendTransaction.class), AxonServerAppendTransaction.class, "appendTransaction", "FIELD:Lorg/axonframework/axonserver/connector/event/AxonServerEventStorageEngine$AxonServerAppendTransaction;->appendTransaction:Lio/axoniq/axonserver/connector/event/DcbEventChannel$AppendEventsTransaction;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, AxonServerAppendTransaction.class), AxonServerAppendTransaction.class, "appendTransaction", "FIELD:Lorg/axonframework/axonserver/connector/event/AxonServerEventStorageEngine$AxonServerAppendTransaction;->appendTransaction:Lio/axoniq/axonserver/connector/event/DcbEventChannel$AppendEventsTransaction;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, AxonServerAppendTransaction.class, Object.class), AxonServerAppendTransaction.class, "appendTransaction", "FIELD:Lorg/axonframework/axonserver/connector/event/AxonServerEventStorageEngine$AxonServerAppendTransaction;->appendTransaction:Lio/axoniq/axonserver/connector/event/DcbEventChannel$AppendEventsTransaction;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public DcbEventChannel.AppendEventsTransaction appendTransaction() {
            return this.appendTransaction;
        }
    }

    public AxonServerEventStorageEngine(@Nonnull AxonServerConnection axonServerConnection, @Nonnull Converter converter) {
        this.connection = (AxonServerConnection) Objects.requireNonNull(axonServerConnection, "The Axon Server connection cannot be null.");
        this.converter = new EventConverter(converter);
    }

    public CompletableFuture<EventStorageEngine.AppendTransaction> appendEvents(@Nonnull AppendCondition appendCondition, @Nonnull List<TaggedEventMessage<?>> list) {
        if (list.isEmpty()) {
            return CompletableFuture.completedFuture(EmptyAppendTransaction.INSTANCE);
        }
        DcbEventChannel.AppendEventsTransaction startTransaction = eventChannel().startTransaction(ConditionConverter.convertAppendCondition(appendCondition));
        Stream<TaggedEventMessage<?>> stream = list.stream();
        EventConverter eventConverter = this.converter;
        Objects.requireNonNull(eventConverter);
        stream.map(eventConverter::convertTaggedEventMessage).forEach(taggedEvent -> {
            if (logger.isDebugEnabled()) {
                logger.debug("Appended event [{}] with timestamp [{}].", taggedEvent.getEvent().getIdentifier(), Long.valueOf(taggedEvent.getEvent().getTimestamp()));
            }
            startTransaction.append(taggedEvent);
        });
        return CompletableFuture.completedFuture(new AxonServerAppendTransaction(startTransaction));
    }

    public MessageStream<EventMessage<?>> source(@Nonnull SourcingCondition sourcingCondition) {
        if (logger.isDebugEnabled()) {
            logger.debug("Start sourcing events with condition [{}].", sourcingCondition);
        }
        return new SourcingEventMessageStream(eventChannel().source(ConditionConverter.convertSourcingCondition(sourcingCondition)), this.converter);
    }

    public MessageStream<EventMessage<?>> stream(@Nonnull StreamingCondition streamingCondition) {
        if (logger.isDebugEnabled()) {
            logger.debug("Start streaming events with condition [{}].", streamingCondition);
        }
        return new StreamingEventMessageStream(eventChannel().stream(ConditionConverter.convertStreamingCondition(streamingCondition)), this.converter);
    }

    public CompletableFuture<TrackingToken> firstToken() {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation firstToken() is invoked.");
        }
        return eventChannel().tail().thenApply(getTailResponse -> {
            return new GlobalSequenceTrackingToken(getTailResponse.getSequence());
        });
    }

    public CompletableFuture<TrackingToken> latestToken() {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation latestToken() is invoked.");
        }
        return eventChannel().head().thenApply(getHeadResponse -> {
            return new GlobalSequenceTrackingToken(getHeadResponse.getSequence());
        });
    }

    public CompletableFuture<TrackingToken> tokenAt(@Nonnull Instant instant) {
        if (logger.isDebugEnabled()) {
            logger.debug("Operation tokenAt() is invoked with Instant [{}].", instant);
        }
        return eventChannel().getSequenceAt(instant).thenApply(getSequenceAtResponse -> {
            return new GlobalSequenceTrackingToken(getSequenceAtResponse.getSequence());
        });
    }

    private DcbEventChannel eventChannel() {
        return this.connection.dcbEventChannel();
    }

    public void describeTo(@Nonnull ComponentDescriptor componentDescriptor) {
        componentDescriptor.describeProperty("connection", this.connection);
        componentDescriptor.describeProperty("converter", this.converter);
    }
}
