package org.axonframework.queryhandling;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.axonframework.common.ReflectionUtils;
import org.axonframework.common.Registration;
import org.axonframework.common.transaction.Transaction;
import org.axonframework.common.transaction.TransactionManager;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageHandler;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.MetaData;
import org.axonframework.messaging.correlation.CorrelationDataProvider;
import org.axonframework.messaging.correlation.MessageOriginProvider;
import org.axonframework.messaging.interceptors.CorrelationDataInterceptor;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.queryhandling.registration.DuplicateQueryHandlerResolution;
import org.axonframework.queryhandling.registration.DuplicateQueryHandlerSubscriptionException;
import org.axonframework.tracing.TestSpanFactory;
import org.axonframework.utils.MockException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/axonframework/queryhandling/SimpleQueryBusTest.class */
class SimpleQueryBusTest {
    // Metadata keys handed to the MessageOriginProvider in setUp() and used in the correlation-data assertions.
    private static final String TRACE_ID = "traceId";
    private static final String CORRELATION_ID = "correlationId";
    // Response type descriptors shared by the queries built in the tests.
    private final ResponseType<String> singleStringResponse = ResponseTypes.instanceOf(String.class);
    private final ResponseType<List<String>> multipleStringResponse = ResponseTypes.multipleInstancesOf(String.class);
    // Bus under test; built in setUp() and replaced by some tests with a differently configured instance.
    private SimpleQueryBus testSubject;
    // Mockito mocks, created in setUp().
    private MessageMonitor<QueryMessage<?, ?>> messageMonitor;
    private QueryInvocationErrorHandler errorHandler;
    private MessageMonitor.MonitorCallback monitorCallback;
    // Span factory used for tracing verification, and the two factories built on top of it in setUp().
    private TestSpanFactory spanFactory;
    private QueryBusSpanFactory queryBusSpanFactory;
    private QueryUpdateEmitterSpanFactory queryUpdateEmitterSpanFactory;

    /** Package-private no-argument constructor; it performs no initialisation (fixtures are created in setUp()). */
    SimpleQueryBusTest() {
    }

    /**
     * Creates fresh span factories and Mockito mocks, then builds the {@link SimpleQueryBus} under test.
     * <p>
     * The bus is configured with {@code DuplicateQueryHandlerResolution.silentlyAdd()} and gets a
     * {@link CorrelationDataInterceptor} backed by a {@link MessageOriginProvider} that uses the
     * {@code CORRELATION_ID} and {@code TRACE_ID} keys.
     */
    @BeforeEach
    @SuppressWarnings("unchecked") // Mockito hands back a raw MessageMonitor mock
    void setUp() {
        spanFactory = new TestSpanFactory();
        queryBusSpanFactory = DefaultQueryBusSpanFactory.builder()
                                                        .spanFactory(spanFactory)
                                                        .build();
        queryUpdateEmitterSpanFactory = DefaultQueryUpdateEmitterSpanFactory.builder()
                                                                            .spanFactory(spanFactory)
                                                                            .build();

        messageMonitor = Mockito.mock(MessageMonitor.class);
        errorHandler = Mockito.mock(QueryInvocationErrorHandler.class);
        monitorCallback = Mockito.mock(MessageMonitor.MonitorCallback.class);
        // Every ingested query reports to the same callback mock, so tests can verify on it.
        Mockito.when(messageMonitor.onMessageIngested(Mockito.any())).thenReturn(monitorCallback);

        SimpleQueryUpdateEmitter updateEmitter = SimpleQueryUpdateEmitter.builder()
                                                                         .spanFactory(queryUpdateEmitterSpanFactory)
                                                                         .build();
        testSubject = SimpleQueryBus.builder()
                                    .messageMonitor(messageMonitor)
                                    .errorHandler(errorHandler)
                                    .spanFactory(queryBusSpanFactory)
                                    .queryUpdateEmitter(updateEmitter)
                                    .duplicateQueryHandlerResolver(DuplicateQueryHandlerResolution.silentlyAdd())
                                    .build();

        CorrelationDataProvider[] providers = {new MessageOriginProvider(CORRELATION_ID, TRACE_ID)};
        testSubject.registerHandlerInterceptor(new CorrelationDataInterceptor(providers));
    }

    /**
     * A handler interceptor that throws must lead to a completed future carrying an exceptional response.
     */
    @Test
    public void handlerInterceptorThrowsException() throws ExecutionException, InterruptedException {
        testSubject.subscribe("test", String.class, query -> query.getPayload().toString());
        testSubject.registerHandlerInterceptor((unitOfWork, chain) -> {
            throw new RuntimeException("Faking");
        });
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType("test"), "hello", ResponseTypes.instanceOf(String.class));

        CompletableFuture<QueryResponseMessage<String>> result = testSubject.query(testQuery);

        Assertions.assertTrue(result.isDone());
        Assertions.assertTrue(result.get().isExceptional());
    }

    /**
     * Subscribing under an existing query name adds a handler to that name's collection; subscribing under a
     * new name adds a new entry to the subscriptions map.
     */
    @Test
    void subscribe() {
        testSubject.subscribe("test", String.class, Message::getPayload);
        Assertions.assertEquals(1, testSubject.getSubscriptions().size());
        Collection<?> handlersAfterFirst = (Collection<?>) testSubject.getSubscriptions().values().iterator().next();
        Assertions.assertEquals(1, handlersAfterFirst.size());

        testSubject.subscribe("test", String.class, query -> "aa" + query.getPayload());
        Assertions.assertEquals(1, testSubject.getSubscriptions().size());
        Collection<?> handlersAfterSecond = (Collection<?>) testSubject.getSubscriptions().values().iterator().next();
        Assertions.assertEquals(2, handlersAfterSecond.size());

        testSubject.subscribe("test2", String.class, query -> "aa" + query.getPayload());
        Assertions.assertEquals(2, testSubject.getSubscriptions().size());
    }

    /**
     * Registering the very same handler instance twice results in a single invocation per query, and
     * cancelling the first registration leaves the bus without a handler for that query.
     */
    @Test
    void subscribingSameHandlerTwiceInvokedOnce() throws Exception {
        AtomicInteger invocationCount = new AtomicInteger();
        MessageHandler countingHandler = query -> {
            invocationCount.incrementAndGet();
            return "reply";
        };
        Registration firstRegistration = testSubject.subscribe("test", String.class, countingHandler);
        testSubject.subscribe("test", String.class, countingHandler);
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType("test"), "request", singleStringResponse);

        String reply = testSubject.query(testQuery)
                                  .thenApply(QueryResponseMessage::getPayload)
                                  .get();

        Assertions.assertEquals("reply", reply);
        Assertions.assertEquals(1, invocationCount.get());
        Assertions.assertTrue(firstRegistration.cancel());
        // After cancellation the query resolves immediately, but exceptionally.
        Assertions.assertTrue(testSubject.query(testQuery).isDone());
        Assertions.assertTrue(testSubject.query(testQuery).isCompletedExceptionally());
    }

    /**
     * With {@code DuplicateQueryHandlerResolution.rejectDuplicates()}, a second handler for the same query
     * name and response type is rejected with a {@link DuplicateQueryHandlerSubscriptionException}.
     */
    @Test
    void subscribingSameQueryTwiceWithThrowingDuplicateResolver() {
        testSubject = SimpleQueryBus.builder()
                                    .messageMonitor(messageMonitor)
                                    .errorHandler(errorHandler)
                                    .duplicateQueryHandlerResolver(DuplicateQueryHandlerResolution.rejectDuplicates())
                                    .build();
        // Two separate lambda expressions, so the bus sees two distinct handler instances.
        MessageHandler firstHandler = query -> "reply";
        MessageHandler secondHandler = query -> "reply";
        testSubject.subscribe("test", String.class, firstHandler);

        Assertions.assertThrows(
                DuplicateQueryHandlerSubscriptionException.class,
                () -> testSubject.subscribe("test", String.class, secondHandler)
        );
    }

    /**
     * The response meta data must contain the query's identifier under {@code CORRELATION_ID} and the
     * trace id that was attached to the query under {@code TRACE_ID}.
     */
    @Test
    void queryResultContainsCorrelationData() throws Exception {
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", singleStringResponse)
                        .andMetaData(Collections.singletonMap(TRACE_ID, "fakeTraceId"));

        CompletableFuture<QueryResponseMessage<String>> result = testSubject.query(testQuery);

        Assertions.assertTrue(result.isDone(), "SimpleQueryBus should resolve CompletableFutures directly");
        QueryResponseMessage<String> response = result.get();
        Assertions.assertEquals("hello1234", response.getPayload());
        MetaData expectedMetaData = MetaData.with(CORRELATION_ID, testQuery.getIdentifier())
                                            .and(TRACE_ID, "fakeTraceId");
        Assertions.assertEquals(expectedMetaData, response.getMetaData());
    }

    /**
     * A handler returning {@code null} yields a response with a {@code null} payload, a payload type of
     * {@code String} and the expected correlation meta data.
     */
    @Test
    void nullResponseProperlyReturned() throws ExecutionException, InterruptedException {
        testSubject.subscribe(String.class.getName(), String.class, query -> null);
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", singleStringResponse)
                        .andMetaData(Collections.singletonMap(TRACE_ID, "fakeTraceId"));

        CompletableFuture<QueryResponseMessage<String>> result = testSubject.query(testQuery);

        Assertions.assertTrue(result.isDone(), "SimpleQueryBus should resolve CompletableFutures directly");
        QueryResponseMessage<String> response = result.get();
        Assertions.assertNull(response.getPayload());
        Assertions.assertEquals(String.class, response.getPayloadType());
        MetaData expectedMetaData = MetaData.with(CORRELATION_ID, testQuery.getIdentifier())
                                            .and(TRACE_ID, "fakeTraceId");
        Assertions.assertEquals(expectedMetaData, response.getMetaData());
    }

    /**
     * A list query handled on a bus with a {@link TransactionManager} starts one transaction and commits it.
     * The handler is registered with the generic return type of {@code stringListQueryHandler()}.
     */
    @Test
    void queryWithTransaction() throws Exception {
        TransactionManager txManager = Mockito.mock(TransactionManager.class);
        Transaction tx = Mockito.mock(Transaction.class);
        Mockito.when(txManager.startTransaction()).thenReturn(tx);
        testSubject = SimpleQueryBus.builder()
                                    .transactionManager(txManager)
                                    .build();
        testSubject.subscribe(
                String.class.getName(),
                ReflectionUtils.methodOf(getClass(), "stringListQueryHandler", new Class<?>[0]).getGenericReturnType(),
                query -> Arrays.asList(query.getPayload() + "1234", query.getPayload() + "567")
        );
        QueryMessage<String, List<String>> testQuery = new GenericQueryMessage<>(
                new MessageType(String.class), "hello", ResponseTypes.multipleInstancesOf(String.class)
        );

        CompletableFuture<List<String>> result = testSubject.query(testQuery)
                                                            .thenApply(QueryResponseMessage::getPayload);

        Assertions.assertTrue(result.isDone());
        List<String> payload = result.get();
        Assertions.assertTrue(payload.contains("hello1234"));
        Assertions.assertTrue(payload.contains("hello567"));
        Mockito.verify(txManager).startTransaction();
        Mockito.verify(tx).commit();
    }

    /**
     * Dummy handler method that is looked up reflectively by name in {@code queryWithTransaction()}, which
     * registers a handler using this method's generic return type.
     *
     * @return a new, empty, mutable list
     */
    public List<String> stringListQueryHandler() {
        return new ArrayList<>();
    }

    /**
     * A single-response query handled on a bus with a {@link TransactionManager} starts one transaction
     * and commits it.
     */
    @Test
    void querySingleWithTransaction() throws Exception {
        TransactionManager txManager = Mockito.mock(TransactionManager.class);
        Transaction tx = Mockito.mock(Transaction.class);
        Mockito.when(txManager.startTransaction()).thenReturn(tx);
        testSubject = SimpleQueryBus.builder()
                                    .transactionManager(txManager)
                                    .build();
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", singleStringResponse);

        CompletableFuture<String> result = testSubject.query(testQuery)
                                                      .thenApply(QueryResponseMessage::getPayload);

        Assertions.assertEquals("hello1234", result.get());
        Mockito.verify(txManager).startTransaction();
        Mockito.verify(tx).commit();
    }

    /**
     * The {@code QueryBus.query} span must be active while the handler runs and completed once the query
     * has been answered.
     */
    @Test
    void querySingleIsTraced() throws ExecutionException, InterruptedException {
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", singleStringResponse);
        testSubject.subscribe(String.class.getName(), String.class, query -> {
            spanFactory.verifySpanActive("QueryBus.query", testQuery);
            return query.getPayload() + "1234";
        });

        testSubject.query(testQuery).get();

        spanFactory.verifySpanCompleted("QueryBus.query", testQuery);
    }

    /**
     * During a scatter-gather, each handler must run inside both the overall
     * {@code QueryBus.scatterGatherQuery} span and its own {@code QueryBus.scatterGatherHandler-<index>}
     * span, and all of these spans must be completed afterwards.
     * <p>
     * The in-handler checks use the same {@code scatterGatherHandler-<index>} names as the completion checks
     * at the end of the test.
     */
    @Test
    void ScatterGatherIsTraced() {
        QueryMessage<String, List<String>> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", multipleStringResponse);
        testSubject.subscribe(String.class.getName(), String.class, query -> {
            spanFactory.verifySpanActive("QueryBus.scatterGatherQuery", testQuery);
            spanFactory.verifySpanActive("QueryBus.scatterGatherHandler-0");
            return query.getPayload() + "1234";
        });
        testSubject.subscribe(String.class.getName(), String.class, query -> {
            spanFactory.verifySpanActive("QueryBus.scatterGatherQuery", testQuery);
            spanFactory.verifySpanActive("QueryBus.scatterGatherHandler-1");
            return query.getPayload() + "12345678";
        });

        // The result itself is irrelevant; the stream is only drained with a terminal operation.
        testSubject.scatterGather(testQuery, 500L, TimeUnit.MILLISECONDS).toList();

        spanFactory.verifySpanCompleted("QueryBus.scatterGatherQuery", testQuery);
        spanFactory.verifySpanCompleted("QueryBus.scatterGatherHandler-0");
        spanFactory.verifySpanCompleted("QueryBus.scatterGatherHandler-1");
    }

    /**
     * A multiple-instances query answered by a handler registered for a single {@code String} yields a
     * one-element list.
     */
    @Test
    void queryListWithSingleHandlerReturnsSingleAsList() throws Exception {
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        QueryMessage<String, List<String>> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", multipleStringResponse);

        CompletableFuture<List<String>> result = testSubject.query(testQuery)
                                                            .thenApply(QueryResponseMessage::getPayload);

        List<String> payload = result.get();
        Assertions.assertEquals(1, payload.size());
        Assertions.assertEquals("hello1234", payload.getFirst());
    }

    /**
     * With both a {@code String} handler and a {@code String[]} handler subscribed, a multiple-instances
     * query is expected to return the two-element result of the latter.
     */
    @Test
    void queryListWithBothSingleHandlerAndListHandlerReturnsListResult() throws Exception {
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        testSubject.subscribe(
                String.class.getName(),
                String[].class,
                query -> Arrays.asList(query.getPayload() + "1234", query.getPayload() + "5678")
        );
        QueryMessage<String, List<String>> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", multipleStringResponse);

        CompletableFuture<List<String>> result = testSubject.query(testQuery)
                                                            .thenApply(QueryResponseMessage::getPayload);

        List<String> payload = result.get();
        Assertions.assertEquals(2, payload.size());
        Assertions.assertEquals("hello1234", payload.get(0));
        Assertions.assertEquals("hello5678", payload.get(1));
    }

    /**
     * Handlers that throw {@link NoHandlerForQueryException} are skipped: the bus is expected to move on
     * until a handler replies, so all three subscribed handlers end up being invoked.
     */
    @Test
    void queryForSingleResultWithUnsuitableHandlers() throws Exception {
        AtomicInteger invocationCount = new AtomicInteger();
        MessageHandler failingHandler = query -> {
            invocationCount.incrementAndGet();
            throw new NoHandlerForQueryException("Mock");
        };
        MessageHandler passingHandler = query -> {
            invocationCount.incrementAndGet();
            return "reply";
        };
        testSubject.subscribe("query", String.class, failingHandler);
        // Deliberately a wrapping lambda rather than the handler itself, so it is a distinct subscription.
        testSubject.subscribe("query", String.class, query -> failingHandler.handleSync(query));
        testSubject.subscribe("query", String.class, passingHandler);
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType("query"), "query", singleStringResponse);

        CompletableFuture<String> result = testSubject.query(testQuery)
                                                      .thenApply(QueryResponseMessage::getPayload);

        Assertions.assertTrue(result.isDone());
        Assertions.assertEquals("reply", result.get());
        Assertions.assertEquals(3, invocationCount.get());
    }

    /**
     * When the only subscribed handler throws {@link NoHandlerForQueryException}, the returned future must
     * complete exceptionally with a {@link NoHandlerForQueryException}.
     * <p>
     * The query is dispatched under the name {@code "query"}, the same name the handler is subscribed under,
     * so that the throwing handler is the one answering it.
     */
    @Test
    void queryWithOnlyUnsuitableResultsInException() throws Exception {
        testSubject.subscribe("query", String.class, query -> {
            throw new NoHandlerForQueryException("Mock");
        });
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType("query"), "query", singleStringResponse);

        CompletableFuture<QueryResponseMessage<String>> result = testSubject.query(testQuery);

        Assertions.assertTrue(result.isDone());
        Assertions.assertTrue(result.isCompletedExceptionally());
        // thenApply wraps the failure, hence the cause is inspected rather than the throwable itself.
        String failureName = result.thenApply(QueryResponseMessage::getPayload)
                                   .exceptionally(failure -> failure.getCause().getClass().getSimpleName())
                                   .get();
        Assertions.assertEquals("NoHandlerForQueryException", failureName);
    }

    /**
     * A handler that already returns a {@link QueryResponseMessage} must have that exact instance handed
     * back to the caller, without re-wrapping.
     */
    @Test
    void queryReturnsResponseMessageFromHandlerAsIs() throws Exception {
        GenericQueryResponseMessage<String> soleResult =
                new GenericQueryResponseMessage<>(new MessageType(String.class), "soleResult");
        testSubject.subscribe("query", String.class, query -> soleResult);
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType("query"), "query", singleStringResponse);

        CompletableFuture<QueryResponseMessage<String>> result = testSubject.query(testQuery);

        Assertions.assertTrue(result.isDone());
        // JUnit expects (expected, actual); the handler's message is the expected instance.
        Assertions.assertSame(soleResult, result.get());
    }

    /**
     * Querying without any subscribed handler completes the future exceptionally with a
     * {@link NoHandlerForQueryException}.
     */
    @Test
    void queryWithHandlersResultsInException() throws Exception {
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType("query"), "query", singleStringResponse);

        CompletableFuture<QueryResponseMessage<String>> result = testSubject.query(testQuery);

        Assertions.assertTrue(result.isDone());
        Assertions.assertTrue(result.isCompletedExceptionally());
        String failureName = result.thenApply(QueryResponseMessage::getPayload)
                                   .exceptionally(failure -> failure.getCause().getClass().getSimpleName())
                                   .get();
        Assertions.assertEquals("NoHandlerForQueryException", failureName);
    }

    /**
     * A handler failure is reported through an exceptional response message; the future itself completes
     * normally.
     */
    @Test
    void queryForSingleResultWillReportErrors() throws Exception {
        testSubject.subscribe("query", String.class, query -> {
            throw new MockException("Mock");
        });
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType("query"), "query", singleStringResponse);

        CompletableFuture<QueryResponseMessage<String>> result = testSubject.query(testQuery);

        Assertions.assertTrue(result.isDone());
        Assertions.assertFalse(result.isCompletedExceptionally());
        QueryResponseMessage<String> response = result.get();
        Assertions.assertTrue(response.isExceptional());
        Assertions.assertEquals("Mock", response.exceptionResult().getMessage());
    }

    /**
     * The dispatch interceptor adds a {@code "key"} meta data entry; the handler interceptor short-circuits
     * with {@code "fakeReply"} when it sees that entry, so the real handler's reply is never returned.
     */
    @Test
    void queryWithInterceptors() throws Exception {
        testSubject.registerDispatchInterceptor(
                messages -> (index, query) -> query.andMetaData(Collections.singletonMap("key", "value"))
        );
        testSubject.registerHandlerInterceptor((unitOfWork, chain) -> {
            if (unitOfWork.getMessage().getMetaData().containsKey("key")) {
                return "fakeReply";
            }
            return chain.proceedSync();
        });
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", singleStringResponse);

        CompletableFuture<String> result = testSubject.query(testQuery)
                                                      .thenApply(QueryResponseMessage::getPayload);

        Assertions.assertEquals("fakeReply", result.get());
    }

    /**
     * A handler whose registration has been cancelled must not answer the query; the remaining handler's
     * reply is returned.
     */
    @Test
    void queryDoesNotArriveAtUnsubscribedHandler() throws Exception {
        testSubject.subscribe(String.class.getName(), String.class, query -> "1234");
        Registration cancelledRegistration = testSubject.subscribe(
                String.class.getName(), String.class, query -> query.getPayload() + " is not here!"
        );
        cancelledRegistration.cancel();
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", singleStringResponse);

        CompletableFuture<String> result = testSubject.query(testQuery)
                                                      .thenApply(QueryResponseMessage::getPayload);

        Assertions.assertEquals("1234", result.get());
    }

    /**
     * The exception thrown by the handler is exposed as the exception result of the response message,
     * while the future itself completes normally.
     */
    @Test
    void queryReturnsException() throws Exception {
        MockException handlerFailure = new MockException();
        testSubject.subscribe(String.class.getName(), String.class, query -> {
            throw handlerFailure;
        });
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", singleStringResponse);

        CompletableFuture<QueryResponseMessage<String>> result = testSubject.query(testQuery);

        Assertions.assertTrue(result.isDone());
        Assertions.assertFalse(result.isCompletedExceptionally());
        QueryResponseMessage<String> response = result.get();
        Assertions.assertTrue(response.isExceptional());
        Assertions.assertEquals(handlerFailure, response.exceptionResult());
    }

    /**
     * Querying for something nobody handles fails with a {@link NoHandlerForQueryException} as the cause,
     * and that exception is recorded on the {@code QueryBus.query} span.
     */
    @Test
    void queryUnknown() throws Exception {
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", singleStringResponse);
        try {
            testSubject.query(testQuery).get();
            Assertions.fail("Expected exception");
        } catch (ExecutionException expected) {
            Throwable cause = expected.getCause();
            Assertions.assertEquals(NoHandlerForQueryException.class, cause.getClass());
        }
        spanFactory.verifySpanHasException("QueryBus.query", NoHandlerForQueryException.class);
    }

    /**
     * When every subscribed handler has been cancelled again, a query fails with a
     * {@link NoHandlerForQueryException} cause and the monitor callback receives exactly one failure report.
     */
    @Test
    void queryUnsubscribedHandlers() throws Exception {
        Registration firstRegistration = testSubject.subscribe(
                String.class.getName(), String.class, query -> query.getPayload() + " is not here!"
        );
        firstRegistration.cancel();
        Registration secondRegistration = testSubject.subscribe(
                String.class.getName(), String.class, query -> query.getPayload() + " is not here!"
        );
        secondRegistration.cancel();
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "hello", singleStringResponse);

        try {
            testSubject.query(testQuery).get();
            Assertions.fail("Expected exception");
        } catch (ExecutionException expected) {
            Throwable cause = expected.getCause();
            Assertions.assertEquals(NoHandlerForQueryException.class, cause.getClass());
        }

        Mockito.verify(messageMonitor, Mockito.times(1)).onMessageIngested(Mockito.any());
        Mockito.verify(monitorCallback, Mockito.times(1)).reportFailure(Mockito.any());
    }

    /**
     * A scatter-gather over three handlers yields three responses with three distinct payloads; the query
     * is ingested once and success is reported three times.
     */
    @Test
    void scatterGather() {
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "5678");
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "90");
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "Hello, World", singleStringResponse);

        Set<QueryResponseMessage<String>> responses =
                testSubject.scatterGather(testQuery, 0L, TimeUnit.SECONDS)
                           .collect(Collectors.toSet());

        Assertions.assertEquals(3, responses.size());
        Set<String> distinctPayloads = responses.stream()
                                                .map(QueryResponseMessage::getPayload)
                                                .collect(Collectors.toSet());
        Assertions.assertEquals(3, distinctPayloads.size());
        Mockito.verify(messageMonitor, Mockito.times(1)).onMessageIngested(Mockito.any());
        Mockito.verify(monitorCallback, Mockito.times(3)).reportSuccess();
    }

    /**
     * Three handlers registered with the {@code String[]} return type of {@code stringArrayQueryHandler()}
     * each answer with two strings; a multiple-instances scatter-gather yields three responses holding six
     * distinct strings in total.
     */
    @Test
    void scatterGatherOnArrayQueryHandlers() throws NoSuchMethodException {
        testSubject.subscribe(
                String.class.getName(),
                ReflectionUtils.methodOf(getClass(), "stringArrayQueryHandler", new Class<?>[0]).getGenericReturnType(),
                query -> new String[]{query.getPayload() + "12", query.getPayload() + "34"}
        );
        testSubject.subscribe(
                String.class.getName(),
                ReflectionUtils.methodOf(getClass(), "stringArrayQueryHandler", new Class<?>[0]).getGenericReturnType(),
                query -> new String[]{query.getPayload() + "56", query.getPayload() + "78"}
        );
        testSubject.subscribe(
                String.class.getName(),
                ReflectionUtils.methodOf(getClass(), "stringArrayQueryHandler", new Class<?>[0]).getGenericReturnType(),
                query -> new String[]{query.getPayload() + "9", query.getPayload() + "0"}
        );
        QueryMessage<String, List<String>> testQuery = new GenericQueryMessage<>(
                new MessageType(String.class), "Hello, World", ResponseTypes.multipleInstancesOf(String.class)
        );

        Set<QueryResponseMessage<List<String>>> responses =
                testSubject.scatterGather(testQuery, 0L, TimeUnit.SECONDS)
                           .collect(Collectors.toSet());

        Assertions.assertEquals(3, responses.size());
        Set<String> distinctValues = responses.stream()
                                              .map(QueryResponseMessage::getPayload)
                                              .flatMap(List::stream)
                                              .collect(Collectors.toSet());
        Assertions.assertEquals(6, distinctValues.size());
        Mockito.verify(messageMonitor, Mockito.times(1)).onMessageIngested(Mockito.any());
        Mockito.verify(monitorCallback, Mockito.times(3)).reportSuccess();
    }

    /**
     * Dummy handler method that is looked up reflectively by name in
     * {@code scatterGatherOnArrayQueryHandlers()}, which registers handlers using this method's return type.
     *
     * @return a new, empty {@code String} array
     */
    public String[] stringArrayQueryHandler() {
        return new String[]{};
    }

    /**
     * With a {@link TransactionManager} configured, a scatter-gather over two succeeding handlers starts
     * and commits two transactions.
     */
    @Test
    void scatterGatherWithTransaction() {
        TransactionManager txManager = Mockito.mock(TransactionManager.class);
        Transaction tx = Mockito.mock(Transaction.class);
        Mockito.when(txManager.startTransaction()).thenReturn(tx);
        testSubject = SimpleQueryBus.builder()
                                    .messageMonitor(messageMonitor)
                                    .transactionManager(txManager)
                                    .errorHandler(errorHandler)
                                    .duplicateQueryHandlerResolver(DuplicateQueryHandlerResolution.silentlyAdd())
                                    .build();
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "567");
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "Hello, World", singleStringResponse);

        Set<QueryResponseMessage<String>> responses =
                testSubject.scatterGather(testQuery, 0L, TimeUnit.SECONDS)
                           .collect(Collectors.toSet());

        Assertions.assertEquals(2, responses.size());
        Mockito.verify(messageMonitor, Mockito.times(1)).onMessageIngested(Mockito.any());
        Mockito.verify(monitorCallback, Mockito.times(2)).reportSuccess();
        Mockito.verify(txManager, Mockito.times(2)).startTransaction();
        Mockito.verify(tx, Mockito.times(2)).commit();
    }

    /**
     * With one succeeding and one failing handler, two transactions are started; one is committed and one
     * is rolled back, and the monitor callback sees one success and one {@link MockException} failure.
     */
    @Test
    void scatterGatherWithTransactionRollsBackOnFailure() {
        TransactionManager txManager = Mockito.mock(TransactionManager.class);
        Transaction tx = Mockito.mock(Transaction.class);
        Mockito.when(txManager.startTransaction()).thenReturn(tx);
        testSubject = SimpleQueryBus.builder()
                                    .messageMonitor(messageMonitor)
                                    .transactionManager(txManager)
                                    .errorHandler(errorHandler)
                                    .duplicateQueryHandlerResolver(DuplicateQueryHandlerResolution.silentlyAdd())
                                    .build();
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        testSubject.subscribe(String.class.getName(), String.class, query -> {
            throw new MockException();
        });
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "Hello, World", singleStringResponse);

        Set<QueryResponseMessage<String>> responses =
                testSubject.scatterGather(testQuery, 0L, TimeUnit.SECONDS)
                           .collect(Collectors.toSet());

        Assertions.assertEquals(1, responses.size());
        Mockito.verify(messageMonitor, Mockito.times(1)).onMessageIngested(Mockito.any());
        Mockito.verify(monitorCallback, Mockito.times(1)).reportSuccess();
        Mockito.verify(monitorCallback, Mockito.times(1)).reportFailure(Mockito.isA(MockException.class));
        Mockito.verify(txManager, Mockito.times(2)).startTransaction();
        Mockito.verify(tx, Mockito.times(1)).commit();
        Mockito.verify(tx, Mockito.times(1)).rollback();
    }

    /**
     * Taking only the first scatter-gather result starts and commits a single transaction, with at most
     * two success reports on the monitor callback.
     */
    @Test
    void queryFirstFromScatterGatherWillCommitUnitOfWork() {
        TransactionManager txManager = Mockito.mock(TransactionManager.class);
        Transaction tx = Mockito.mock(Transaction.class);
        Mockito.when(txManager.startTransaction()).thenReturn(tx);
        testSubject = SimpleQueryBus.builder()
                                    .messageMonitor(messageMonitor)
                                    .transactionManager(txManager)
                                    .errorHandler(errorHandler)
                                    .duplicateQueryHandlerResolver(DuplicateQueryHandlerResolution.silentlyAdd())
                                    .build();
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "567");
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "Hello, World", singleStringResponse);

        boolean firstResultPresent = testSubject.scatterGather(testQuery, 0L, TimeUnit.SECONDS)
                                                .findFirst()
                                                .isPresent();

        Assertions.assertTrue(firstResultPresent);
        Mockito.verify(messageMonitor, Mockito.times(1)).onMessageIngested(Mockito.any());
        Mockito.verify(monitorCallback, Mockito.atMost(2)).reportSuccess();
        Mockito.verify(txManager).startTransaction();
        Mockito.verify(tx).commit();
    }

    /**
     * The dispatch interceptor adds a {@code "key"} meta data entry and the handler interceptor replies
     * {@code "fakeReply"} whenever it sees that entry, so both scatter-gather responses carry that payload.
     */
    @Test
    void scatterGatherWithInterceptors() {
        testSubject.registerDispatchInterceptor(
                messages -> (index, query) -> query.andMetaData(Collections.singletonMap("key", "value"))
        );
        testSubject.registerHandlerInterceptor((unitOfWork, chain) -> {
            if (unitOfWork.getMessage().getMetaData().containsKey("key")) {
                return "fakeReply";
            }
            return chain.proceedSync();
        });
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "567");
        QueryMessage<String, String> testQuery =
                new GenericQueryMessage<>(new MessageType(String.class), "Hello, World", singleStringResponse);

        List<String> payloads = testSubject.scatterGather(testQuery, 0L, TimeUnit.SECONDS)
                                           .map(QueryResponseMessage::getPayload)
                                           .collect(Collectors.toList());

        Assertions.assertEquals(2, payloads.size());
        Mockito.verify(messageMonitor, Mockito.times(1)).onMessageIngested(Mockito.any());
        Mockito.verify(monitorCallback, Mockito.times(2)).reportSuccess();
        Assertions.assertEquals(Arrays.asList("fakeReply", "fakeReply"), payloads);
    }

    /**
     * A scatter-gather query issued without any subscribed handler is expected to yield no responses, while the
     * message is still ingested by the monitor and reported as ignored.
     */
    @Test
    void scatterGatherReturnsEmptyStreamWhenNoHandlersAvailable() {
        GenericQueryMessage testQuery =
                new GenericQueryMessage(new MessageType(String.class), "Hello, World", singleStringResponse);

        Set responses = (Set) testSubject.scatterGather(testQuery, 0L, TimeUnit.SECONDS)
                .collect(Collectors.toSet());

        Assertions.assertEquals(0, responses.size());
        ((MessageMonitor) Mockito.verify(messageMonitor)).onMessageIngested((QueryMessage) Mockito.any());
        ((MessageMonitor.MonitorCallback) Mockito.verify(monitorCallback)).reportIgnored();
    }

    /**
     * With one handler answering and one handler throwing a {@link MockException}, the scatter-gather result is
     * expected to contain a single response. The thrown exception must be handed to the error handler together
     * with the query, and the monitor callback must see one success and one failure.
     */
    @Test
    void scatterGatherReportsExceptionsWithErrorHandler() {
        String queryName = String.class.getName();
        testSubject.subscribe(queryName, String.class, query -> query.getPayload() + "1234");
        testSubject.subscribe(queryName, String.class, query -> {
            throw new MockException();
        });
        GenericQueryMessage testQuery =
                new GenericQueryMessage(new MessageType(String.class), "Hello, World", singleStringResponse);

        Set responses = (Set) testSubject.scatterGather(testQuery, 0L, TimeUnit.SECONDS)
                .collect(Collectors.toSet());

        Assertions.assertEquals(1, responses.size());
        ((QueryInvocationErrorHandler) Mockito.verify(errorHandler)).onError(
                (Throwable) Mockito.isA(MockException.class),
                (QueryMessage) Mockito.eq(testQuery),
                (MessageHandler) Mockito.isA(MessageHandler.class));
        // Mockito.verify(mock) checks for exactly one invocation, i.e. the same as times(1).
        ((MessageMonitor) Mockito.verify(messageMonitor)).onMessageIngested((QueryMessage) Mockito.any());
        ((MessageMonitor.MonitorCallback) Mockito.verify(monitorCallback)).reportSuccess();
        ((MessageMonitor.MonitorCallback) Mockito.verify(monitorCallback))
                .reportFailure((Throwable) Mockito.isA(MockException.class));
    }

    /**
     * Registers a {@link CorrelationDataInterceptor} backed by a {@link MessageOriginProvider} and checks that the
     * response meta data carries the query's identifier under both {@code TRACE_ID} and {@code CORRELATION_ID},
     * next to the expected payload.
     */
    @Test
    void queryResponseMessageCorrelationData() throws ExecutionException, InterruptedException {
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        CorrelationDataProvider[] providers = {new MessageOriginProvider()};
        testSubject.registerHandlerInterceptor(new CorrelationDataInterceptor(providers));
        GenericQueryMessage testQuery =
                new GenericQueryMessage(new MessageType(String.class), "Hello, World", singleStringResponse);

        QueryResponseMessage response = (QueryResponseMessage) testSubject.query(testQuery).get();

        Assertions.assertEquals(testQuery.getIdentifier(), response.getMetaData().get(TRACE_ID));
        Assertions.assertEquals(testQuery.getIdentifier(), response.getMetaData().get(CORRELATION_ID));
        Assertions.assertEquals("Hello, World1234", response.getPayload());
    }

    /**
     * A handler that throws a {@link MockException} must not make the initial-result {@code Mono} of a
     * subscription query fail; instead the exception is expected to be carried by an exceptional
     * {@code QueryResponseMessage}.
     */
    @Test
    void subscriptionQueryReportsExceptionInInitialResult() {
        this.testSubject.subscribe(String.class.getName(), String.class, queryMessage -> {
            throw new MockException();
        });
        Mono initialResult = this.testSubject.subscriptionQuery(new GenericSubscriptionQueryMessage(new MessageType(String.class), "test", ResponseTypes.instanceOf(String.class), ResponseTypes.instanceOf(String.class))).initialResult();

        // Maps a regular completion to false and only a MockException error signal to true, so the
        // assertion below fails exactly when the handler's exception surfaced on the Mono itself.
        Mono failedWithHandlerError = initialResult
                .map(response -> false)
                .onErrorReturn(MockException.class::isInstance, true);

        Assertions.assertFalse(((Boolean) failedWithHandlerError.block()).booleanValue(), "Exception by handler should be reported in result, not on Mono");
        Assertions.assertTrue(((QueryResponseMessage) initialResult.block()).isExceptional());
    }

    /**
     * Runs a subscription query against a projection value that is bumped by a 3 ms interval ticker, which also
     * emits every tick as an update for {@code "queryName"}.
     * <p>
     * The initial result is read after tick 10, the first update after tick 100 and the initial result again after
     * tick 1000. The test asserts that the first update is at most one ahead of the first initial result and that
     * the second initial result is not smaller than the first one.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for one of the tick latches
     */
    @Test
    void subscriptionQueryIncreasingProjection() throws InterruptedException {
        CountDownLatch tenthTick = new CountDownLatch(1);
        CountDownLatch hundredthTick = new CountDownLatch(1);
        CountDownLatch thousandthTick = new CountDownLatch(1);
        AtomicLong projection = new AtomicLong();
        this.testSubject.subscribe("queryName", Long.class, queryMessage -> {
            return Long.valueOf(projection.get());
        });
        QueryUpdateEmitter emitter = this.testSubject.queryUpdateEmitter();
        Disposable ticker = Flux.interval(Duration.ofMillis(0L), Duration.ofMillis(3L)).doOnNext(tick -> {
            if (tick.longValue() == 10) {
                tenthTick.countDown();
            }
            if (tick.longValue() == 100) {
                hundredthTick.countDown();
            }
            if (tick.longValue() == 1000) {
                thousandthTick.countDown();
            }
            projection.set(tick.longValue());
            emitter.emit(subscriptionQueryMessage -> {
                return "queryName".equals(subscriptionQueryMessage.type().name());
            }, tick);
        }).doOnComplete(() -> {
            emitter.complete(subscriptionQueryMessage -> {
                return "queryName".equals(subscriptionQueryMessage.type().name());
            });
        }).subscribe();
        try {
            SubscriptionQueryResult result = this.testSubject.subscriptionQuery(new GenericSubscriptionQueryMessage(new MessageType("queryName"), "test", ResponseTypes.instanceOf(Long.class), ResponseTypes.instanceOf(Long.class)));
            Mono initialResult = result.initialResult();
            tenthTick.await();
            Long firstInitialResult = (Long) ((QueryResponseMessage) Objects.requireNonNull((QueryResponseMessage) initialResult.block())).getPayload();
            hundredthTick.await();
            Long firstUpdate = (Long) ((SubscriptionQueryUpdateMessage) Objects.requireNonNull((SubscriptionQueryUpdateMessage) result.updates().next().block())).getPayload();
            thousandthTick.await();
            Long secondInitialResult = (Long) ((QueryResponseMessage) Objects.requireNonNull((QueryResponseMessage) initialResult.block())).getPayload();
            Assertions.assertTrue(firstUpdate.longValue() <= firstInitialResult.longValue() + 1);
            Assertions.assertTrue(firstInitialResult.longValue() <= secondInitialResult.longValue());
        } finally {
            // Stop the ticker on every exit path, so a failed assertion or an interrupt does not leave the
            // interval running and emitting updates after this test has finished.
            ticker.dispose();
        }
    }

    /**
     * Checks that a subscription query completes the {@code "QueryBus.query"} span once the initial result has
     * been read, and the {@code "QueryUpdateEmitter.emitQueryUpdateMessage"} span once an update has been received.
     * Updates are produced by a 20 ms interval ticker that is disposed when the test ends, whatever the outcome.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the first two ticks
     */
    @Test
    void subscriptionQueryIsTraced() throws InterruptedException {
        CountDownLatch twoTicks = new CountDownLatch(2);
        AtomicLong projection = new AtomicLong();
        testSubject.subscribe("queryName", Long.class, query -> Long.valueOf(projection.get()));
        QueryUpdateEmitter emitter = testSubject.queryUpdateEmitter();
        Disposable ticker = Flux.interval(Duration.ofMillis(0L), Duration.ofMillis(20L))
                .doOnNext(tick -> {
                    twoTicks.countDown();
                    emitter.emit(candidate -> "queryName".equals(candidate.type().name()), tick);
                })
                .doOnComplete(() -> emitter.complete(candidate -> "queryName".equals(candidate.type().name())))
                .subscribe();
        try {
            GenericSubscriptionQueryMessage testQuery = new GenericSubscriptionQueryMessage(
                    new MessageType("queryName"),
                    "test",
                    ResponseTypes.instanceOf(Long.class),
                    ResponseTypes.instanceOf(Long.class));
            SubscriptionQueryResult result = testSubject.subscriptionQuery(testQuery);

            QueryResponseMessage initial =
                    (QueryResponseMessage) Objects.requireNonNull((QueryResponseMessage) result.initialResult().block());
            initial.getPayload();
            spanFactory.verifySpanCompleted("QueryBus.query");

            twoTicks.await();
            SubscriptionQueryUpdateMessage update = (SubscriptionQueryUpdateMessage) Objects.requireNonNull(
                    (SubscriptionQueryUpdateMessage) result.updates().next().block());
            update.getPayload();
            spanFactory.verifySpanCompleted("QueryUpdateEmitter.emitQueryUpdateMessage");
        } finally {
            ticker.dispose();
        }
    }

    /**
     * A handler that throws a {@link MockException} must not make the {@link CompletableFuture} returned by
     * {@code query} complete exceptionally; instead the exception is expected to be carried by an exceptional
     * {@code QueryResponseMessage}.
     *
     * @throws ExecutionException   if one of the inspected futures completed exceptionally
     * @throws InterruptedException if the test thread is interrupted while waiting on a future
     */
    @Test
    void queryReportsExceptionInResponseMessage() throws ExecutionException, InterruptedException {
        this.testSubject.subscribe(String.class.getName(), String.class, queryMessage -> {
            throw new MockException();
        });
        CompletableFuture query = this.testSubject.query(new GenericQueryMessage(new MessageType(String.class), "test", ResponseTypes.instanceOf(String.class)));

        // Maps a regular completion to false and a MockException failure to true. A failure of the source
        // future reaches stages derived through thenApply wrapped in a CompletionException, so the cause is
        // inspected as well; checking only the top-level exception could never detect the handler's exception.
        CompletableFuture failedWithHandlerError = query.thenApply(response -> false).exceptionally(error -> {
            Throwable cause = ((Throwable) error).getCause();
            return MockException.class.isInstance(error) || MockException.class.isInstance(cause);
        });

        Assertions.assertFalse(((Boolean) failedWithHandlerError.get()).booleanValue(), "Exception by handler should be reported in result, not on CompletableFuture");
        Assertions.assertTrue(((QueryResponseMessage) query.get()).isExceptional());
    }

    /**
     * Subscribes a handler whose declared response type is the generic return type of {@code futureMethod()} and
     * which answers with an already completed future. The query result is expected to be done immediately and to
     * carry the payload of that future.
     */
    @Test
    void queryHandlerDeclaresFutureResponseType() throws Exception {
        testSubject.subscribe(
                String.class.getName(),
                ReflectionUtils.methodOf(getClass(), "futureMethod").getGenericReturnType(),
                query -> CompletableFuture.completedFuture(query.getPayload() + "1234"));
        GenericQueryMessage testQuery =
                new GenericQueryMessage(new MessageType(String.class), "hello", singleStringResponse);

        CompletableFuture result = testSubject.query(testQuery);

        Assertions.assertTrue(result.isDone(), "SimpleQueryBus should resolve CompletableFutures directly");
        QueryResponseMessage response = (QueryResponseMessage) result.get();
        Assertions.assertEquals("hello1234", response.getPayload());
    }

    /**
     * Subscribes a handler whose declared response type is the generic return type of
     * {@code completableFutureMethod()} and which answers with an already completed future. The query result is
     * expected to be done immediately and to carry the payload of that future.
     */
    @Test
    void queryHandlerDeclaresCompletableFutureResponseType() throws Exception {
        testSubject.subscribe(
                String.class.getName(),
                ReflectionUtils.methodOf(getClass(), "completableFutureMethod").getGenericReturnType(),
                query -> CompletableFuture.completedFuture(query.getPayload() + "1234"));
        GenericQueryMessage testQuery =
                new GenericQueryMessage(new MessageType(String.class), "hello", singleStringResponse);

        CompletableFuture result = testSubject.query(testQuery);

        Assertions.assertTrue(result.isDone(), "SimpleQueryBus should resolve CompletableFutures directly");
        QueryResponseMessage response = (QueryResponseMessage) result.get();
        Assertions.assertEquals("hello1234", response.getPayload());
    }

    /**
     * Cancels a subscription query right after issuing it, without subscribing to its updates, and expects the
     * query update emitter to report no active subscriptions afterwards.
     */
    @Test
    void onSubscriptionQueryCancelTheActiveSubscriptionIsRemovedFromTheEmitterIfFluxIsNotSubscribed() {
        testSubject.subscribe(String.class.getName(), String.class, query -> query.getPayload() + "1234");
        GenericSubscriptionQueryMessage testQuery = new GenericSubscriptionQueryMessage(
                new MessageType(String.class),
                "test",
                ResponseTypes.instanceOf(String.class),
                ResponseTypes.instanceOf(String.class));

        SubscriptionQueryResult result = testSubject.subscriptionQuery(testQuery);
        result.cancel();

        Assertions.assertEquals(0, testSubject.queryUpdateEmitter().activeSubscriptions().size());
    }

    /**
     * Signature-only helper: {@code queryHandlerDeclaresFutureResponseType} looks this method up reflectively by
     * name and uses its generic return type ({@code Future<String>}) as a handler's response type.
     *
     * @return always {@code null}
     */
    public Future<String> futureMethod() {
        return null;
    }

    /**
     * Signature-only helper: {@code queryHandlerDeclaresCompletableFutureResponseType} looks this method up
     * reflectively by name and uses its generic return type ({@code CompletableFuture<String>}) as a handler's
     * response type.
     *
     * @return always {@code null}
     */
    public CompletableFuture<String> completableFutureMethod() {
        return null;
    }
}
