package org.axonframework.integrationtests.queryhandling;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.axonframework.axonserver.connector.AxonServerConfiguration;
import org.axonframework.axonserver.connector.AxonServerConnectionManager;
import org.axonframework.axonserver.connector.query.AxonServerQueryBus;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.queryhandling.GenericQueryMessage;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SimpleQueryBus;
import org.axonframework.serialization.json.JacksonSerializer;
import org.axonframework.test.server.AxonServerContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.images.PullPolicy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
/* loaded from: input_file:org/axonframework/integrationtests/queryhandling/QueryThreadingIntegrationTest.class */
class QueryThreadingIntegrationTest {
    private AxonServerConnectionManager connectionManager;
    private AxonServerQueryBus queryBus;
    private AxonServerQueryBus queryBus2;
    private static final Logger log = LoggerFactory.getLogger(QueryThreadingIntegrationTest.class);
    private static final AtomicBoolean secondaryQueryBlock = new AtomicBoolean(true);
    private static final AtomicInteger waitingQueries = new AtomicInteger(0);
    private static final String HOSTNAME = "localhost";

    @Container
    private static final AxonServerContainer axonServer = new AxonServerContainer().withAxonServerName("axonserver").withAxonServerHostname(HOSTNAME).withDevMode(true).withEnv("AXONIQ_AXONSERVER_INSTRUCTION-CACHE-TIMEOUT", "1000").withImagePullPolicy(PullPolicy.ageBased(Duration.ofDays(1))).withNetworkAliases(new String[]{"axonserver"});

    QueryThreadingIntegrationTest() {
    }

    @BeforeEach
    void setUp() {
        JacksonSerializer defaultSerializer = JacksonSerializer.defaultSerializer();
        AxonServerConfiguration build = AxonServerConfiguration.builder().componentName("threadingTest").servers(axonServer.getHost() + ":" + axonServer.getGrpcPort()).build();
        build.setCommandThreads(5);
        build.setQueryThreads(5);
        build.setQueryResponseThreads(5);
        this.connectionManager = AxonServerConnectionManager.builder().axonServerConfiguration(build).channelCustomizer((v0) -> {
            return v0.directExecutor();
        }).build();
        this.connectionManager.start();
        SimpleQueryBus build2 = SimpleQueryBus.builder().build();
        this.queryBus = AxonServerQueryBus.builder().axonServerConnectionManager(this.connectionManager).configuration(build).localSegment(build2).updateEmitter(build2.queryUpdateEmitter()).messageSerializer(defaultSerializer).genericSerializer(defaultSerializer).build();
        this.queryBus.start();
        this.queryBus2 = AxonServerQueryBus.builder().axonServerConnectionManager(this.connectionManager).configuration(build).localSegment(SimpleQueryBus.builder().build()).updateEmitter(build2.queryUpdateEmitter()).messageSerializer(defaultSerializer).genericSerializer(defaultSerializer).build();
        this.queryBus2.start();
        waitingQueries.set(0);
    }

    @AfterEach
    void tearDown() {
        this.queryBus.shutdownDispatching();
        this.queryBus.disconnect();
        this.queryBus2.shutdownDispatching();
        this.queryBus2.disconnect();
        this.connectionManager.shutdown();
    }

    @Test
    void canStillHandleQueryResponsesWhileManyQueriesHandling() throws InterruptedException {
        this.queryBus2.subscribe("query-b", String.class, queryMessage -> {
            while (secondaryQueryBlock.get()) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            return "b";
        });
        this.queryBus.subscribe("query-a", String.class, queryMessage2 -> {
            waitingQueries.incrementAndGet();
            QueryResponseMessage queryResponseMessage = (QueryResponseMessage) this.queryBus.query(new GenericQueryMessage("start", "query-b", ResponseTypes.instanceOf(String.class))).get();
            waitingQueries.decrementAndGet();
            return "a" + ((String) queryResponseMessage.getPayload());
        });
        CompletableFuture query = this.queryBus.query(new GenericQueryMessage("start", "query-a", ResponseTypes.instanceOf(String.class)));
        CompletableFuture query2 = this.queryBus.query(new GenericQueryMessage("start", "query-a", ResponseTypes.instanceOf(String.class)));
        CompletableFuture query3 = this.queryBus.query(new GenericQueryMessage("start", "query-a", ResponseTypes.instanceOf(String.class)));
        CompletableFuture query4 = this.queryBus.query(new GenericQueryMessage("start", "query-a", ResponseTypes.instanceOf(String.class)));
        CompletableFuture query5 = this.queryBus.query(new GenericQueryMessage("start", "query-a", ResponseTypes.instanceOf(String.class)));
        CompletableFuture query6 = this.queryBus.query(new GenericQueryMessage("start", "query-a", ResponseTypes.instanceOf(String.class)));
        Awaitility.await().pollDelay(500L, TimeUnit.MILLISECONDS).atMost(10L, TimeUnit.SECONDS).until(() -> {
            log.info("Waiting queries: {}", Integer.valueOf(waitingQueries.get()));
            return Boolean.valueOf(waitingQueries.get() == 5);
        });
        Assertions.assertFalse(query.isDone());
        Assertions.assertFalse(query2.isDone());
        Assertions.assertFalse(query3.isDone());
        Assertions.assertFalse(query4.isDone());
        Assertions.assertFalse(query5.isDone());
        Assertions.assertFalse(query6.isDone());
        secondaryQueryBlock.set(false);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assertions.assertEquals(0, waitingQueries.get());
            Assertions.assertTrue(query.isDone());
            Assertions.assertTrue(query2.isDone());
            Assertions.assertTrue(query3.isDone());
            Assertions.assertTrue(query4.isDone());
            Assertions.assertTrue(query5.isDone());
            Assertions.assertTrue(query6.isDone());
        });
    }
}
