package org.axonframework.integrationtests.axonserverconnector;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.command.AxonServerCommandBus;
import org.axonframework.axonserver.connector.query.AxonServerQueryBus;
import org.axonframework.axonserver.connector.query.QueryPriorityCalculator;
import org.axonframework.commandhandling.GenericCommandMessage;
import org.axonframework.commandhandling.SimpleCommandBus;
import org.axonframework.commandhandling.distributed.AnnotationRoutingStrategy;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.GenericQueryMessage;
import org.axonframework.queryhandling.SimpleQueryBus;
import org.axonframework.serialization.json.JacksonSerializer;
import org.axonframework.test.server.AxonServerContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.images.PullPolicy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
/* loaded from: input_file:org/axonframework/integrationtests/axonserverconnector/MessagePriorityIntegrationTest.class */
class MessagePriorityIntegrationTest {
    private static final int PRIORITY = 42;
    private static final int REGULAR = 0;
    private AxonServerConnectionManager connectionManager;
    private AxonServerCommandBus commandBus;
    private AxonServerQueryBus queryBus;
    private static final String HOSTNAME = "localhost";

    @Container
    private static final AxonServerContainer axonServer = new AxonServerContainer().withAxonServerName("axonserver").withAxonServerHostname(HOSTNAME).withDevMode(true).withEnv("AXONIQ_AXONSERVER_INSTRUCTION-CACHE-TIMEOUT", "1000").withImagePullPolicy(PullPolicy.ageBased(Duration.ofDays(1))).withNetworkAliases(new String[]{"axonserver"});

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/axonframework/integrationtests/axonserverconnector/MessagePriorityIntegrationTest$Handled.class */
    public static class Handled {
        private final boolean priority;

        private static Handled priority() {
            return new Handled(true);
        }

        private static Handled regular() {
            return new Handled(false);
        }

        private Handled(boolean z) {
            this.priority = z;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            return obj != null && getClass() == obj.getClass() && this.priority == ((Handled) obj).priority;
        }

        public int hashCode() {
            return Objects.hash(Boolean.valueOf(this.priority));
        }

        public String toString() {
            return this.priority ? "P" : "R";
        }

        static /* synthetic */ Handled access$100() {
            return regular();
        }

        static /* synthetic */ Handled access$200() {
            return priority();
        }
    }

    /* loaded from: input_file:org/axonframework/integrationtests/axonserverconnector/MessagePriorityIntegrationTest$PriorityMessage.class */
    private static class PriorityMessage {
        private final String text;

        public PriorityMessage(@JsonProperty("text") String str) {
            this.text = str;
        }

        public String getText() {
            return this.text;
        }
    }

    /* loaded from: input_file:org/axonframework/integrationtests/axonserverconnector/MessagePriorityIntegrationTest$RegularMessage.class */
    private static class RegularMessage {
        private final String text;

        public RegularMessage(@JsonProperty("text") String str) {
            this.text = str;
        }

        public String getText() {
            return this.text;
        }
    }

    MessagePriorityIntegrationTest() {
    }

    @BeforeEach
    void setUp() {
        JacksonSerializer defaultSerializer = JacksonSerializer.defaultSerializer();
        AxonServerConfiguration build = AxonServerConfiguration.builder().componentName("messagePriority").servers(axonServer.getHost() + ":" + axonServer.getGrpcPort()).build();
        build.setCommandThreads(1);
        build.setQueryThreads(1);
        this.connectionManager = AxonServerConnectionManager.builder().axonServerConfiguration(build).channelCustomizer((v0) -> {
            return v0.directExecutor();
        }).build();
        this.connectionManager.start();
        this.commandBus = AxonServerCommandBus.builder().axonServerConnectionManager(this.connectionManager).configuration(build).localSegment(SimpleCommandBus.builder().build()).serializer(defaultSerializer).routingStrategy(AnnotationRoutingStrategy.defaultStrategy()).priorityCalculator(commandMessage -> {
            return Objects.equals(commandMessage.getPayloadType(), PriorityMessage.class) ? PRIORITY : REGULAR;
        }).build();
        this.commandBus.start();
        QueryPriorityCalculator queryPriorityCalculator = queryMessage -> {
            return Objects.equals(queryMessage.getPayloadType(), PriorityMessage.class) ? PRIORITY : REGULAR;
        };
        SimpleQueryBus build2 = SimpleQueryBus.builder().build();
        this.queryBus = AxonServerQueryBus.builder().axonServerConnectionManager(this.connectionManager).configuration(build).localSegment(build2).updateEmitter(build2.queryUpdateEmitter()).messageSerializer(defaultSerializer).genericSerializer(defaultSerializer).priorityCalculator(queryPriorityCalculator).build();
        this.queryBus.start();
    }

    @AfterEach
    void tearDown() {
        this.commandBus.shutdownDispatching();
        this.queryBus.shutdownDispatching();
        this.commandBus.disconnect();
        this.queryBus.disconnect();
        this.connectionManager.shutdown();
    }

    @Test
    void commandPriorityIsRespectedWithinThresholdByAxonServerCommandBus() throws InterruptedException {
        int i = 10;
        int i2 = 10 - (10 / 5);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(10);
        this.commandBus.subscribe("processGate", commandMessage -> {
            countDownLatch.await();
            return "start-processing";
        });
        this.commandBus.subscribe("regular", commandMessage2 -> {
            return "regular";
        });
        this.commandBus.subscribe("priority", commandMessage3 -> {
            return "priority";
        });
        Thread thread = new Thread(() -> {
            this.commandBus.dispatch(new GenericCommandMessage(GenericCommandMessage.asCommandMessage("start"), "processGate"));
            for (int i3 = REGULAR; i3 < i; i3++) {
                this.commandBus.dispatch(i3 % 5 == 0 ? new GenericCommandMessage(GenericCommandMessage.asCommandMessage(new PriorityMessage(Integer.toString(i3))), "priority") : new GenericCommandMessage(GenericCommandMessage.asCommandMessage(new RegularMessage(Integer.toString(i3))), "regular"), (commandMessage4, commandResultMessage) -> {
                    if (commandResultMessage.getPayload().equals("regular")) {
                        concurrentLinkedQueue.add(Handled.access$100());
                    } else {
                        concurrentLinkedQueue.add(Handled.access$200());
                    }
                    countDownLatch2.countDown();
                });
            }
            countDownLatch.countDown();
        });
        thread.start();
        Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS), () -> {
            return "Failed with [" + countDownLatch2.getCount() + "] unprocessed command(s).";
        });
        Assertions.assertEquals(10, concurrentLinkedQueue.size());
        for (int i3 = REGULAR; i3 < concurrentLinkedQueue.size(); i3++) {
            Handled handled = (Handled) concurrentLinkedQueue.poll();
            if (i3 >= i2) {
                Assertions.assertFalse(handled.priority, "A priority command was handled at index [" + i3 + "], while it is at least expected to come before [" + i2 + "].");
            }
        }
        thread.join(1000L);
    }

    @Test
    void queryPriorityIsRespectedWithinThresholdByAxonServerQueryBus() throws InterruptedException {
        int i = 10;
        int i2 = 10 - (10 / 5);
        ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(10);
        this.queryBus.subscribe("processGate", String.class, queryMessage -> {
            countDownLatch.await();
            return "start-processing";
        });
        this.queryBus.subscribe("regular", String.class, queryMessage2 -> {
            return "regular";
        });
        this.queryBus.subscribe("priority", String.class, queryMessage3 -> {
            return "priority";
        });
        Thread thread = new Thread(() -> {
            this.queryBus.query(new GenericQueryMessage("start", "processGate", ResponseTypes.instanceOf(String.class)));
            for (int i3 = REGULAR; i3 < i; i3++) {
                this.queryBus.query(i3 % 5 == 0 ? new GenericQueryMessage(new PriorityMessage(Integer.toString(i3)), "priority", ResponseTypes.instanceOf(String.class)) : new GenericQueryMessage(new RegularMessage(Integer.toString(i3)), "regular", ResponseTypes.instanceOf(String.class))).whenComplete((queryResponseMessage, th) -> {
                    if (((String) queryResponseMessage.getPayload()).equals("regular")) {
                        concurrentLinkedQueue.add(Handled.access$100());
                    } else {
                        concurrentLinkedQueue.add(Handled.access$200());
                    }
                    countDownLatch2.countDown();
                });
            }
            countDownLatch.countDown();
        });
        thread.start();
        Assertions.assertTrue(countDownLatch2.await(2L, TimeUnit.SECONDS), () -> {
            return "Failed with [" + countDownLatch2.getCount() + "] unprocessed query/queries";
        });
        Assertions.assertEquals(10, concurrentLinkedQueue.size());
        for (int i3 = REGULAR; i3 < concurrentLinkedQueue.size(); i3++) {
            Handled handled = (Handled) concurrentLinkedQueue.poll();
            if (i3 >= i2) {
                Assertions.assertFalse(handled.priority, "A priority command was handled at index [" + i3 + "], while it is at least expected to come before [" + i2 + "].");
            }
        }
        thread.join(1000L);
    }
}
