package org.axonframework.integrationtests.queryhandling;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.integrationtests.utils.AssertUtils;
import org.axonframework.messaging.responsetypes.ResponseTypes;
import org.axonframework.messaging.unitofwork.DefaultUnitOfWork;
import org.axonframework.queryhandling.DefaultQueryGateway;
import org.axonframework.queryhandling.GenericSubscriptionQueryMessage;
import org.axonframework.queryhandling.GenericSubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryHandler;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.QueryUpdateEmitter;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.axonframework.queryhandling.SubscriptionQueryUpdateMessage;
import org.axonframework.queryhandling.annotation.AnnotationQueryHandlerAdapter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.StepVerifierOptions;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:org/axonframework/integrationtests/queryhandling/AbstractSubscriptionQueryTestSuite.class */
public abstract class AbstractSubscriptionQueryTestSuite {
    /** Value {@code ChatQueryHandler.someQueryHandler} compares the query filter against and returns on a match. */
    private static final String FOUND = "found";
    /** Query payload used by the tests; update filters select subscriptions by comparing against it. */
    private static final String TEST_PAYLOAD = "axonFrameworkCR";
    /** Bus under test, obtained from {@link #queryBus()} in {@code setUp}. */
    private QueryBus queryBus;
    /** Emitter under test, obtained from {@link #queryUpdateEmitter()} in {@code setUp}. */
    private QueryUpdateEmitter queryUpdateEmitter;
    /** Handler registered on the bus in {@code setUp}; tests emit updates through its emitter. */
    private ChatQueryHandler chatQueryHandler;
    // NOTE(review): this looks like the decompiler's rendering of the compiler-generated assertion flag.
    // No initialiser is visible here — confirm it is assigned elsewhere in the file, or remove it
    // once nothing reads it any more.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/axonframework/integrationtests/queryhandling/AbstractSubscriptionQueryTestSuite$ChatQueryHandler.class */
    /**
     * Query handler registered on the bus under test. It keeps the {@link QueryUpdateEmitter} so tests can
     * emit updates through it, and a fixed exception instance that the failing query throws.
     */
    private static class ChatQueryHandler {

        private final QueryUpdateEmitter emitter;
        private final RuntimeException toBeThrown;

        private ChatQueryHandler(QueryUpdateEmitter queryUpdateEmitter) {
            this.toBeThrown = new RuntimeException("oops");
            this.emitter = queryUpdateEmitter;
        }

        /**
         * Initial result for the "chatMessages" query.
         *
         * @param str the query payload (ignored)
         * @return a fixed list of three messages
         */
        @QueryHandler(queryName = "chatMessages")
        public List<String> chatMessages(String str) {
            return Arrays.asList("Message1", "Message2", "Message3");
        }

        /**
         * Initial result for the "numberOfMessages" query.
         *
         * @param num the query payload (ignored)
         * @return always {@code 0}
         */
        @QueryHandler(queryName = "numberOfMessages")
        public Integer numberOfMessages(Integer num) {
            return 0;
        }

        /**
         * Handler for the "failingQuery" query; always fails.
         *
         * @param str the query payload (ignored)
         * @return never returns normally
         * @throws RuntimeException always, the same {@code toBeThrown} instance on every call
         */
        @QueryHandler(queryName = "failingQuery")
        public String failingQuery(String str) {
            throw toBeThrown;
        }

        /**
         * Emits two updates and completes the subscription from another thread, and only then returns the
         * initial result.
         *
         * @param str the query payload (ignored)
         * @return {@code "Initial"}
         * @throws InterruptedException if interrupted while waiting for the emitting thread
         */
        @QueryHandler(queryName = "emitFirstThenReturnInitial")
        public String emitFirstThenReturnInitial(String str) throws InterruptedException {
            CountDownLatch updatesEmitted = new CountDownLatch(1);
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try {
                executor.submit(() -> {
                    try {
                        emitter.emit(String.class,
                                     TEST_PAYLOAD::equals,
                                     GenericSubscriptionQueryUpdateMessage.asUpdateMessage("Update1"));
                        emitter.emit(String.class,
                                     TEST_PAYLOAD::equals,
                                     GenericSubscriptionQueryUpdateMessage.asUpdateMessage("Update2"));
                        emitter.complete(String.class, TEST_PAYLOAD::equals);
                    } finally {
                        // Always release the handler thread, even if emitting fails, so the query cannot hang.
                        updatesEmitted.countDown();
                    }
                });
                updatesEmitted.await();
            } finally {
                // Stop the worker thread once the submitted task is done instead of leaking it.
                executor.shutdown();
            }
            return "Initial";
        }

        /**
         * Handler for {@link SomeQuery} payloads.
         *
         * @param someQuery the query carrying the filter
         * @return {@code "found"} if the filter equals {@code "found"}, otherwise {@code null}
         */
        @QueryHandler
        public String someQueryHandler(SomeQuery someQuery) {
            if (FOUND.equals(someQuery.getFilter())) {
                return FOUND;
            }
            return null;
        }
    }

    /* loaded from: input_file:org/axonframework/integrationtests/queryhandling/AbstractSubscriptionQueryTestSuite$SomeQuery.class */
    /** Immutable query payload that carries a single filter string. */
    private static class SomeQuery {

        private final String filter;

        private SomeQuery(String filterValue) {
            filter = filterValue;
        }

        /**
         * @return the filter this query was created with
         */
        public String getFilter() {
            return filter;
        }
    }

    /**
     * Obtains the bus and emitter under test from the concrete suite, registers a fresh
     * {@link ChatQueryHandler} on the bus and installs a no-op Reactor "error dropped" hook.
     */
    @BeforeEach
    void setUp() {
        queryBus = queryBus();
        queryUpdateEmitter = queryUpdateEmitter();
        chatQueryHandler = new ChatQueryHandler(queryUpdateEmitter);
        new AnnotationQueryHandlerAdapter<>(chatQueryHandler).subscribe(queryBus);
        // Dropped errors are intentionally ignored; presumably because several tests provoke them on purpose
        // (subscribers without an error handler, emits after completion). The hook is reset in tearDown().
        Hooks.onErrorDropped(ignored -> {
        });
    }

    /** Removes the Reactor "error dropped" hook that {@code setUp()} installed. */
    @AfterEach
    void tearDown() {
        // Hooks are global state in Reactor, so undo the override after every test.
        Hooks.resetOnErrorDropped();
    }

    /**
     * Supplies the {@link QueryBus} under test. Called from {@code setUp()} before every test.
     *
     * @return the query bus the test suite subscribes its handler to and queries against
     */
    public abstract QueryBus queryBus();

    /**
     * Supplies the {@link QueryUpdateEmitter} under test. Called from {@code setUp()} before every test.
     *
     * @return the emitter the tests use to emit updates and complete subscriptions
     */
    public abstract QueryUpdateEmitter queryUpdateEmitter();

    /**
     * Opens a String-based and an Integer-based subscription query and checks, for each of them, that the
     * initial result is delivered, that an update emitted before completion arrives, and that an update
     * emitted after completion does not.
     */
    @Test
    void emittingAnUpdate() {
        SubscriptionQueryMessage<String, List<String>, String> chatQuery = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryMessage<Integer, Integer, Integer> countQuery = new GenericSubscriptionQueryMessage<>(
                5,
                "numberOfMessages",
                ResponseTypes.instanceOf(Integer.class),
                ResponseTypes.instanceOf(Integer.class));

        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                chatResult = queryBus.subscriptionQuery(chatQuery);
        SubscriptionQueryResult<QueryResponseMessage<Integer>, SubscriptionQueryUpdateMessage<Integer>>
                countResult = queryBus.subscriptionQuery(countQuery);

        QueryUpdateEmitter emitter = chatQueryHandler.emitter;

        emitter.emit(String.class, TEST_PAYLOAD::equals, "Update11");
        emitter.complete(String.class, TEST_PAYLOAD::equals);
        // Emitted after completion: must not show up in the updates below.
        emitter.emit(String.class,
                     TEST_PAYLOAD::equals,
                     GenericSubscriptionQueryUpdateMessage.asUpdateMessage("Update12"));

        StepVerifier.create(chatResult.initialResult().map(QueryResponseMessage::getPayload))
                    .expectNext(Arrays.asList("Message1", "Message2", "Message3"))
                    .expectComplete()
                    .verify();
        StepVerifier.create(chatResult.updates().map(SubscriptionQueryUpdateMessage::getPayload))
                    .expectNext("Update11")
                    .expectComplete()
                    .verify();

        emitter.emit(Integer.class,
                     count -> count == 5,
                     GenericSubscriptionQueryUpdateMessage.asUpdateMessage(1));
        emitter.complete(Integer.class, count -> count == 5);
        // Emitted after completion: must not show up in the updates below.
        emitter.emit(Integer.class, count -> count == 5, 2);

        StepVerifier.create(countResult.initialResult().map(QueryResponseMessage::getPayload))
                    .expectNext(0)
                    .verifyComplete();
        StepVerifier.create(countResult.updates().map(SubscriptionQueryUpdateMessage::getPayload))
                    .expectNext(1)
                    .verifyComplete();
    }

    /**
     * Checks that an update message with a {@code null} payload is delivered to the subscriber as such,
     * followed by completion.
     */
    @Test
    void emittingNullUpdate() {
        SubscriptionQueryMessage<String, List<String>, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(query);

        // A typed local keeps overload resolution unambiguous for the null payload.
        SubscriptionQueryUpdateMessage<String> nullUpdate =
                new GenericSubscriptionQueryUpdateMessage<>(String.class, (String) null);
        chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, nullUpdate);
        chatQueryHandler.emitter.complete(String.class, TEST_PAYLOAD::equals);

        StepVerifier.create(result.updates())
                    .expectNextMatches(update -> update.getPayload() == null)
                    .verifyComplete();
    }

    /**
     * Checks that an update emitted while a unit of work is started is not delivered to the subscriber
     * until that unit of work has been committed.
     */
    @Test
    void emittingUpdateInUnitOfWorkLifecycleRunsUpdatesOnAfterCommit() {
        String testUpdate = "some-update";
        List<String> expectedUpdates = Collections.singletonList(testUpdate);

        DefaultUnitOfWork<EventMessage<String>> unitOfWork =
                new DefaultUnitOfWork<>(GenericEventMessage.asEventMessage("some-event-payload"));
        unitOfWork.start();

        SubscriptionQueryMessage<String, List<String>, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(query);

        chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, testUpdate);

        List<String> receivedUpdates = new ArrayList<>();
        result.updates()
              .map(SubscriptionQueryUpdateMessage::getPayload)
              .subscribe(receivedUpdates::add);

        // Nothing may arrive before the commit...
        Assertions.assertTrue(receivedUpdates.isEmpty());
        unitOfWork.commit();
        // ...and the emitted update must be there right after it.
        Assertions.assertEquals(expectedUpdates, receivedUpdates);
    }

    /**
     * Checks that completing a subscription exceptionally delivers the updates emitted so far followed by
     * the very exception instance that was passed in, and that a later update is not delivered.
     */
    @Test
    void completingSubscriptionQueryExceptionally() {
        SubscriptionQueryMessage<String, List<String>, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        RuntimeException toBeThrown = new RuntimeException();

        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(query, Queues.SMALL_BUFFER_SIZE);

        QueryUpdateEmitter emitter = chatQueryHandler.emitter;
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.schedule(() -> {
                emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");
                emitter.completeExceptionally(String.class, TEST_PAYLOAD::equals, toBeThrown);
                emitter.emit(String.class, TEST_PAYLOAD::equals, "Update2");
            }, 500L, TimeUnit.MILLISECONDS);

            StepVerifier.create(result.initialResult().map(QueryResponseMessage::getPayload))
                        .expectNext(Arrays.asList("Message1", "Message2", "Message3"))
                        .verifyComplete();
            StepVerifier.create(result.updates().map(SubscriptionQueryUpdateMessage::getPayload))
                        .expectNext("Update1")
                        .expectErrorMatches(toBeThrown::equals)
                        .verify();
        } finally {
            // Orderly shutdown so the scheduler thread does not outlive the test; by default an already
            // scheduled delayed task is still allowed to run.
            scheduler.shutdown();
        }
    }

    /**
     * Same scenario as {@code completingSubscriptionQueryExceptionally}, but opens the subscription through
     * the overload that takes a {@link SubscriptionQueryBackpressure}.
     */
    @Test
    @Deprecated
    void completingSubscriptionQueryExceptionallyDeprecated() {
        SubscriptionQueryMessage<String, List<String>, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        RuntimeException toBeThrown = new RuntimeException();

        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(
                        query,
                        new SubscriptionQueryBackpressure(FluxSink.OverflowStrategy.IGNORE),
                        Queues.SMALL_BUFFER_SIZE);

        QueryUpdateEmitter emitter = chatQueryHandler.emitter;
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.schedule(() -> {
                emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");
                emitter.completeExceptionally(String.class, TEST_PAYLOAD::equals, toBeThrown);
                emitter.emit(String.class, TEST_PAYLOAD::equals, "Update2");
            }, 500L, TimeUnit.MILLISECONDS);

            StepVerifier.create(result.initialResult().map(QueryResponseMessage::getPayload))
                        .expectNext(Arrays.asList("Message1", "Message2", "Message3"))
                        .verifyComplete();
            StepVerifier.create(result.updates().map(SubscriptionQueryUpdateMessage::getPayload))
                        .expectNext("Update1")
                        .expectErrorMatches(toBeThrown::equals)
                        .verify();
        } finally {
            // Orderly shutdown so the scheduler thread does not outlive the test; by default an already
            // scheduled delayed task is still allowed to run.
            scheduler.shutdown();
        }
    }

    /**
     * Opens two subscriptions for the same payload, where the first subscriber's error callback itself
     * throws. Checks that both subscribers still receive the update and their error signal, and that
     * nothing emitted after the exceptional completion arrives.
     */
    @Test
    void completingSubscriptionQueryExceptionallyWhenOneOfSubscriptionFails() {
        SubscriptionQueryMessage<String, List<String>, String> firstQuery = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryMessage<String, List<String>, String> secondQuery = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));

        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                firstResult = queryBus.subscriptionQuery(firstQuery);
        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                secondResult = queryBus.subscriptionQuery(secondQuery);

        List<String> firstUpdates = new ArrayList<>();
        List<String> secondUpdates = new ArrayList<>();

        firstResult.updates()
                   .map(SubscriptionQueryUpdateMessage::getPayload)
                   .subscribe(firstUpdates::add, error -> {
                       firstUpdates.add("Error1");
                       // Deliberately fail inside the error callback: this is the "failing subscription".
                       throw (RuntimeException) error;
                   });
        secondResult.updates()
                    .map(SubscriptionQueryUpdateMessage::getPayload)
                    .subscribe(secondUpdates::add, error -> secondUpdates.add("Error2"));

        QueryUpdateEmitter emitter = chatQueryHandler.emitter;
        emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");
        emitter.completeExceptionally(String.class, TEST_PAYLOAD::equals, new RuntimeException());
        emitter.emit(String.class, TEST_PAYLOAD::equals, "Update2");

        Assertions.assertEquals(Arrays.asList("Update1", "Error1"), firstUpdates);
        Assertions.assertEquals(Arrays.asList("Update1", "Error2"), secondUpdates);
    }

    /**
     * Checks that an update and an exceptional completion issued while a unit of work is started only
     * reach the subscriber once that unit of work is committed, and that a new subscriber to the updates
     * afterwards sees the update followed by a {@link RuntimeException}.
     */
    @Test
    void completingSubscriptionExceptionallyInUnitOfWorkLifecycleRunsUpdatesOnAfterCommit() {
        String testUpdate = "some-update";
        List<String> expectedUpdates = Collections.singletonList(testUpdate);

        DefaultUnitOfWork<EventMessage<String>> unitOfWork =
                new DefaultUnitOfWork<>(GenericEventMessage.asEventMessage("some-event-payload"));
        unitOfWork.start();

        SubscriptionQueryMessage<String, List<String>, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(query);

        chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, testUpdate);
        chatQueryHandler.emitter.completeExceptionally(String.class, TEST_PAYLOAD::equals, new RuntimeException());

        Flux<SubscriptionQueryUpdateMessage<String>> updates = result.updates();
        List<String> receivedUpdates = new ArrayList<>();
        updates.map(SubscriptionQueryUpdateMessage::getPayload)
               .subscribe(receivedUpdates::add);

        // Nothing may arrive before the commit...
        Assertions.assertTrue(receivedUpdates.isEmpty());
        unitOfWork.commit();
        // ...and the emitted update must be there right after it.
        Assertions.assertEquals(expectedUpdates, receivedUpdates);

        StepVerifier.create(updates)
                    .expectNextMatches(update -> testUpdate.equals(update.getPayload()))
                    .verifyError(RuntimeException.class);
    }

    /**
     * Checks that completing a subscription delivers the updates emitted so far followed by a completion
     * signal, and that an update emitted afterwards is not delivered.
     */
    @Test
    void completingSubscriptionQuery() {
        SubscriptionQueryMessage<String, List<String>, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(query);

        QueryUpdateEmitter emitter = chatQueryHandler.emitter;
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.schedule(() -> {
                emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");
                emitter.complete(String.class, TEST_PAYLOAD::equals);
                emitter.emit(String.class, TEST_PAYLOAD::equals, "Update2");
            }, 500L, TimeUnit.MILLISECONDS);

            StepVerifier.create(result.initialResult().map(QueryResponseMessage::getPayload))
                        .expectNext(Arrays.asList("Message1", "Message2", "Message3"))
                        .verifyComplete();
            StepVerifier.create(result.updates().map(SubscriptionQueryUpdateMessage::getPayload))
                        .expectNext("Update1")
                        .verifyComplete();
        } finally {
            // Orderly shutdown so the scheduler thread does not outlive the test; by default an already
            // scheduled delayed task is still allowed to run.
            scheduler.shutdown();
        }
    }

    /**
     * Checks that an update and a completion issued while a unit of work is started only reach the
     * subscriber once that unit of work is committed (allowing up to 500 ms for delivery), and that a new
     * subscriber to the updates afterwards sees the update followed by completion.
     */
    @Test
    void completingSubscriptionInUnitOfWorkLifecycleRunsUpdatesOnAfterCommit() {
        String testUpdate = "some-update";
        List<String> expectedUpdates = Collections.singletonList(testUpdate);

        DefaultUnitOfWork<EventMessage<String>> unitOfWork =
                new DefaultUnitOfWork<>(GenericEventMessage.asEventMessage("some-event-payload"));
        unitOfWork.start();

        SubscriptionQueryMessage<String, List<String>, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(query);

        chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, testUpdate);
        chatQueryHandler.emitter.complete(String.class, TEST_PAYLOAD::equals);

        Flux<SubscriptionQueryUpdateMessage<String>> updates = result.updates();
        List<String> receivedUpdates = new ArrayList<>();
        updates.map(SubscriptionQueryUpdateMessage::getPayload)
               .subscribe(receivedUpdates::add);

        // Nothing may arrive before the commit.
        Assertions.assertTrue(receivedUpdates.isEmpty());
        unitOfWork.commit();
        AssertUtils.assertWithin(500, TimeUnit.MILLISECONDS,
                                 () -> Assertions.assertEquals(expectedUpdates, receivedUpdates));

        StepVerifier.create(updates)
                    .expectNextMatches(update -> testUpdate.equals(update.getPayload()))
                    .verifyComplete();
    }

    /**
     * Uses the "emitFirstThenReturnInitial" handler, which emits its updates before returning the initial
     * result, and checks that both the initial result and the two updates are delivered.
     */
    @Test
    void orderingOfOperationOnUpdateHandler() {
        SubscriptionQueryMessage<String, String, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "emitFirstThenReturnInitial",
                ResponseTypes.instanceOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<QueryResponseMessage<String>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(query);

        StepVerifier.create(result.initialResult().map(QueryResponseMessage::getPayload))
                    .expectNext("Initial")
                    .expectComplete()
                    .verify();
        StepVerifier.create(result.updates().map(SubscriptionQueryUpdateMessage::getPayload))
                    .expectNext("Update1", "Update2")
                    .verifyComplete();
    }

    /**
     * Checks that a handler throwing an exception yields an exceptional initial response carrying exactly
     * the thrown exception, after which the initial result completes.
     */
    @Test
    void subscribingQueryHandlerFailing() {
        SubscriptionQueryMessage<String, String, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "failingQuery",
                ResponseTypes.instanceOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<QueryResponseMessage<String>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(query);

        StepVerifier.create(result.initialResult())
                    .assertNext(response -> {
                        Assertions.assertTrue(response.isExceptional());
                        Assertions.assertEquals(chatQueryHandler.toBeThrown, response.exceptionResult());
                    })
                    .expectComplete()
                    .verify();
    }

    /**
     * Subscribes several times to the initial result and to the updates of one subscription query opened
     * with an update buffer size of 8. Subscribers attached before the second update receive every update;
     * the subscriber attached after nine updates receives "Update2" onwards.
     */
    @Test
    void severalSubscriptions() {
        SubscriptionQueryMessage<String, List<String>, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(query, 8);

        List<String> firstInitial = new ArrayList<>();
        List<String> secondInitial = new ArrayList<>();
        List<String> firstUpdates = new ArrayList<>();
        List<String> secondUpdates = new ArrayList<>();
        List<String> lateUpdates = new ArrayList<>();

        result.initialResult().map(QueryResponseMessage::getPayload).subscribe(firstInitial::addAll);
        result.initialResult().map(QueryResponseMessage::getPayload).subscribe(secondInitial::addAll);

        QueryUpdateEmitter emitter = chatQueryHandler.emitter;
        emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");

        result.updates().map(SubscriptionQueryUpdateMessage::getPayload).subscribe(firstUpdates::add);
        result.updates().map(SubscriptionQueryUpdateMessage::getPayload).subscribe(secondUpdates::add);

        for (int i = 2; i < 10; i++) {
            emitter.emit(String.class, TEST_PAYLOAD::equals, "Update" + i);
        }

        // Subscribes after nine updates; presumably the buffer of 8 no longer holds "Update1" at this point.
        result.updates().map(SubscriptionQueryUpdateMessage::getPayload).subscribe(lateUpdates::add);

        emitter.emit(String.class, TEST_PAYLOAD::equals, "Update10");
        emitter.emit(String.class, TEST_PAYLOAD::equals, "Update11");
        emitter.complete(String.class, TEST_PAYLOAD::equals);

        List<String> expectedInitial = Arrays.asList("Message1", "Message2", "Message3");
        List<String> expectedAllUpdates = Arrays.asList(
                "Update1", "Update2", "Update3", "Update4", "Update5", "Update6",
                "Update7", "Update8", "Update9", "Update10", "Update11");
        List<String> expectedLateUpdates = Arrays.asList(
                "Update2", "Update3", "Update4", "Update5", "Update6",
                "Update7", "Update8", "Update9", "Update10", "Update11");

        Assertions.assertEquals(expectedInitial, firstInitial);
        Assertions.assertEquals(expectedInitial, secondInitial);
        Assertions.assertEquals(expectedAllUpdates, firstUpdates);
        Assertions.assertEquals(expectedAllUpdates, secondUpdates);
        Assertions.assertEquals(expectedLateUpdates, lateUpdates);
    }

    /**
     * Same scenario as {@code severalSubscriptions}, but opens the subscription through the overload that
     * takes a {@link SubscriptionQueryBackpressure} (here with the {@code ERROR} overflow strategy) and an
     * update buffer size of 8.
     */
    @Test
    @Deprecated
    void severalSubscriptionsDeprecated() {
        SubscriptionQueryMessage<String, List<String>, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(
                        query,
                        new SubscriptionQueryBackpressure(FluxSink.OverflowStrategy.ERROR),
                        8);

        List<String> firstInitial = new ArrayList<>();
        List<String> secondInitial = new ArrayList<>();
        List<String> firstUpdates = new ArrayList<>();
        List<String> secondUpdates = new ArrayList<>();
        List<String> lateUpdates = new ArrayList<>();

        result.initialResult().map(QueryResponseMessage::getPayload).subscribe(firstInitial::addAll);
        result.initialResult().map(QueryResponseMessage::getPayload).subscribe(secondInitial::addAll);

        QueryUpdateEmitter emitter = chatQueryHandler.emitter;
        emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");

        result.updates().map(SubscriptionQueryUpdateMessage::getPayload).subscribe(firstUpdates::add);
        result.updates().map(SubscriptionQueryUpdateMessage::getPayload).subscribe(secondUpdates::add);

        for (int i = 2; i < 10; i++) {
            emitter.emit(String.class, TEST_PAYLOAD::equals, "Update" + i);
        }

        // Subscribes after nine updates; presumably the buffer of 8 no longer holds "Update1" at this point.
        result.updates().map(SubscriptionQueryUpdateMessage::getPayload).subscribe(lateUpdates::add);

        emitter.emit(String.class, TEST_PAYLOAD::equals, "Update10");
        emitter.emit(String.class, TEST_PAYLOAD::equals, "Update11");
        emitter.complete(String.class, TEST_PAYLOAD::equals);

        List<String> expectedInitial = Arrays.asList("Message1", "Message2", "Message3");
        List<String> expectedAllUpdates = Arrays.asList(
                "Update1", "Update2", "Update3", "Update4", "Update5", "Update6",
                "Update7", "Update8", "Update9", "Update10", "Update11");
        List<String> expectedLateUpdates = Arrays.asList(
                "Update2", "Update3", "Update4", "Update5", "Update6",
                "Update7", "Update8", "Update9", "Update10", "Update11");

        Assertions.assertEquals(expectedInitial, firstInitial);
        Assertions.assertEquals(expectedInitial, secondInitial);
        Assertions.assertEquals(expectedAllUpdates, firstUpdates);
        Assertions.assertEquals(expectedAllUpdates, secondUpdates);
        Assertions.assertEquals(expectedLateUpdates, lateUpdates);
    }

    /**
     * Checks that submitting the very same subscription query message instance a second time is rejected
     * with an {@link IllegalArgumentException}.
     */
    @Test
    void doubleSubscriptionMessage() {
        SubscriptionQueryMessage<String, List<String>, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));

        queryBus.subscriptionQuery(query);

        Assertions.assertThrows(IllegalArgumentException.class, () -> queryBus.subscriptionQuery(query));
    }

    /**
     * Emits 201 updates ("Update0" to "Update200") into a subscription opened with an update buffer size of
     * 100 before anyone subscribes, then checks via {@code assertRecorded} that a subscriber receives only
     * the last 100 of them.
     */
    @Test
    void replayBufferOverflow() {
        SubscriptionQueryMessage<String, List<String>, String> query = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD,
                "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class),
                ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<QueryResponseMessage<List<String>>, SubscriptionQueryUpdateMessage<String>>
                result = queryBus.subscriptionQuery(query, 100);

        for (int i = 0; i <= 200; i++) {
            chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update" + i);
        }
        chatQueryHandler.emitter.complete(String.class, TEST_PAYLOAD::equals);

        StepVerifier.create(result.updates())
                    .recordWith(LinkedList::new)
                    .thenConsumeWhile(update -> true)
                    .expectRecordedMatches(AbstractSubscriptionQueryTestSuite::assertRecorded)
                    .verifyComplete();
    }

    /**
     * Predicate applied to the updates recorded by {@code replayBufferOverflow}: the recording is accepted
     * only when it holds exactly 100 messages, the first carrying payload {@code "Update101"} and the last
     * carrying payload {@code "Update200"}.
     *
     * @param collection the recorded update messages
     * @return {@code true} when the size, the first payload and the last payload all match
     * @throws AssertionError when assertions are enabled and the first or last recorded element is
     *                        {@code null}
     */
    private static boolean assertRecorded(Collection<SubscriptionQueryUpdateMessage<String>> collection) {
        LinkedList<SubscriptionQueryUpdateMessage<String>> recorded = new LinkedList<>(collection);

        SubscriptionQueryUpdateMessage<String> head = recorded.peekFirst();
        if (head == null && !$assertionsDisabled) {
            throw new AssertionError();
        }
        String headPayload = head.getPayload();
        boolean headMatches = headPayload.equals("Update101");

        SubscriptionQueryUpdateMessage<String> tail = recorded.peekLast();
        if (tail == null && !$assertionsDisabled) {
            throw new AssertionError();
        }

        // The last payload is only inspected once size and first payload have matched.
        if (collection.size() != 100 || !headMatches) {
            return false;
        }
        String tailPayload = tail.getPayload();
        return tailPayload.equals("Update200");
    }

    /**
     * Subscribes to the updates with zero initial demand behind a 100-element
     * {@code onBackpressureBuffer}, emits 200 updates and completes the query while nothing is requested,
     * then requests 100 elements. Expects exactly 100 elements followed by an overflow error.
     */
    @Test
    void onBackpressureError() {
        SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class), ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<?, SubscriptionQueryUpdateMessage<String>> result =
                this.queryBus.subscriptionQuery(queryMessage, 100);

        StepVerifier.create(result.updates().onBackpressureBuffer(100),
                            StepVerifierOptions.create().initialRequest(0L))
                    .expectSubscription()
                    .then(() -> {
                        for (int i = 0; i < 200; i++) {
                            this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update" + i);
                        }
                        this.chatQueryHandler.emitter.complete(String.class, TEST_PAYLOAD::equals);
                    })
                    // No demand has been signalled yet, so nothing may arrive.
                    .expectNoEvent(Duration.ofMillis(100L))
                    .thenRequest(100L)
                    .expectNextCount(100L)
                    .expectErrorMatches(Exceptions::isOverflow)
                    .verify(Duration.ofSeconds(5L));
    }

    /**
     * Emits one update, closes the subscription query result, then emits a second update. The update
     * stream is expected to deliver only {@code "Update1"} and then complete.
     */
    @Test
    void subscriptionDisposal() {
        SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class), ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<?, SubscriptionQueryUpdateMessage<String>> result =
                this.queryBus.subscriptionQuery(queryMessage);

        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");
        result.close();
        // Emitted after close(); must not show up in the update stream.
        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update2");

        StepVerifier.create(result.updates().map(SubscriptionQueryUpdateMessage::getPayload))
                    .expectNext("Update1")
                    .verifyComplete();
    }

    /**
     * Registers a dispatch interceptor that adds the meta-data entry {@code key=value} to every query and
     * a handler interceptor that short-circuits with a fixed response when that key is present. The
     * initial result of the subscription query must then be the fixed response.
     */
    @Test
    void subscriptionQueryWithInterceptors() {
        List<String> interceptedResponse = Arrays.asList("fakeReply1", "fakeReply2");
        this.queryBus.registerDispatchInterceptor(
                messages -> (index, message) -> message.andMetaData(Collections.singletonMap("key", "value")));
        this.queryBus.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            if (unitOfWork.getMessage().getMetaData().containsKey("key")) {
                return interceptedResponse;
            }
            return interceptorChain.proceed();
        });

        SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class), ResponseTypes.instanceOf(String.class));

        StepVerifier.create(this.queryBus.subscriptionQuery(queryMessage)
                                         .initialResult()
                                         .map(response -> response.getPayload()))
                    .expectNext(interceptedResponse)
                    .verifyComplete();
    }

    /**
     * Registers a dispatch interceptor on the update emitter that attaches the meta-data entry
     * {@code key=value} to every update, emits a single update and closes the subscription. The one
     * update received must carry exactly that meta-data.
     */
    @Test
    void subscriptionQueryUpdateWithInterceptors() {
        Map<String, String> metaData = Collections.singletonMap("key", "value");
        this.queryUpdateEmitter.registerDispatchInterceptor(
                messages -> (index, update) -> update.andMetaData(metaData));

        SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class), ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<?, SubscriptionQueryUpdateMessage<String>> result =
                this.queryBus.subscriptionQuery(queryMessage);

        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");
        result.close();

        StepVerifier.create(result.updates())
                    .expectNextMatches(update -> update.getMetaData().equals(metaData))
                    .verifyComplete();
    }

    /**
     * Opens two different subscription queries and expects the update emitter to report exactly those two
     * query messages as its active subscriptions.
     */
    @Test
    void activeSubscriptions() {
        SubscriptionQueryMessage<String, List<String>, String> chatQuery = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class), ResponseTypes.instanceOf(String.class));
        SubscriptionQueryMessage<Integer, Integer, Integer> countQuery = new GenericSubscriptionQueryMessage<>(
                5, "numberOfMessages",
                ResponseTypes.instanceOf(Integer.class), ResponseTypes.instanceOf(Integer.class));

        this.queryBus.subscriptionQuery(chatQuery);
        this.queryBus.subscriptionQuery(countQuery);

        HashSet<SubscriptionQueryMessage<?, ?, ?>> expected = new HashSet<>(Arrays.asList(chatQuery, countQuery));
        Assertions.assertEquals(expected, this.queryUpdateEmitter.activeSubscriptions());
    }

    /**
     * Uses {@code handle} to consume the {@code "emitFirstThenReturnInitial"} query and expects the initial
     * consumer to see {@code "Initial"} and the update consumer to see {@code "Update1"} and
     * {@code "Update2"}.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the latch
     */
    @Test
    void subscriptionQueryResultHandle() throws InterruptedException {
        SubscriptionQueryMessage<String, String, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "emitFirstThenReturnInitial",
                ResponseTypes.instanceOf(String.class), ResponseTypes.instanceOf(String.class));
        List<String> initialResult = new ArrayList<>();
        List<String> updates = new ArrayList<>();
        // One initial result plus two updates.
        CountDownLatch latch = new CountDownLatch(3);

        this.queryBus.subscriptionQuery(queryMessage).handle(initial -> {
            initialResult.add(initial.getPayload());
            latch.countDown();
        }, update -> {
            updates.add(update.getPayload());
            latch.countDown();
        });

        latch.await(500L, TimeUnit.MILLISECONDS);
        Assertions.assertEquals(Collections.singletonList("Initial"), initialResult);
        Assertions.assertEquals(Arrays.asList("Update1", "Update2"), updates);
    }

    /**
     * Uses {@code handle} with an initial-result consumer that records the payload and then throws.
     * Expects the initial payload {@code "Initial"} to have been recorded and no update to reach the
     * update consumer.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for the latch
     */
    @Test
    void subscriptionQueryResultHandleWhenThereIsAnErrorConsumingAnInitialResult() throws InterruptedException {
        SubscriptionQueryMessage<String, String, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "emitFirstThenReturnInitial",
                ResponseTypes.instanceOf(String.class), ResponseTypes.instanceOf(String.class));
        List<String> initialResult = new ArrayList<>();
        List<String> updates = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(3);

        this.queryBus.subscriptionQuery(queryMessage).handle(initial -> {
            initialResult.add(initial.getPayload());
            latch.countDown();
            throw new IllegalStateException("oops");
        }, update -> {
            updates.add(update.getPayload());
            latch.countDown();
        });

        // The assertions expect no updates, so the latch cannot reach zero and this wait simply
        // gives any stray update 500 ms to arrive.
        latch.await(500L, TimeUnit.MILLISECONDS);
        Assertions.assertEquals(Collections.singletonList("Initial"), initialResult);
        Assertions.assertTrue(updates.isEmpty());
    }

    /**
     * Same scenario as {@code subscriptionQueryResultHandleWhenThereIsAnErrorConsumingAnUpdate}, but opened
     * through the {@link SubscriptionQueryBackpressure}-based overload with the {@code DROP} strategy and
     * a buffer size of 1. The update consumer throws on every update; only {@code "Update1"} may be
     * recorded and no subscription may remain active afterwards.
     */
    @Test
    @Deprecated
    void subscriptionQueryResultHandleWhenThereIsAnErrorConsumingAnUpdateDeprecated() {
        SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class), ResponseTypes.instanceOf(String.class));
        List<String> initialResult = new ArrayList<>();
        List<String> updates = new ArrayList<>();

        this.queryBus.subscriptionQuery(
                queryMessage, new SubscriptionQueryBackpressure(FluxSink.OverflowStrategy.DROP), 1
        ).handle(initial -> {
            initialResult.addAll(initial.getPayload());
        }, update -> {
            updates.add(update.getPayload());
            throw new IllegalStateException("oops");
        });

        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");
        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update2");

        Assertions.assertEquals(Arrays.asList("Message1", "Message2", "Message3"), initialResult);
        Assertions.assertEquals(Collections.singletonList("Update1"), updates);
        Assertions.assertTrue(this.queryUpdateEmitter.activeSubscriptions().isEmpty(),
                              "Expected subscriptions to be cancelled");
    }

    /**
     * Opens a subscription query through the {@link SubscriptionQueryBackpressure}-based overload with
     * the {@code ERROR} strategy and a buffer size of 200, emits 201 updates before subscribing, and
     * expects the update stream to fail with Reactor's overflow message.
     */
    @Test
    @Deprecated
    void bufferOverflowDeprecated() {
        SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class), ResponseTypes.instanceOf(String.class));
        SubscriptionQueryResult<?, SubscriptionQueryUpdateMessage<String>> result = this.queryBus.subscriptionQuery(
                queryMessage, new SubscriptionQueryBackpressure(FluxSink.OverflowStrategy.ERROR), 200);

        for (int i = 0; i < 201; i++) {
            this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update" + i);
        }
        this.chatQueryHandler.emitter.complete(String.class, TEST_PAYLOAD::equals);

        StepVerifier.create(result.updates())
                    .expectErrorMatches(error -> "The receiver is overrun by more signals than expected (bounded queue...)"
                            .equals(error.getMessage()))
                    .verify();
    }

    /**
     * Same scenario as {@code subscriptionQueryResultHandleWhenThereIsAnErrorConsumingABufferedUpdate}, but
     * opened through the {@link SubscriptionQueryBackpressure}-based overload with the {@code DROP}
     * strategy and a buffer size of 1. {@code "Update1"} is emitted from inside the initial-result
     * consumer, the update consumer throws on every update, and {@code "Update2"} is emitted afterwards.
     * Only {@code "Update1"} may be recorded and no subscription may remain active.
     */
    @Test
    @Deprecated
    void subscriptionQueryResultHandleWhenThereIsAnErrorConsumingABufferedUpdateDeprecated() {
        SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class), ResponseTypes.instanceOf(String.class));
        List<String> initialResult = new ArrayList<>();
        List<String> updates = new ArrayList<>();

        this.queryBus.subscriptionQuery(
                queryMessage, new SubscriptionQueryBackpressure(FluxSink.OverflowStrategy.DROP), 1
        ).handle(initial -> {
            // Emit while the initial result is still being consumed.
            this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");
            initialResult.addAll(initial.getPayload());
        }, update -> {
            updates.add(update.getPayload());
            throw new IllegalStateException("oops");
        });

        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update2");

        Assertions.assertEquals(Arrays.asList("Message1", "Message2", "Message3"), initialResult);
        Assertions.assertEquals(Collections.singletonList("Update1"), updates);
        Assertions.assertTrue(this.queryUpdateEmitter.activeSubscriptions().isEmpty(),
                              "Expected subscriptions to be cancelled");
    }

    /**
     * Opens a subscription query with an update buffer size of 1 and handles it with an update consumer
     * that records the payload and then throws. After emitting two updates, the initial result must have
     * been delivered, only {@code "Update1"} may be recorded, and no subscription may remain active.
     */
    @Test
    void subscriptionQueryResultHandleWhenThereIsAnErrorConsumingAnUpdate() {
        SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class), ResponseTypes.instanceOf(String.class));
        List<String> initialResult = new ArrayList<>();
        List<String> updates = new ArrayList<>();

        this.queryBus.subscriptionQuery(queryMessage, 1).handle(initial -> {
            initialResult.addAll(initial.getPayload());
        }, update -> {
            updates.add(update.getPayload());
            throw new IllegalStateException("oops");
        });

        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");
        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update2");

        Assertions.assertEquals(Arrays.asList("Message1", "Message2", "Message3"), initialResult);
        Assertions.assertEquals(Collections.singletonList("Update1"), updates);
        Assertions.assertTrue(this.queryUpdateEmitter.activeSubscriptions().isEmpty(),
                              "Expected subscriptions to be cancelled");
    }

    /**
     * Opens a subscription query with an update buffer size of 1. {@code "Update1"} is emitted from
     * inside the initial-result consumer, the update consumer throws on every update, and
     * {@code "Update2"} is emitted afterwards. The initial result must have been delivered, only
     * {@code "Update1"} may be recorded, and no subscription may remain active.
     */
    @Test
    void subscriptionQueryResultHandleWhenThereIsAnErrorConsumingABufferedUpdate() {
        SubscriptionQueryMessage<String, List<String>, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "chatMessages",
                ResponseTypes.multipleInstancesOf(String.class), ResponseTypes.instanceOf(String.class));
        List<String> initialResult = new ArrayList<>();
        List<String> updates = new ArrayList<>();

        this.queryBus.subscriptionQuery(queryMessage, 1).handle(initial -> {
            // Emit while the initial result is still being consumed.
            this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");
            initialResult.addAll(initial.getPayload());
        }, update -> {
            updates.add(update.getPayload());
            throw new IllegalStateException("oops");
        });

        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update2");

        Assertions.assertEquals(Arrays.asList("Message1", "Message2", "Message3"), initialResult);
        Assertions.assertEquals(Collections.singletonList("Update1"), updates);
        Assertions.assertTrue(this.queryUpdateEmitter.activeSubscriptions().isEmpty(),
                              "Expected subscriptions to be cancelled");
    }

    /**
     * Handles the {@code "failingQuery"} subscription query (its handler is defined outside this method;
     * presumably it fails), emits two updates and completes. Neither the initial-result consumer nor the
     * update consumer may have recorded anything.
     */
    @Test
    void subscriptionQueryResultHandleWhenThereIsAnErrorOnInitialResult() {
        SubscriptionQueryMessage<String, String, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "failingQuery",
                ResponseTypes.instanceOf(String.class), ResponseTypes.instanceOf(String.class));
        List<String> initialResult = new ArrayList<>();
        List<String> updates = new ArrayList<>();

        this.queryBus.subscriptionQuery(queryMessage).handle(
                initial -> initialResult.add(initial.getPayload()),
                update -> updates.add(update.getPayload()));

        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");
        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update2");
        this.chatQueryHandler.emitter.complete(String.class, TEST_PAYLOAD::equals);

        Assertions.assertTrue(initialResult.isEmpty());
        Assertions.assertTrue(updates.isEmpty());
    }

    /**
     * Handles the {@code "failingQuery"} subscription query (its handler is defined outside this method;
     * presumably it fails), completes the update stream exceptionally and then emits an update. Neither
     * the initial-result consumer nor the update consumer may have recorded anything.
     */
    @Test
    void subscriptionQueryResultHandleWhenThereIsAnErrorOnUpdate() {
        SubscriptionQueryMessage<String, String, String> queryMessage = new GenericSubscriptionQueryMessage<>(
                TEST_PAYLOAD, "failingQuery",
                ResponseTypes.instanceOf(String.class), ResponseTypes.instanceOf(String.class));
        List<String> initialResult = new ArrayList<>();
        List<String> updates = new ArrayList<>();

        this.queryBus.subscriptionQuery(queryMessage).handle(
                initial -> initialResult.add(initial.getPayload()),
                update -> updates.add(update.getPayload()));

        this.chatQueryHandler.emitter.completeExceptionally(String.class, TEST_PAYLOAD::equals, new RuntimeException());
        this.chatQueryHandler.emitter.emit(String.class, TEST_PAYLOAD::equals, "Update1");

        Assertions.assertTrue(initialResult.isEmpty());
        Assertions.assertTrue(updates.isEmpty());
    }

    /**
     * Issues a subscription query for {@code SomeQuery("not found")} through a {@link DefaultQueryGateway}
     * built on the test's query bus and expects the initial result to resolve to {@code null}.
     *
     * @throws ExecutionException   if the initial result future completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting for the initial result
     */
    @Test
    void queryGatewayCorrectlyReturnsNullOnSubscriptionQueryWithNullInitialResult() throws ExecutionException, InterruptedException {
        DefaultQueryGateway gateway = DefaultQueryGateway.builder()
                                                         .queryBus(this.queryBus)
                                                         .build();
        SubscriptionQueryResult<String, String> result =
                gateway.subscriptionQuery(new SomeQuery("not found"), String.class, String.class);

        String initial = result.initialResult().toFuture().get();

        Assertions.assertNull(initial);
    }

    /**
     * Issues a subscription query for {@code SomeQuery(FOUND)} through a {@link DefaultQueryGateway} built
     * on the test's query bus and expects the initial result to equal {@code FOUND}.
     *
     * @throws ExecutionException   if the initial result future completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting for the initial result
     */
    @Test
    void queryGatewayCorrectlyReturnsOnSubscriptionQuery() throws ExecutionException, InterruptedException {
        DefaultQueryGateway gateway = DefaultQueryGateway.builder()
                                                         .queryBus(this.queryBus)
                                                         .build();
        SubscriptionQueryResult<String, String> result =
                gateway.subscriptionQuery(new SomeQuery(FOUND), String.class, String.class);

        String initial = result.initialResult().toFuture().get();

        Assertions.assertEquals(FOUND, initial);
    }

    // Stores the negation of this class's desired assertion status: the flag is true when assertions
    // are disabled. assertRecorded reads it to decide whether its null checks throw AssertionError.
    // NOTE(review): this looks like the flag javac synthesizes for `assert` statements, surfaced by
    // decompilation — confirm before replacing it with plain `assert`.
    static {
        $assertionsDisabled = !AbstractSubscriptionQueryTestSuite.class.desiredAssertionStatus();
    }
}
