package org.axonframework.queryhandling;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.axonframework.queryhandling.annotation.AnnotationQueryHandlerAdapter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

/* loaded from: input_file:org/axonframework/queryhandling/StreamingQueryTest.class */
class StreamingQueryTest {
    private final SimpleQueryBus queryBus = SimpleQueryBus.builder().build();
    private final MyQueryHandler myQueryHandler = new MyQueryHandler();
    private final AnnotationQueryHandlerAdapter<MyQueryHandler> annotationQueryHandlerAdapter = new AnnotationQueryHandlerAdapter<>(this.myQueryHandler);
    private final ErrorQueryHandler errorQueryHandler = new ErrorQueryHandler();
    private final AnnotationQueryHandlerAdapter<ErrorQueryHandler> errorQueryHandlerAdapter = new AnnotationQueryHandlerAdapter<>(this.errorQueryHandler);
    private static final ConcurrentLinkedQueue<String> handlersInvoked = new ConcurrentLinkedQueue<>();

    /* loaded from: input_file:org/axonframework/queryhandling/StreamingQueryTest$ErrorQueryHandler.class */
    private static class ErrorQueryHandler {
        private ErrorQueryHandler() {
        }

        @QueryHandler(queryName = "listQuery")
        public Flux<String> listQuery(String str) {
            StreamingQueryTest.handlersInvoked.add("handler_error");
            throw new RuntimeException("ooops");
        }
    }

    /* loaded from: input_file:org/axonframework/queryhandling/StreamingQueryTest$MyQueryHandler.class */
    private static class MyQueryHandler {
        AtomicBoolean errorThrown;

        private MyQueryHandler() {
            this.errorThrown = new AtomicBoolean(false);
        }

        @QueryHandler(queryName = "fluxQuery")
        public Flux<String> fluxQuery(String str) {
            return Flux.just(new String[]{"a", "b", "c", "d"});
        }

        @QueryHandler(queryName = "listQuery")
        public List<String> listQuery(String str) {
            StreamingQueryTest.handlersInvoked.add("handler_healthy");
            return Arrays.asList("a", "b", "c", "d");
        }

        @QueryHandler(queryName = "streamQuery")
        public Stream<String> streamQuery(String str) {
            return Stream.of((Object[]) new String[]{"a", "b", "c", "d"});
        }

        @QueryHandler(queryName = "singleResultQuery")
        public String singleResultQuery(String str) {
            return "lonely";
        }

        @QueryHandler(queryName = "optionalResultQuery")
        public Optional<String> optionalResultQuery(String str) {
            return Optional.of("optional");
        }

        @QueryHandler(queryName = "emptyOptionalResultQuery")
        public Optional<String> emptyOptionalResultQuery(String str) {
            return Optional.empty();
        }

        @QueryHandler(queryName = "completableFutureQuery")
        public CompletableFuture<String> completableFutureQuery(String str) {
            return CompletableFuture.completedFuture("future");
        }

        @QueryHandler(queryName = "streamingAfterHandlerCompletesQuery")
        public Flux<Long> streamingAfterHandlerCompletesQuery(String str) {
            return Flux.interval(Duration.ofSeconds(1L)).take(5L);
        }

        @QueryHandler(queryName = "monoQuery")
        public Mono<String> monoQuery(String str) {
            return Mono.fromCallable(() -> {
                return "helloMono";
            }).delayElement(Duration.ofMillis(100L));
        }

        @QueryHandler(queryName = "nullQuery")
        public Flux<String> nullQuery(String str) {
            return null;
        }

        @QueryHandler(queryName = "exceptionQuery")
        public Flux<String> exceptionQuery(String str) {
            throw new RuntimeException("oops");
        }

        @QueryHandler(queryName = "throttledFluxQuery")
        public Flux<Long> throttledFlux(String str) {
            return Flux.interval(Duration.ofMillis(100L)).window(2).take(4L).flatMap(Function.identity());
        }

        @QueryHandler(queryName = "backPressure")
        public Flux<Long> backPressureQuery(String str) {
            return Flux.create(fluxSink -> {
                fluxSink.onRequest(j -> {
                    LongStream range = LongStream.range(0L, j);
                    Objects.requireNonNull(fluxSink);
                    range.forEach((v1) -> {
                        r1.next(v1);
                    });
                });
            });
        }

        @QueryHandler(queryName = "errorStream")
        public Flux<String> errorStream(String str) {
            return Flux.error(new RuntimeException("oops"));
        }

        @QueryHandler(queryName = "exceptionQueryOnce")
        public Flux<String> exceptionQueryOnce(String str) {
            if (this.errorThrown.compareAndSet(false, true)) {
                throw new RuntimeException("oops");
            }
            return Flux.just("correctNow");
        }
    }

    StreamingQueryTest() {
    }

    @BeforeEach
    void setUp() {
        this.annotationQueryHandlerAdapter.subscribe(this.queryBus);
    }

    @AfterEach
    void reset() {
        this.myQueryHandler.errorThrown.set(false);
    }

    private <Q, R> Flux<R> streamingQueryPayloads(StreamingQueryMessage<Q, R> streamingQueryMessage) {
        return streamingQuery(streamingQueryMessage).map((v0) -> {
            return v0.getPayload();
        });
    }

    private <Q, R> Flux<QueryResponseMessage<R>> streamingQuery(StreamingQueryMessage<Q, R> streamingQueryMessage) {
        return Flux.from(this.queryBus.streamingQuery(streamingQueryMessage));
    }

    @Test
    void streamingFluxResults() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "fluxQuery", String.class))).expectNext("a", "b", "c", "d").verifyComplete();
    }

    @Test
    void switchHandlerOnError() {
        handlersInvoked.removeIf(str -> {
            return true;
        });
        this.errorQueryHandlerAdapter.subscribe(this.queryBus);
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "listQuery", String.class))).expectNext("a", "b", "c", "d").verifyComplete();
        Assertions.assertEquals(Arrays.asList("handler_error", "handler_healthy"), new ArrayList(handlersInvoked));
    }

    @Test
    void optionalResults() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "optionalResultQuery", String.class))).expectNext("optional").verifyComplete();
    }

    @Test
    void emptyOptionalResults() {
        StepVerifier.create(streamingQuery(new GenericStreamingQueryMessage("criteria", "emptyOptionalResultQuery", String.class))).expectComplete().verify();
    }

    @Test
    void streamingListResults() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "listQuery", String.class))).expectNext("a", "b", "c", "d").verifyComplete();
    }

    @Test
    void streamingStreamResults() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "streamQuery", String.class))).expectNext("a", "b", "c", "d").verifyComplete();
    }

    @Test
    void streamingSingleResult() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "singleResultQuery", String.class))).expectNext("lonely").verifyComplete();
    }

    @Test
    void streamingCompletableFutureResult() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "completableFutureQuery", String.class))).expectNext("future").verifyComplete();
    }

    @Test
    void streamingFluxAfterHandlerCompletes() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "streamingAfterHandlerCompletesQuery", Long.class))).expectNext(0L, 1L, 2L, 3L, 4L).verifyComplete();
    }

    @Test
    void streamingMonoResult() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "monoQuery", String.class))).expectNext("helloMono").verifyComplete();
    }

    @Test
    void streamingNullResult() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "nullQuery", String.class))).expectComplete().verify();
    }

    @Test
    void errorResult() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "exceptionQuery", String.class))).expectErrorMatches(th -> {
            return (th instanceof QueryExecutionException) && th.getMessage().startsWith("Error starting stream");
        }).verify();
    }

    @Test
    void throttledFluxQuery() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "throttledFluxQuery", Long.class))).expectNext(new Long[]{0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L}).verifyComplete();
    }

    @Test
    void backpressureFluxQuery() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "backPressure", Long.class)), 10L).expectNextCount(10L).thenRequest(10L).expectNextCount(10L).thenCancel().verify();
    }

    @Test
    void dispatchInterceptor() {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.queryBus.registerDispatchInterceptor(list -> {
            atomicBoolean.set(true);
            return (num, queryMessage) -> {
                return queryMessage;
            };
        });
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "fluxQuery", String.class))).expectNext("a", "b", "c", "d").verifyComplete();
        Assertions.assertTrue(atomicBoolean.get());
    }

    @Test
    void handlerInterceptor() {
        this.queryBus.registerHandlerInterceptor((unitOfWork, interceptorChain) -> {
            return ((Flux) interceptorChain.proceed()).map(obj -> {
                return "a";
            });
        });
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "fluxQuery", String.class))).expectNext("a", "a", "a", "a").verifyComplete();
    }

    @Test
    void errorStream() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "errorStream", String.class))).verifyErrorMatches(th -> {
            return (th instanceof RuntimeException) && th.getMessage().equals("oops");
        });
    }

    @Test
    void queryNotExists() {
        StepVerifier.create(streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "queryNotExists", String.class))).verifyErrorMatches(th -> {
            return th instanceof NoHandlerForQueryException;
        });
    }

    @Test
    void resubscribeWorksEvenWhenAnErrorHasBeenCashed() {
        Flux streamingQueryPayloads = streamingQueryPayloads(new GenericStreamingQueryMessage("criteria", "exceptionQueryOnce", String.class));
        StepVerifier.create(streamingQueryPayloads).expectErrorMatches(th -> {
            return (th instanceof QueryExecutionException) && th.getMessage().startsWith("Error starting stream");
        }).verify();
        StepVerifier.create(streamingQueryPayloads).expectNext("correctNow").verifyComplete();
    }
}
