package org.axonframework.messaging;

import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.axonframework.messaging.DelayedMessageStream;
import org.axonframework.messaging.MessageStream;
import org.axonframework.utils.MockException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.test.StepVerifier;

/* loaded from: input_file:org/axonframework/messaging/DelayedMessageStreamTest.class */
class DelayedMessageStreamTest extends MessageStreamTest<Message<String>> {

    @Nested
    /* loaded from: input_file:org/axonframework/messaging/DelayedMessageStreamTest$Create.class */
    class Create {
        Create() {
        }

        @Test
        void createForExecutionExceptionReturnsFailedMessageStreamWithCause() {
            RuntimeException runtimeException = new RuntimeException("oops");
            CompletableFuture thenApply = DelayedMessageStream.create(CompletableFuture.failedFuture(runtimeException)).first().asCompletableFuture().thenApply((v0) -> {
                return v0.message();
            });
            Assertions.assertTrue(thenApply.isCompletedExceptionally());
            Assertions.assertEquals(runtimeException, thenApply.exceptionNow());
        }

        @Test
        void entryBecomeVisibleWhenFutureCompletes_asCompletableFuture() {
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream whenComplete = DelayedMessageStream.create(completableFuture).whenComplete(() -> {
                atomicBoolean.set(true);
            });
            Assertions.assertFalse(whenComplete.first().asCompletableFuture().isDone());
            Assertions.assertFalse(atomicBoolean.get());
            completableFuture.complete(MessageStream.just(DelayedMessageStreamTest.this.createRandomMessage()));
            Assertions.assertTrue(whenComplete.first().asCompletableFuture().isDone());
            Assertions.assertTrue(atomicBoolean.get());
        }

        @Test
        void createEntryBecomeVisibleWhenFutureCompletes_asFlux() {
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            Message<String> createRandomMessage = DelayedMessageStreamTest.this.createRandomMessage();
            CompletableFuture completableFuture = new CompletableFuture();
            StepVerifier.create(DelayedMessageStream.create(completableFuture).whenComplete(() -> {
                atomicBoolean.set(true);
            }).asFlux()).verifyTimeout(Duration.ofMillis(250L));
            Assertions.assertFalse(atomicBoolean.get());
            completableFuture.complete(MessageStream.just(createRandomMessage));
            StepVerifier.create(DelayedMessageStream.create(completableFuture).whenComplete(() -> {
                atomicBoolean.set(true);
            }).asFlux()).expectNextMatches(entry -> {
                return entry.message().equals(createRandomMessage);
            }).verifyComplete();
            Assertions.assertTrue(atomicBoolean.get());
        }

        @Test
        void reduceResultBecomesVisibleWhenFutureCompletes() {
            Message<String> createRandomMessage = DelayedMessageStreamTest.this.createRandomMessage();
            String str = ((String) createRandomMessage.getPayload()) + ((String) createRandomMessage.getPayload());
            MessageStream<Message<String>> completedTestSubject = DelayedMessageStreamTest.this.completedTestSubject(List.of(createRandomMessage, createRandomMessage));
            CompletableFuture completableFuture = new CompletableFuture();
            CompletableFuture reduce = DelayedMessageStream.create(completableFuture).reduce("", (str2, entry) -> {
                return str2 + ((String) entry.message().getPayload());
            });
            Assertions.assertFalse(reduce.isDone());
            completableFuture.complete(completedTestSubject);
            Assertions.assertTrue(reduce.isDone());
            Assertions.assertEquals(str, reduce.join());
        }

        @Test
        void closeWillCloseTheUnderlyingStreamWhenItResolves() {
            MessageStream create = DelayedMessageStream.create(new CompletableFuture());
            create.close();
            Assertions.assertTrue(create.isCompleted());
            Assertions.assertTrue(create.error().isPresent());
        }

        @Test
        void closeWillCloseTheUnderlyingStreamImmediatelyWhenItHasResolved() {
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream create = DelayedMessageStream.create(completableFuture);
            MessageStream messageStream = (MessageStream) Mockito.mock(new MessageStream[0]);
            completableFuture.complete(messageStream);
            create.close();
            ((MessageStream) Mockito.verify(messageStream)).close();
        }

        @Test
        void closeIsNotPropagatedWhenCompletableFutureCompletesExceptionally() {
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream create = DelayedMessageStream.create(completableFuture);
            completableFuture.completeExceptionally(new MockException("Simulating failure"));
            Objects.requireNonNull(create);
            Assertions.assertDoesNotThrow(create::close);
        }

        @Test
        void shouldReturnEmptyWhenCallingNextOnFailingFuture() {
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream create = DelayedMessageStream.create(completableFuture);
            Assertions.assertFalse(create.error().isPresent());
            Assertions.assertFalse(create.hasNextAvailable());
            completableFuture.completeExceptionally(new MockException("Simulating failure"));
            Assertions.assertFalse(create.hasNextAvailable());
            Assertions.assertFalse(create.next().isPresent());
            Assertions.assertTrue(create.isCompleted());
            Assertions.assertTrue(create.error().isPresent());
        }

        @Test
        void shouldReturnEmptyWhenCallingNextOnCancelledFuture() {
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream create = DelayedMessageStream.create(completableFuture);
            Assertions.assertFalse(create.error().isPresent());
            Assertions.assertFalse(create.hasNextAvailable());
            completableFuture.cancel(true);
            Assertions.assertFalse(create.hasNextAvailable());
            Assertions.assertFalse(create.next().isPresent());
            Assertions.assertTrue(create.isCompleted());
            Assertions.assertTrue(create.error().isPresent());
        }

        @Test
        void shouldForwardNextCallAsSoonAsDelegateResolved() {
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream create = DelayedMessageStream.create(completableFuture);
            Assertions.assertFalse(create.next().isPresent());
            Assertions.assertFalse(create.isCompleted());
            completableFuture.complete(MessageStream.just(DelayedMessageStreamTest.this.createRandomMessage()));
            Assertions.assertTrue(create.next().isPresent());
            Assertions.assertTrue(create.isCompleted());
        }
    }

    @Nested
    /* loaded from: input_file:org/axonframework/messaging/DelayedMessageStreamTest$CreateSingle.class */
    class CreateSingle {
        CreateSingle() {
        }

        @Test
        void createForExecutionExceptionReturnsFailedMessageStreamWithCause() {
            RuntimeException runtimeException = new RuntimeException("oops");
            CompletableFuture thenApply = DelayedMessageStream.createSingle(CompletableFuture.failedFuture(runtimeException)).first().asCompletableFuture().thenApply((v0) -> {
                return v0.message();
            });
            Assertions.assertTrue(thenApply.isCompletedExceptionally());
            Assertions.assertEquals(runtimeException, thenApply.exceptionNow());
        }

        @Test
        void entryBecomeVisibleWhenFutureCompletes_asCompletableFuture() {
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream.Single whenComplete = DelayedMessageStream.createSingle(completableFuture).whenComplete(() -> {
                atomicBoolean.set(true);
            });
            Assertions.assertFalse(whenComplete.first().asCompletableFuture().isDone());
            Assertions.assertFalse(atomicBoolean.get());
            completableFuture.complete(MessageStream.just(DelayedMessageStreamTest.this.createRandomMessage()));
            Assertions.assertTrue(whenComplete.first().asCompletableFuture().isDone());
            Assertions.assertTrue(atomicBoolean.get());
        }

        @Test
        void createEntryBecomeVisibleWhenFutureCompletes_asFlux() {
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            Message<String> createRandomMessage = DelayedMessageStreamTest.this.createRandomMessage();
            CompletableFuture completableFuture = new CompletableFuture();
            StepVerifier.create(DelayedMessageStream.createSingle(completableFuture).whenComplete(() -> {
                atomicBoolean.set(true);
            }).asFlux()).verifyTimeout(Duration.ofMillis(250L));
            Assertions.assertFalse(atomicBoolean.get());
            completableFuture.complete(MessageStream.just(createRandomMessage));
            StepVerifier.create(DelayedMessageStream.createSingle(completableFuture).whenComplete(() -> {
                atomicBoolean.set(true);
            }).asFlux()).expectNextMatches(entry -> {
                return entry.message().equals(createRandomMessage);
            }).verifyComplete();
            Assertions.assertTrue(atomicBoolean.get());
        }

        @Test
        void reduceResultBecomesVisibleWhenFutureCompletes() {
            Message<String> createRandomMessage = DelayedMessageStreamTest.this.createRandomMessage();
            String str = (String) createRandomMessage.getPayload();
            MessageStream.Single<Message<String>> completedSingleStreamTestSubject = DelayedMessageStreamTest.this.completedSingleStreamTestSubject(createRandomMessage);
            CompletableFuture completableFuture = new CompletableFuture();
            CompletableFuture reduce = DelayedMessageStream.createSingle(completableFuture).reduce("", (str2, entry) -> {
                return str2 + ((String) entry.message().getPayload());
            });
            Assertions.assertFalse(reduce.isDone());
            completableFuture.complete(completedSingleStreamTestSubject);
            Assertions.assertTrue(reduce.isDone());
            Assertions.assertEquals(str, reduce.join());
        }

        @Test
        void closeWillCloseTheUnderlyingStreamWhenItResolves() {
            MessageStream.Single createSingle = DelayedMessageStream.createSingle(new CompletableFuture());
            createSingle.close();
            Assertions.assertTrue(createSingle.isCompleted());
            Assertions.assertTrue(createSingle.error().isPresent());
        }

        @Test
        void closeWillCloseTheUnderlyingStreamImmediatelyWhenItHasResolved() {
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream.Single createSingle = DelayedMessageStream.createSingle(completableFuture);
            MessageStream.Single single = (MessageStream.Single) Mockito.mock(new MessageStream.Single[0]);
            completableFuture.complete(single);
            createSingle.close();
            ((MessageStream.Single) Mockito.verify(single)).close();
        }

        @Test
        void closeIsNotPropagatedWhenCompletableFutureCompletesExceptionally() {
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream.Single createSingle = DelayedMessageStream.createSingle(completableFuture);
            completableFuture.completeExceptionally(new MockException("Simulating failure"));
            Objects.requireNonNull(createSingle);
            Assertions.assertDoesNotThrow(createSingle::close);
        }

        @Test
        void shouldReturnEmptyWhenCallingNextOnFailingFuture() {
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream.Single createSingle = DelayedMessageStream.createSingle(completableFuture);
            Assertions.assertFalse(createSingle.error().isPresent());
            Assertions.assertFalse(createSingle.hasNextAvailable());
            completableFuture.completeExceptionally(new MockException("Simulating failure"));
            Assertions.assertFalse(createSingle.hasNextAvailable());
            Assertions.assertFalse(createSingle.next().isPresent());
            Assertions.assertTrue(createSingle.isCompleted());
            Assertions.assertTrue(createSingle.error().isPresent());
        }

        @Test
        void shouldReturnEmptyWhenCallingNextOnCancelledFuture() {
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream.Single createSingle = DelayedMessageStream.createSingle(completableFuture);
            Assertions.assertFalse(createSingle.error().isPresent());
            Assertions.assertFalse(createSingle.hasNextAvailable());
            completableFuture.cancel(true);
            Assertions.assertFalse(createSingle.hasNextAvailable());
            Assertions.assertFalse(createSingle.next().isPresent());
            Assertions.assertTrue(createSingle.isCompleted());
            Assertions.assertTrue(createSingle.error().isPresent());
        }

        @Test
        void shouldForwardNextCallAsSoonAsDelegateResolved() {
            CompletableFuture completableFuture = new CompletableFuture();
            MessageStream.Single createSingle = DelayedMessageStream.createSingle(completableFuture);
            Assertions.assertFalse(createSingle.next().isPresent());
            Assertions.assertFalse(createSingle.isCompleted());
            completableFuture.complete(MessageStream.just(DelayedMessageStreamTest.this.createRandomMessage()));
            Assertions.assertTrue(createSingle.next().isPresent());
            Assertions.assertTrue(createSingle.isCompleted());
        }
    }

    DelayedMessageStreamTest() {
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    MessageStream<Message<String>> completedTestSubject(List<Message<String>> list) {
        return DelayedMessageStream.create(CompletableFuture.completedFuture(MessageStream.fromIterable(list)));
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    MessageStream.Single<Message<String>> completedSingleStreamTestSubject(Message<String> message) {
        return DelayedMessageStream.createSingle(CompletableFuture.completedFuture(MessageStream.just(message)));
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    MessageStream.Empty<Message<String>> completedEmptyStreamTestSubject() {
        return new DelayedMessageStream.Empty(CompletableFuture.completedFuture(MessageStream.empty().cast()));
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    MessageStream<Message<String>> failingTestSubject(List<Message<String>> list, Exception exc) {
        return DelayedMessageStream.create(CompletableFuture.completedFuture(MessageStream.fromIterable(list).concatWith(MessageStream.failed(exc))));
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    Message<String> createRandomMessage() {
        return new GenericMessage(new MessageType("message"), "test-" + ThreadLocalRandom.current().nextInt(10000));
    }
}
