package org.axonframework.axonserver.connector.command;

import io.axoniq.axonserver.grpc.command.Command;
import io.axoniq.axonserver.grpc.command.CommandResponse;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.DispatchInterceptors;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.PriorityRunnable;
import org.axonframework.axonserver.connector.TargetContextResolver;
import org.axonframework.axonserver.connector.util.ExecutorServiceBuilder;
import org.axonframework.axonserver.connector.util.ProcessingInstructionHelper;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandBusSpanFactory;
import org.axonframework.commandhandling.CommandCallback;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.DefaultCommandBusSpanFactory;
import org.axonframework.commandhandling.GenericCommandResultMessage;
import org.axonframework.commandhandling.callbacks.NoOpCallback;
import org.axonframework.commandhandling.distributed.RoutingStrategy;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.ObjectUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.StringUtils;
import org.axonframework.lifecycle.Lifecycle;
import org.axonframework.lifecycle.ShutdownLatch;
import org.axonframework.messaging.Distributed;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageHandlerInterceptor;
import org.axonframework.serialization.Serializer;
import org.axonframework.tracing.NoOpSpanFactory;
import org.axonframework.tracing.Span;
import org.axonframework.tracing.SpanFactory;
import org.axonframework.tracing.SpanScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/axonframework/axonserver/connector/command/AxonServerCommandBus.class */
public class AxonServerCommandBus implements CommandBus, Distributed<CommandBus>, Lifecycle {
    // Logger named after the class that performs the lookup, i.e. this class.
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    // Shared counter; each incoming command task receives the next value when it is wrapped in a PriorityRunnable.
    // NOTE(review): presumably used to order tasks of equal priority - confirm in PriorityRunnable.
    private static final AtomicLong TASK_SEQUENCE = new AtomicLong(Long.MIN_VALUE);
    // Provides the connections used to send commands, register handlers and disconnect.
    private final AxonServerConnectionManager axonServerConnectionManager;
    // Local bus on which handlers are subscribed and on which incoming commands are dispatched.
    private final CommandBus localSegment;
    // Converts between Axon command/result messages and their gRPC counterparts.
    private final CommandSerializer serializer;
    // Supplies the routing key that is serialized along with every outgoing command.
    private final RoutingStrategy routingStrategy;
    // Supplies the priority that is serialized along with every outgoing command.
    private final CommandPriorityCalculator priorityCalculator;
    // Supplies the load factor passed along when a command handler is registered, per command name.
    private final CommandLoadFactorProvider loadFactorProvider;
    // Context used for handler registration and disconnecting; also the fallback passed to the resolver's orElse.
    private final String context;
    // Interceptors applied to every command before it is dispatched.
    private final DispatchInterceptors<CommandMessage<?>> dispatchInterceptors;
    // Resolves the context a given command is sent to.
    private final TargetContextResolver<? super CommandMessage<?>> targetContextResolver;
    // Callback used by dispatch(CommandMessage) when the caller does not provide one.
    private final CommandCallback<Object, Object> defaultCommandCallback;
    // Rejects new dispatches once shutdown has been initiated and tracks commands still in transit.
    private final ShutdownLatch shutdownLatch = new ShutdownLatch();
    // Executes the processing tasks created for incoming commands.
    private final ExecutorService executorService;
    // Creates the spans wrapped around dispatching and handling of commands.
    private final CommandBusSpanFactory spanFactory;

    /* loaded from: input_file:org/axonframework/axonserver/connector/command/AxonServerCommandBus$Builder.class */
    /**
     * Builder for an {@link AxonServerCommandBus}.
     * <p>
     * The {@link AxonServerConnectionManager}, {@link AxonServerConfiguration}, local {@link CommandBus},
     * {@link Serializer} and {@link RoutingStrategy} are hard requirements that are checked by {@link #validate()}.
     * All other components have a default assigned when the builder is created.
     */
    public static class Builder {
        private AxonServerConnectionManager axonServerConnectionManager;
        private AxonServerConfiguration configuration;
        private CommandBus localSegment;
        private Serializer serializer;
        private RoutingStrategy routingStrategy;
        private String defaultContext;
        private CommandCallback<Object, Object> defaultCommandCallback = NoOpCallback.INSTANCE;
        private CommandPriorityCalculator priorityCalculator = CommandPriorityCalculator.defaultCommandPriorityCalculator();
        private ExecutorServiceBuilder executorServiceBuilder = ExecutorServiceBuilder.defaultCommandExecutorServiceBuilder();
        // Default load factor of 100 for every command name.
        private CommandLoadFactorProvider loadFactorProvider = commandName -> 100;
        // Evaluated lazily on every resolution, so it observes the default context and configuration
        // that are set on this builder after this field has been initialized.
        private TargetContextResolver<? super CommandMessage<?>> targetContextResolver = commandMessage ->
                StringUtils.nonEmptyOrNull(this.defaultContext) ? this.defaultContext : this.configuration.getContext();
        private CommandBusSpanFactory spanFactory = DefaultCommandBusSpanFactory.builder().spanFactory(NoOpSpanFactory.INSTANCE).build();

        /**
         * Sets the {@link AxonServerConnectionManager} providing the connections used by the command bus.
         *
         * @param axonServerConnectionManager the connection manager, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder axonServerConnectionManager(AxonServerConnectionManager axonServerConnectionManager) {
            BuilderUtils.assertNonNull(axonServerConnectionManager, "AxonServerConnectionManager may not be null");
            this.axonServerConnectionManager = axonServerConnectionManager;
            return this;
        }

        /**
         * Sets the {@link AxonServerConfiguration}. It is handed to the {@link CommandSerializer} and the executor
         * service builder, and supplies the context when no default context has been set.
         *
         * @param axonServerConfiguration the configuration, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder configuration(AxonServerConfiguration axonServerConfiguration) {
            BuilderUtils.assertNonNull(axonServerConfiguration, "AxonServerConfiguration may not be null");
            this.configuration = axonServerConfiguration;
            return this;
        }

        /**
         * Sets the local {@link CommandBus} on which handlers are subscribed and incoming commands are dispatched.
         *
         * @param commandBus the local segment, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder localSegment(CommandBus commandBus) {
            BuilderUtils.assertNonNull(commandBus, "Local CommandBus may not be null");
            this.localSegment = commandBus;
            return this;
        }

        /**
         * Sets the {@link Serializer} that is wrapped by the {@link CommandSerializer} of the command bus.
         *
         * @param serializer the serializer, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder serializer(Serializer serializer) {
            BuilderUtils.assertNonNull(serializer, "Serializer may not be null");
            this.serializer = serializer;
            return this;
        }

        /**
         * Sets the {@link RoutingStrategy} providing the routing key of outgoing commands.
         *
         * @param routingStrategy the routing strategy, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder routingStrategy(RoutingStrategy routingStrategy) {
            BuilderUtils.assertNonNull(routingStrategy, "RoutingStrategy may not be null");
            this.routingStrategy = routingStrategy;
            return this;
        }

        /**
         * Sets the callback used for commands dispatched without an explicit callback. Passing {@code null} resets
         * it to {@link NoOpCallback#INSTANCE}.
         *
         * @param commandCallback the default callback, or {@code null} to use the no-op callback
         * @return this builder, for method chaining
         */
        public Builder defaultCommandCallback(CommandCallback<Object, Object> commandCallback) {
            this.defaultCommandCallback = ObjectUtils.getOrDefault(commandCallback, NoOpCallback.INSTANCE);
            return this;
        }

        /**
         * Sets the {@link CommandPriorityCalculator} determining the priority of outgoing commands.
         *
         * @param commandPriorityCalculator the priority calculator, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder priorityCalculator(CommandPriorityCalculator commandPriorityCalculator) {
            BuilderUtils.assertNonNull(commandPriorityCalculator, "CommandPriorityCalculator may not be null");
            this.priorityCalculator = commandPriorityCalculator;
            return this;
        }

        /**
         * Sets the {@link TargetContextResolver} deciding to which context a command is sent.
         *
         * @param targetContextResolver the resolver, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder targetContextResolver(TargetContextResolver<? super CommandMessage<?>> targetContextResolver) {
            BuilderUtils.assertNonNull(targetContextResolver, "TargetContextResolver may not be null");
            this.targetContextResolver = targetContextResolver;
            return this;
        }

        /**
         * Sets the {@link ExecutorServiceBuilder} that creates the executor processing incoming commands.
         *
         * @param executorServiceBuilder the executor service builder, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder executorServiceBuilder(ExecutorServiceBuilder executorServiceBuilder) {
            BuilderUtils.assertNonNull(executorServiceBuilder, "ExecutorServiceBuilder may not be null");
            this.executorServiceBuilder = executorServiceBuilder;
            return this;
        }

        /**
         * Sets the {@link CommandLoadFactorProvider} supplying the load factor used when registering a handler.
         *
         * @param commandLoadFactorProvider the load factor provider, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder loadFactorProvider(CommandLoadFactorProvider commandLoadFactorProvider) {
            BuilderUtils.assertNonNull(commandLoadFactorProvider, "CommandLoadFactorProvider may not be null");
            this.loadFactorProvider = commandLoadFactorProvider;
            return this;
        }

        /**
         * Sets the default context, which takes precedence over the context of the {@link AxonServerConfiguration}.
         *
         * @param str the default context, may not be {@code null} or empty
         * @return this builder, for method chaining
         */
        public Builder defaultContext(String str) {
            BuilderUtils.assertNonEmpty(str, "The context may not be null or empty");
            this.defaultContext = str;
            return this;
        }

        /**
         * Sets the span factory by wrapping the given {@link SpanFactory} in a {@link DefaultCommandBusSpanFactory}.
         *
         * @param spanFactory the generic span factory, may not be {@code null}
         * @return this builder, for method chaining
         * @deprecated use {@link #spanFactory(CommandBusSpanFactory)} instead
         */
        @Deprecated
        public Builder spanFactory(@Nonnull SpanFactory spanFactory) {
            BuilderUtils.assertNonNull(spanFactory, "SpanFactory may not be null");
            this.spanFactory = DefaultCommandBusSpanFactory.builder().spanFactory(spanFactory).build();
            return this;
        }

        /**
         * Sets the {@link CommandBusSpanFactory} creating the spans around dispatching and handling of commands.
         *
         * @param commandBusSpanFactory the span factory, may not be {@code null}
         * @return this builder, for method chaining
         */
        public Builder spanFactory(@Nonnull CommandBusSpanFactory commandBusSpanFactory) {
            BuilderUtils.assertNonNull(commandBusSpanFactory, "SpanFactory may not be null");
            this.spanFactory = commandBusSpanFactory;
            return this;
        }

        /**
         * Creates the {@link AxonServerCommandBus} from this builder. The constructor invokes {@link #validate()}.
         *
         * @return the configured command bus
         */
        public AxonServerCommandBus build() {
            return new AxonServerCommandBus(this);
        }

        /**
         * Creates the {@link CommandSerializer} from the configured {@link Serializer} and configuration.
         *
         * @return a new {@link CommandSerializer}
         */
        protected CommandSerializer buildSerializer() {
            return new CommandSerializer(this.serializer, this.configuration);
        }

        /**
         * Verifies that all hard requirements have been provided.
         *
         * @throws AxonConfigurationException if a required component is missing
         */
        protected void validate() throws AxonConfigurationException {
            BuilderUtils.assertNonNull(this.axonServerConnectionManager, "The AxonServerConnectionManager is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.configuration, "The AxonServerConfiguration is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.localSegment, "The Local CommandBus is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.serializer, "The Serializer is a hard requirement and should be provided");
            BuilderUtils.assertNonNull(this.routingStrategy, "The RoutingStrategy is a hard requirement and should be provided");
        }
    }

    /* loaded from: input_file:org/axonframework/axonserver/connector/command/AxonServerCommandBus$CommandProcessingTask.class */
    /**
     * Task that handles a single incoming {@link Command} on the local segment and reports the outcome through the
     * {@link CompletableFuture} it was created with.
     */
    private static class CommandProcessingTask implements Runnable {
        private final CompletableFuture<CommandResponse> result;
        private final CommandBus localSegment;
        private final Command command;
        private final CommandSerializer serializer;
        private final CommandBusSpanFactory spanFactory;

        /**
         * @param command               the incoming command to handle
         * @param commandSerializer     converts the command to a {@link CommandMessage} and the result to a response
         * @param completableFuture     the future that is completed with the outcome of the handling
         * @param commandBus            the local bus the command is dispatched on
         * @param commandBusSpanFactory creates the span wrapped around the handling
         */
        public CommandProcessingTask(Command command, CommandSerializer commandSerializer, CompletableFuture<CommandResponse> completableFuture, CommandBus commandBus, CommandBusSpanFactory commandBusSpanFactory) {
            this.command = command;
            this.serializer = commandSerializer;
            this.result = completableFuture;
            this.localSegment = commandBus;
            this.spanFactory = commandBusSpanFactory;
        }

        /**
         * Deserializes the command, dispatches it on the local segment inside a handling span and completes the
         * result future with the serialized outcome. Any failure before or during dispatching completes the future
         * exceptionally.
         */
        @Override // java.lang.Runnable
        public void run() {
            final CommandMessage<?> commandMessage;
            final Span span;
            try {
                commandMessage = this.serializer.deserialize(this.command);
                span = this.spanFactory.createHandleCommandSpan(commandMessage, true);
            } catch (Exception e) {
                // Without this the future would never complete when the command cannot be prepared for handling,
                // leaving whoever waits on it without any response.
                this.result.completeExceptionally(e);
                return;
            }
            span.run(() -> {
                try {
                    this.localSegment.dispatch(commandMessage, (dispatched, commandResultMessage) -> {
                        if (commandResultMessage.isExceptional()) {
                            span.recordException(commandResultMessage.exceptionResult());
                        }
                        this.result.complete(this.serializer.serialize(commandResultMessage, this.command.getMessageIdentifier()));
                    });
                } catch (Exception e) {
                    span.recordException(e);
                    this.result.completeExceptionally(e);
                }
            });
        }
    }

    /**
     * Instantiates a {@link Builder} with which an {@link AxonServerCommandBus} can be configured and built.
     *
     * @return a fresh {@link Builder}
     */
    public static Builder builder() {
        final Builder freshBuilder = new Builder();
        return freshBuilder;
    }

    /**
     * Instantiates an {@link AxonServerCommandBus} from the given {@link Builder}.
     * <p>
     * The builder is validated first. The context of this bus is the builder's default context when
     * {@code StringUtils.nonEmptyOrNull} accepts it, and the context of the {@link AxonServerConfiguration}
     * otherwise. That context is also the fallback handed to the configured {@link TargetContextResolver}.
     *
     * @param builder the builder holding the components of this command bus
     * @throws AxonConfigurationException if the builder misses a hard requirement
     */
    public AxonServerCommandBus(Builder builder) {
        builder.validate();
        AxonServerConfiguration configuration = builder.configuration;
        this.axonServerConnectionManager = builder.axonServerConnectionManager;
        this.localSegment = builder.localSegment;
        this.serializer = builder.buildSerializer();
        this.routingStrategy = builder.routingStrategy;
        this.priorityCalculator = builder.priorityCalculator;
        this.defaultCommandCallback = builder.defaultCommandCallback;
        this.loadFactorProvider = builder.loadFactorProvider;
        // Kept in a local (not read back from the field) so the fallback lambda below can capture it.
        String resolvedContext = StringUtils.nonEmptyOrNull(builder.defaultContext) ? builder.defaultContext : configuration.getContext();
        this.context = resolvedContext;
        this.targetContextResolver = builder.targetContextResolver.orElse(message -> resolvedContext);
        // Incoming command tasks are queued by priority; 1000 is the initial capacity of the queue, not a bound.
        this.executorService = builder.executorServiceBuilder.apply(configuration, new PriorityBlockingQueue<>(1000));
        this.spanFactory = builder.spanFactory;
        this.dispatchInterceptors = new DispatchInterceptors<>();
    }

    /**
     * Registers the start and shutdown handlers of this command bus with the given registry: {@link #start()} and
     * {@link #disconnect()} in phase {@code 536870911}, {@link #shutdownDispatching()} in phase {@code 0}.
     *
     * @param lifecycleRegistry the registry to register the lifecycle handlers with
     */
    public void registerLifecycleHandlers(@Nonnull Lifecycle.LifecycleRegistry lifecycleRegistry) {
        // Integer.MAX_VALUE >> 2 == 536870911.
        // NOTE(review): presumably these match the framework's named command connector phases - confirm.
        final int inboundConnectorPhase = Integer.MAX_VALUE >> 2;
        final int outboundConnectorPhase = 0;

        lifecycleRegistry.onStart(inboundConnectorPhase, this::start);
        lifecycleRegistry.onShutdown(inboundConnectorPhase, this::disconnect);
        lifecycleRegistry.onShutdown(outboundConnectorPhase, this::shutdownDispatching);
    }

    /**
     * Start handler of this command bus; initializes the {@link ShutdownLatch} that guards dispatching.
     */
    public void start() {
        shutdownLatch.initialize();
    }

    /**
     * Dispatches the given command, reporting its result to the default command callback of this bus.
     *
     * @param commandMessage the command to dispatch
     * @param <C>            the payload type of the command
     */
    public <C> void dispatch(@Nonnull CommandMessage<C> commandMessage) {
        final CommandCallback<Object, Object> fallbackCallback = this.defaultCommandCallback;
        dispatch(commandMessage, fallbackCallback);
    }

    /**
     * Runs the given command through the registered dispatch interceptors and sends the outcome of that
     * interception to Axon Server. The result is reported to the given callback.
     *
     * @param commandMessage  the command to dispatch
     * @param commandCallback the callback to invoke with the result of the command
     * @param <C>             the payload type of the command
     * @param <R>             the type of result expected by the callback
     */
    public <C, R> void dispatch(@Nonnull CommandMessage<C> commandMessage, @Nonnull CommandCallback<? super C, ? super R> commandCallback) {
        logger.debug("Dispatch command [{}] with callback", commandMessage.getCommandName());
        // Parameterized instead of raw cast, so the call below stays type-checked against the callback.
        CommandMessage<C> interceptedCommand = (CommandMessage<C>) this.dispatchInterceptors.intercept(commandMessage);
        doDispatch(interceptedCommand, commandCallback);
    }

    /**
     * Sends the given (already intercepted) command to Axon Server and reports the outcome to the given callback.
     * <p>
     * The dispatch is registered as an activity on the {@link ShutdownLatch} and wrapped in a dispatch span. Both
     * are ended once the response has been processed, or immediately when sending fails. A failure to send is
     * reported to the callback as an {@link AxonServerCommandDispatchException}.
     *
     * @param commandMessage  the command to send
     * @param commandCallback the callback to invoke with the result of the command
     * @param <C>             the payload type of the command
     * @param <R>             the type of result expected by the callback
     */
    private <C, R> void doDispatch(CommandMessage<C> commandMessage, CommandCallback<? super C, ? super R> commandCallback) {
        this.shutdownLatch.ifShuttingDown("Cannot dispatch new commands as this bus is being shutdown");
        ShutdownLatch.ActivityHandle commandInTransit = this.shutdownLatch.registerActivity();
        Span span = this.spanFactory.createDispatchCommandSpan(commandMessage, true).start();
        // try-with-resources closes the scope on the failure path too, before the catch block runs.
        try (SpanScope ignored = span.makeCurrent()) {
            this.axonServerConnectionManager.getConnection(this.targetContextResolver.resolveContext(commandMessage))
                    .commandChannel()
                    .sendCommand(this.serializer.serialize(this.spanFactory.propagateContext(commandMessage),
                                                           this.routingStrategy.getRoutingKey(commandMessage),
                                                           this.priorityCalculator.determinePriority(commandMessage)))
                    .thenApply(commandResponse -> this.serializer.deserialize(commandResponse))
                    .exceptionally(GenericCommandResultMessage::asCommandResultMessage)
                    .thenAccept(commandResultMessage -> {
                        if (commandResultMessage.isExceptional()) {
                            span.recordException(commandResultMessage.exceptionResult());
                        }
                        commandCallback.onResult(commandMessage, commandResultMessage);
                    })
                    // Runs whether or not the callback itself failed, so the activity and span always end.
                    .whenComplete((ignoredResult, ignoredFailure) -> {
                        commandInTransit.end();
                        span.end();
                    });
        } catch (Exception e) {
            span.recordException(e).end();
            commandInTransit.end();
            commandCallback.onResult(commandMessage, GenericCommandResultMessage.asCommandResultMessage(new AxonServerCommandDispatchException(ErrorCode.COMMAND_DISPATCH_ERROR.errorCode(), "Exception while dispatching a command to AxonServer", e)));
        }
    }

    /**
     * Subscribes the given handler for the given command name on the local segment and registers a handler for
     * that command name with Axon Server.
     * <p>
     * Every incoming command is wrapped in a {@link CommandProcessingTask} and submitted to the executor of this
     * bus, using the priority taken from the processing instructions of the command.
     *
     * @param str            the name of the command to subscribe the handler to
     * @param messageHandler the handler of the command
     * @return a {@link Registration} combining the local subscription with the cancellation of the registration
     *         made with Axon Server
     */
    public Registration subscribe(@Nonnull String str, @Nonnull MessageHandler<? super CommandMessage<?>> messageHandler) {
        logger.debug("Subscribing command with name [{}] to this distributed CommandBus. Expect similar logging on the local segment.", str);
        Registration localRegistration = this.localSegment.subscribe(str, messageHandler);
        io.axoniq.axonserver.connector.Registration serverRegistration = this.axonServerConnectionManager.getConnection(this.context).commandChannel().registerCommandHandler(command -> {
            CompletableFuture<CommandResponse> result = new CompletableFuture<>();
            this.executorService.execute(new PriorityRunnable(new CommandProcessingTask(command, this.serializer, result, this.localSegment, this.spanFactory), ProcessingInstructionHelper.priority(command.getProcessingInstructionsList()), TASK_SEQUENCE.incrementAndGet()));
            return result;
        }, this.loadFactorProvider.getFor(str), new String[]{str});
        // Evaluating the method reference already fails fast with a NullPointerException on a null registration.
        return new AxonServerRegistration(localRegistration, serverRegistration::cancel);
    }

    /**
     * Returns the local {@link CommandBus} on which handlers are subscribed and incoming commands are dispatched.
     * <p>
     * NOTE(review): restores the accessor under its original name, which the decompiler renamed to
     * {@code m3localSegment}; presumably this is the method required by {@link Distributed} - confirm.
     *
     * @return the local segment of this command bus
     */
    public CommandBus localSegment() {
        return this.localSegment;
    }

    /**
     * Alias of {@link #localSegment()} under the name generated by the decompiler, kept for existing callers.
     *
     * @return the local segment of this command bus
     */
    public CommandBus m3localSegment() {
        return localSegment();
    }

    /**
     * Registers the given handler interceptor on the local segment of this command bus.
     *
     * @param messageHandlerInterceptor the interceptor to register
     * @return the {@link Registration} returned by the local segment
     */
    public Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<? super CommandMessage<?>> messageHandlerInterceptor) {
        final Registration interceptorRegistration = localSegment.registerHandlerInterceptor(messageHandlerInterceptor);
        return interceptorRegistration;
    }

    /**
     * Registers the given dispatch interceptor, which is then applied to commands dispatched through this bus.
     *
     * @param messageDispatchInterceptor the interceptor to register
     * @return the {@link Registration} returned by the dispatch interceptor collection
     */
    public Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super CommandMessage<?>> messageDispatchInterceptor) {
        final Registration interceptorRegistration = dispatchInterceptors.registerDispatchInterceptor(messageDispatchInterceptor);
        return interceptorRegistration;
    }

    /**
     * Shutdown handler that asks the command channel of this bus' context to prepare for disconnecting.
     *
     * @return the future returned by the command channel, or an already completed future when there is no
     *         connection for the context of this bus
     */
    public CompletableFuture<Void> disconnect() {
        if (!axonServerConnectionManager.isConnected(context)) {
            // Nothing to disconnect from.
            return CompletableFuture.completedFuture(null);
        }
        return axonServerConnectionManager.getConnection(context)
                                          .commandChannel()
                                          .prepareDisconnect();
    }

    /**
     * Shutdown handler that initiates the shutdown of the {@link ShutdownLatch}, after which new dispatches are
     * rejected.
     *
     * @return the future returned by the {@link ShutdownLatch} for the initiated shutdown
     */
    public CompletableFuture<Void> shutdownDispatching() {
        final CompletableFuture<Void> shutdownCompletion = shutdownLatch.initiateShutdown();
        return shutdownCompletion;
    }
}
