package org.axonframework.commandhandling.distributed;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.axonframework.commandhandling.CommandMessage;
import org.axonframework.commandhandling.CommandResultMessage;
import org.axonframework.commandhandling.GenericCommandMessage;
import org.axonframework.commandhandling.GenericCommandResultMessage;
import org.axonframework.commandhandling.NoHandlerForCommandException;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.Connector;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.messaging.unitofwork.ProcessingLifecycleHandlerRegistrar;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/axonframework/commandhandling/distributed/DistributedCommandBusTest.class */
class DistributedCommandBusTest {
    private CommandMessage<?> testCommand;
    private StubConnector connector;
    private SimpleCommandBus delegate;
    private DistributedCommandBus testSubject;

    /* loaded from: input_file:org/axonframework/commandhandling/distributed/DistributedCommandBusTest$StubConnector.class */
    private static class StubConnector implements Connector {
        private final Map<CommandMessage<?>, CompletableFuture<?>> dispatchedCommands = new ConcurrentHashMap();
        private final Map<String, Integer> subscriptions = new ConcurrentHashMap();
        private final AtomicReference<BiConsumer<CommandMessage<?>, Connector.ResultCallback>> handler = new AtomicReference<>();

        private StubConnector() {
        }

        public CompletableFuture<CommandResultMessage<?>> dispatch(CommandMessage<?> commandMessage, ProcessingContext processingContext) {
            CompletableFuture<?> completableFuture = new CompletableFuture<>();
            this.dispatchedCommands.put(commandMessage, completableFuture);
            return completableFuture;
        }

        public void subscribe(String str, int i) {
            this.subscriptions.put(str, Integer.valueOf(i));
        }

        public boolean unsubscribe(String str) {
            return this.subscriptions.remove(str) != null;
        }

        public void onIncomingCommand(BiConsumer<CommandMessage<?>, Connector.ResultCallback> biConsumer) {
            this.handler.set(biConsumer);
        }

        public Map<CommandMessage<?>, CompletableFuture<?>> getDispatchedCommands() {
            return this.dispatchedCommands;
        }

        public Map<String, Integer> getSubscriptions() {
            return this.subscriptions;
        }

        public BiConsumer<CommandMessage<?>, Connector.ResultCallback> getHandler() {
            return this.handler.get();
        }
    }

    DistributedCommandBusTest() {
    }

    @BeforeEach
    void setUp() {
        this.testCommand = new GenericCommandMessage(new MessageType("command"), "test");
        this.connector = new StubConnector();
        this.delegate = new SimpleCommandBus(new ProcessingLifecycleHandlerRegistrar[0]);
        this.testSubject = new DistributedCommandBus(this.delegate, this.connector);
    }

    @Test
    void publishedCommandsAreSentToConnector() {
        CompletableFuture dispatch = this.testSubject.dispatch(this.testCommand, (ProcessingContext) null);
        Assertions.assertSame(dispatch, this.connector.getDispatchedCommands().get(this.testCommand));
        Assertions.assertFalse(dispatch.isDone());
    }

    @Test
    void incomingCommandsAreRejectedWhenNoHandlerRegistered() {
        Connector.ResultCallback resultCallback = (Connector.ResultCallback) Mockito.mock(new Connector.ResultCallback[0]);
        this.connector.handler.get().accept(this.testCommand, resultCallback);
        ((Connector.ResultCallback) Mockito.verify(resultCallback)).error((Throwable) Mockito.isA(NoHandlerForCommandException.class));
    }

    @Test
    void incomingCommandsAreDelegatedToSubscribedHandlers() {
        GenericCommandResultMessage genericCommandResultMessage = new GenericCommandResultMessage(new MessageType("result"), "OK");
        this.testSubject.subscribe(this.testCommand.type().qualifiedName(), (commandMessage, processingContext) -> {
            return MessageStream.just(genericCommandResultMessage);
        });
        Connector.ResultCallback resultCallback = (Connector.ResultCallback) Mockito.mock(new Connector.ResultCallback[0]);
        this.connector.handler.get().accept(this.testCommand, resultCallback);
        ((Connector.ResultCallback) Mockito.verify(resultCallback)).success((Message) Mockito.same(genericCommandResultMessage));
    }

    @Test
    void describeToMentionsConnector() {
        ComponentDescriptor componentDescriptor = (ComponentDescriptor) Mockito.mock(new ComponentDescriptor[0]);
        this.testSubject.describeTo(componentDescriptor);
        ((ComponentDescriptor) Mockito.verify(componentDescriptor)).describeWrapperOf(this.delegate);
        ((ComponentDescriptor) Mockito.verify(componentDescriptor)).describeProperty("connector", this.connector);
    }
}
