package org.axonframework.messaging;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.axonframework.common.FutureUtils;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.utils.MockException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.test.StepVerifier;

/* loaded from: input_file:org/axonframework/messaging/MessageStreamTest.class */
public abstract class MessageStreamTest<M extends Message<?>> {

    /**
     * Tests for {@code filter} on the stream under test: entries for which the predicate returns {@code true} are
     * kept, entries for which it returns {@code false} are dropped, and the filtered stream reports completion once
     * the remaining entry has been consumed.
     */
    @Nested
    class Filter {

        /**
         * Filters a two-message stream down to the first message by matching on equality with it.
         */
        @Test
        void filterKeepsEntriesForWhichTrueIsReturned() {
            M kept = createRandomMessage();
            M dropped = createRandomMessage();

            MessageStream<M> filtered = completedTestSubject(List.of(kept, dropped))
                    .filter(entry -> entry.message().equals(kept));

            Optional<MessageStream.Entry<M>> first = filtered.next();
            Assertions.assertTrue(first.isPresent());
            Assertions.assertEquals(kept, first.get().message());
            // Nothing else may pass the filter, and the exhausted stream must report completion.
            Assertions.assertFalse(filtered.next().isPresent());
            Assertions.assertTrue(filtered.isCompleted());
        }

        /**
         * Filters a two-message stream by rejecting the second message, so that only the first one remains.
         */
        @Test
        void filterRemovesEntriesForWhichFalseIsReturned() {
            M kept = createRandomMessage();
            M dropped = createRandomMessage();

            MessageStream<M> filtered = completedTestSubject(List.of(kept, dropped))
                    .filter(entry -> !entry.message().equals(dropped));

            Optional<MessageStream.Entry<M>> first = filtered.next();
            Assertions.assertTrue(first.isPresent());
            Assertions.assertEquals(kept, first.get().message());
            // Nothing else may pass the filter, and the exhausted stream must report completion.
            Assertions.assertFalse(filtered.next().isPresent());
            Assertions.assertTrue(filtered.isCompleted());
        }
    }

    /**
     * Creates the stream implementation under test, containing the given messages.
     * <p>
     * NOTE(review): going by the method name and by the tests in this class (which assert {@code isCompleted()} once
     * the entries are consumed), the returned stream is expected to complete normally after its last entry — confirm
     * against the concrete subclasses.
     *
     * @param list the messages the stream should contain; tests in this class also pass an empty list
     * @return the stream under test
     */
    abstract MessageStream<M> completedTestSubject(List<M> list);

    /**
     * Creates the stream implementation under test as an explicitly declared {@code MessageStream.Single} for the given
     * message.
     * <p>
     * NOTE(review): presumably the returned stream holds exactly this one message and is already completed — confirm
     * against the concrete subclasses.
     *
     * @param m the message to wrap
     * @return the single-entry stream under test
     */
    abstract MessageStream.Single<M> completedSingleStreamTestSubject(M m);

    /**
     * Creates the stream implementation under test as an explicitly declared {@code MessageStream.Empty}.
     * <p>
     * Tests in this class expect completion callbacks registered on the returned stream to run immediately and its
     * first entry to resolve to {@code null}.
     *
     * @return the empty stream under test
     */
    abstract MessageStream.Empty<M> completedEmptyStreamTestSubject();

    /**
     * Creates a stream that exposes the given messages but does not complete until the given future does.
     * <p>
     * The result is the {@link #completedTestSubject(List)} for the messages, concatenated with a delayed tail. The
     * tail resolves to an empty stream when the future completes normally and to a failed stream carrying the cause
     * when the future completes exceptionally.
     *
     * @param list              the messages that are available right away
     * @param completableFuture the marker whose completion (or failure) ends the returned stream
     * @return a stream that stays open until {@code completableFuture} is done
     */
    protected MessageStream<M> uncompletedTestSubject(List<M> list, CompletableFuture<Void> completableFuture) {
        MessageStream<M> available = completedTestSubject(list);
        return available.concatWith(
                DelayedMessageStream.create(
                        completableFuture.thenApply(ignored -> MessageStream.empty())
                                         .exceptionally(MessageStream::failed)
                ).cast()
        );
    }

    /**
     * Creates the stream implementation under test, containing the given messages and ending in the given failure.
     * <p>
     * Tests in this class expect the entries to be delivered before the failure surfaces, and expect the original
     * exception instance to be reported.
     *
     * @param list the messages the stream should contain before failing; tests in this class also pass an empty list
     * @param exc  the exception the stream should fail with
     * @return the failing stream under test
     */
    abstract MessageStream<M> failingTestSubject(List<M> list, Exception exc);

    /**
     * Hook for publishing an extra message to an already created stream.
     * <p>
     * This base version publishes nothing and ignores both parameters: it calls {@link Assumptions#abort(String)}, so a
     * test reaching it is aborted rather than failed.
     * NOTE(review): presumably subclasses whose stream supports delayed publishing override this — confirm.
     *
     * @param messageStream the stream to publish to (unused here)
     * @param m             the message to publish (unused here)
     */
    protected void publishAdditionalMessage(MessageStream<M> messageStream, M m) {
        Assumptions.abort("This implementation doesn't support delayed publishing");
    }

    /**
     * Creates a message of the type handled by the stream under test.
     * <p>
     * NOTE(review): tests in this class compare messages from separate calls and expect them to differ, so each call
     * presumably yields a distinct message — confirm against the concrete subclasses.
     *
     * @return a new message to feed into the stream under test
     */
    abstract M createRandomMessage();

    /**
     * Registers an {@code onAvailable} callback on a completed stream holding one message and expects the callback to
     * have run by the time registration returns.
     */
    @Test
    void shouldInvokeOnAvailableCallbackWhenMessagesAreAvailable() {
        AtomicBoolean invoked = new AtomicBoolean(false);
        MessageStream<M> testSubject = completedTestSubject(List.of(createRandomMessage()));

        testSubject.onAvailable(() -> invoked.set(true));

        Assertions.assertTrue(invoked.get());
    }

    /**
     * Registers an {@code onAvailable} callback on a completed stream without messages and expects the callback to
     * have run by the time registration returns.
     */
    @Test
    void shouldInvokeOnAvailableCallbackWhenStreamIsCompleted() {
        AtomicBoolean invoked = new AtomicBoolean(false);
        MessageStream<M> testSubject = completedTestSubject(List.of());

        testSubject.onAvailable(() -> invoked.set(true));

        Assertions.assertTrue(invoked.get());
    }

    /**
     * Registers an {@code onAvailable} callback on a stream that failed without emitting messages and expects the
     * callback to have run by the time registration returns.
     */
    @Test
    void shouldInvokeOnAvailableCallbackWhenCompletedExceptionally() {
        AtomicBoolean invoked = new AtomicBoolean(false);
        MessageStream<M> testSubject = failingTestSubject(List.of(), new RuntimeException("Oops"));

        testSubject.onAvailable(() -> invoked.set(true));

        Assertions.assertTrue(invoked.get());
    }

    /**
     * Expects a stream that failed without emitting messages to report nothing available, to return an empty
     * {@code next()}, and to count as completed.
     */
    @Test
    void shouldReturnEmptyNextAndNoAvailableMessagesOnError() {
        RuntimeException failure = new RuntimeException("Oops");
        MessageStream<M> testSubject = failingTestSubject(List.of(), failure);

        Assertions.assertFalse(testSubject.hasNextAvailable());
        Assertions.assertFalse(testSubject.next().isPresent());
        Assertions.assertTrue(testSubject.isCompleted());
    }

    /**
     * Registers an {@code onAvailable} callback on an empty stream whose completion marker is never completed and
     * expects the callback not to run.
     */
    @Test
    void shouldNotInvokeOnAvailableCallbackUntilCompleted() {
        AtomicBoolean invoked = new AtomicBoolean(false);
        CompletableFuture<Void> neverCompleted = new CompletableFuture<>();
        MessageStream<M> testSubject = uncompletedTestSubject(List.of(), neverCompleted);

        testSubject.onAvailable(() -> invoked.set(true));

        Assertions.assertFalse(invoked.get());
    }

    /**
     * Expects the {@code whenComplete} callback of an initially open, empty stream to run once the completion marker
     * is completed and {@code next()} is requested afterwards.
     */
    @Test
    void shouldInvokeCompletionCallbackWhenNextIsRequestedAfterCompletion() {
        CompletableFuture<Void> completionMarker = new CompletableFuture<>();
        AtomicBoolean invoked = new AtomicBoolean(false);
        MessageStream<M> testSubject = uncompletedTestSubject(List.of(), completionMarker)
                .whenComplete(() -> invoked.set(true));

        // Drain anything already available; the callback must not fire while the stream is still open.
        while (testSubject.next().isPresent()) {
            Assertions.assertFalse(invoked.get());
        }

        completionMarker.complete(null);

        Assertions.assertFalse(testSubject.next().isPresent());
        Assertions.assertTrue(invoked.get());
    }

    /**
     * Expects the {@code whenComplete} callback of an initially open, empty stream to run once the completion marker
     * is completed and {@code hasNextAvailable()} is queried afterwards.
     */
    @Test
    void shouldInvokeCompletionCallbackWhenOnAvailableIsRequestedAfterCompletion() {
        CompletableFuture<Void> completionMarker = new CompletableFuture<>();
        AtomicBoolean invoked = new AtomicBoolean(false);
        MessageStream<M> testSubject = uncompletedTestSubject(List.of(), completionMarker)
                .whenComplete(() -> invoked.set(true));

        // Drain anything already available; the callback must not fire while the stream is still open.
        while (testSubject.hasNextAvailable()) {
            Assertions.assertFalse(invoked.get());
            testSubject.next();
        }

        completionMarker.complete(null);

        Assertions.assertFalse(testSubject.hasNextAvailable());
        Assertions.assertTrue(invoked.get());
    }

    /**
     * Expects the {@code whenComplete} callback to stay silent while an entry is still pending, even though the
     * completion marker is already completed, and to run only once the stream is queried after the last entry.
     */
    @Test
    void shouldInvokeCompletionCallbackOnceAllMessagesHaveBeenConsumed() {
        CompletableFuture<Void> completionMarker = new CompletableFuture<>();
        AtomicBoolean invoked = new AtomicBoolean(false);
        MessageStream<M> testSubject = uncompletedTestSubject(List.of(createRandomMessage()), completionMarker)
                .whenComplete(() -> invoked.set(true));

        completionMarker.complete(null);
        // One entry is still unread, so completion must not be signalled yet.
        Assertions.assertFalse(invoked.get());

        testSubject.next();
        // Consuming the entry alone is not enough; completion is observed on the next query.
        Assertions.assertFalse(invoked.get());

        Assertions.assertFalse(testSubject.hasNextAvailable());
        Assertions.assertTrue(invoked.get());
    }

    /**
     * Expects a {@code whenComplete} callback registered on the explicitly empty stream to have run by the time
     * registration returns.
     */
    @Test
    void shouldInvokeCompletionCallbackImmediatelyOnCompletedEmptyStream() {
        AtomicBoolean invoked = new AtomicBoolean(false);
        MessageStream.Empty<M> testSubject = completedEmptyStreamTestSubject();

        testSubject.whenComplete(() -> invoked.set(true));

        Assertions.assertTrue(invoked.get());
    }

    /**
     * Expects the {@code whenComplete} callback of an open, empty stream to stay silent until the completion marker is
     * completed, and to have run right after that.
     */
    @Test
    void shouldInvokeCompletionCallbackWhenStreamCompletesEmpty() {
        AtomicBoolean invoked = new AtomicBoolean(false);
        CompletableFuture<Void> completionMarker = new CompletableFuture<>();
        MessageStream<M> testSubject = uncompletedTestSubject(List.of(), completionMarker);

        testSubject.whenComplete(() -> invoked.set(true));
        Assertions.assertFalse(invoked.get());

        completionMarker.complete(null);
        Assertions.assertTrue(invoked.get());
    }

    /**
     * Consumes the only message of a still-open stream and expects an {@code onAvailable} callback registered
     * afterwards not to run.
     */
    @Test
    void shouldNotInvokeOnAvailableCallbackWhenNotCompletedAndAvailableMessageIsConsumed() {
        CompletableFuture<Void> neverCompleted = new CompletableFuture<>();
        MessageStream<M> testSubject = uncompletedTestSubject(List.of(createRandomMessage()), neverCompleted);

        // The single message is there, after which the open stream has nothing more to offer.
        Assertions.assertTrue(testSubject.next().isPresent());
        Assertions.assertFalse(testSubject.next().isPresent());

        AtomicBoolean invoked = new AtomicBoolean(false);
        testSubject.onAvailable(() -> invoked.set(true));

        Assertions.assertFalse(invoked.get());
    }

    /**
     * Consumes both messages of a still-open stream and expects an {@code onAvailable} callback registered afterwards
     * not to run.
     */
    @Test
    void shouldNotInvokeOnAvailableCallbackWhenNotCompletedAndAvailableMessagesAreConsumed() {
        CompletableFuture<Void> neverCompleted = new CompletableFuture<>();
        MessageStream<M> testSubject =
                uncompletedTestSubject(List.of(createRandomMessage(), createRandomMessage()), neverCompleted);

        // Both messages are there, after which the open stream has nothing more to offer.
        Assertions.assertTrue(testSubject.next().isPresent());
        Assertions.assertTrue(testSubject.next().isPresent());
        Assertions.assertFalse(testSubject.next().isPresent());

        AtomicBoolean invoked = new AtomicBoolean(false);
        testSubject.onAvailable(() -> invoked.set(true));

        Assertions.assertFalse(invoked.get());
    }

    /**
     * Consumes one of two messages of a still-open stream and expects an {@code onAvailable} callback registered
     * afterwards to have run by the time registration returns.
     */
    @Test
    void shouldInvokeOnAvailableCallbackWhenNotCompletedAndNotAllAvailableMessagesAreConsumed() {
        CompletableFuture<Void> neverCompleted = new CompletableFuture<>();
        MessageStream<M> testSubject =
                uncompletedTestSubject(List.of(createRandomMessage(), createRandomMessage()), neverCompleted);

        Assertions.assertTrue(testSubject.next().isPresent());

        AtomicBoolean invoked = new AtomicBoolean(false);
        testSubject.onAvailable(() -> invoked.set(true));

        Assertions.assertTrue(invoked.get());
    }

    /**
     * Expects the future of the first entry of a failed stream to be completed exceptionally with an exception of the
     * type the stream was failed with.
     */
    @Test
    void shouldEmitOriginalExceptionAsFailure() {
        CompletableFuture<MessageStream.Entry<M>> result =
                failingTestSubject(List.of(), new MockException()).first().asCompletableFuture();

        Assertions.assertTrue(result.isCompletedExceptionally());
        Assertions.assertInstanceOf(MockException.class, result.exceptionNow());
    }

    /**
     * Expects the future of the first entry of a stream built from an empty list to already hold {@code null}.
     */
    @Test
    void shouldCompleteWithNullOnEmptyList() {
        MessageStream<M> testSubject = completedTestSubject(Collections.emptyList());

        CompletableFuture<MessageStream.Entry<M>> result = testSubject.first().asCompletableFuture();

        Assertions.assertNull(result.resultNow());
    }

    /**
     * Expects the future of the first entry of the explicitly empty stream to already hold {@code null}.
     */
    @Test
    void shouldCompleteWithNullOnEmptyStream() {
        MessageStream.Empty<M> testSubject = completedEmptyStreamTestSubject();

        CompletableFuture<MessageStream.Entry<M>> result = testSubject.first().asCompletableFuture();

        Assertions.assertNull(result.resultNow());
    }

    /**
     * Maps the only entry of a stream to an entry carrying another message and expects the first entry's future to
     * yield that very message instance.
     */
    @Test
    void shouldMapSingleEntry_asCompletableFuture() {
        M input = createRandomMessage();
        M output = createRandomMessage();

        MessageStream<M> mapped = completedTestSubject(List.of(input))
                .map(entry -> entry.map(message -> output));
        MessageStream.Entry<M> result = mapped.first().asCompletableFuture().join();

        Assertions.assertSame(output, result.message());
    }

    /**
     * Maps the entry of an explicitly declared single-entry stream to an entry carrying another message and expects
     * the first entry's future to yield that very message instance.
     */
    @Test
    void shouldMapSingleEntry_asCompletableFuture_ExplicitlyDeclaredSingle() {
        M input = createRandomMessage();
        M output = createRandomMessage();

        MessageStream<M> mapped = completedSingleStreamTestSubject(input)
                .map(entry -> entry.map(message -> output));
        MessageStream.Entry<M> result = mapped.first().asCompletableFuture().join();

        Assertions.assertSame(output, result.message());
    }

    /**
     * Maps the only message of a stream to another message and expects the first entry's future to yield that very
     * message instance.
     */
    @Test
    void shouldMapSingleMessage_asCompletableFuture() {
        M input = createRandomMessage();
        M output = createRandomMessage();

        MessageStream<M> mapped = completedTestSubject(List.of(input)).mapMessage(message -> output);
        MessageStream.Entry<M> result = mapped.first().asCompletableFuture().join();

        Assertions.assertSame(output, result.message());
    }

    /**
     * Maps the message of an explicitly declared single-entry stream to another message and expects the first entry's
     * future to yield that very message instance.
     */
    @Test
    void shouldMapSingleMessage_asCompletableFuture_ExplicitlyDeclaredSingle() {
        M input = createRandomMessage();
        M output = createRandomMessage();

        MessageStream<M> mapped = completedSingleStreamTestSubject(input).mapMessage(message -> output);
        MessageStream.Entry<M> result = mapped.first().asCompletableFuture().join();

        Assertions.assertSame(output, result.message());
    }

    /**
     * Maps the only entry of a stream to an entry carrying another message and expects the resulting flux to emit
     * exactly that message before completing.
     */
    @Test
    void shouldMapSingleEntry_asFlux() {
        M input = createRandomMessage();
        M output = createRandomMessage();
        MessageStream<M> mapped = completedTestSubject(List.of(input))
                .map(entry -> entry.map(message -> output));

        StepVerifier.create(mapped.asFlux())
                    .expectNextMatches(entry -> entry.message().equals(output))
                    .verifyComplete();
    }

    /**
     * Maps the entry of an explicitly declared single-entry stream to an entry carrying another message and expects
     * the resulting mono to emit exactly that message before completing.
     */
    @Test
    void shouldMapSingleEntry_asMono() {
        M input = createRandomMessage();
        M output = createRandomMessage();
        MessageStream.Single<M> mapped = completedSingleStreamTestSubject(input)
                .map(entry -> entry.map(message -> output));

        StepVerifier.create(mapped.asMono())
                    .expectNextMatches(entry -> entry.message().equals(output))
                    .verifyComplete();
    }

    /**
     * Maps the only message of a stream to another message and expects the resulting flux to emit exactly that message
     * before completing.
     */
    @Test
    void shouldMapSingleMessage_asFlux() {
        M input = createRandomMessage();
        M output = createRandomMessage();
        MessageStream<M> mapped = completedTestSubject(List.of(input)).mapMessage(message -> output);

        StepVerifier.create(mapped.asFlux())
                    .expectNextMatches(entry -> entry.message().equals(output))
                    .verifyComplete();
    }

    /**
     * Maps the message of an explicitly declared single-entry stream to another message and expects the resulting mono
     * to emit exactly that message before completing.
     */
    @Test
    void shouldMapSingleMessage_asMono() {
        M input = createRandomMessage();
        M output = createRandomMessage();
        MessageStream.Single<M> mapped = completedSingleStreamTestSubject(input).mapMessage(message -> output);

        StepVerifier.create(mapped.asMono())
                    .expectNextMatches(entry -> entry.message().equals(output))
                    .verifyComplete();
    }

    /**
     * Maps each of two entries to an entry carrying its own replacement message and expects the resulting flux to emit
     * the replacements in the original order before completing.
     */
    @Test
    void shouldMapMultipleEntries_asFlux() {
        M firstInput = createRandomMessage();
        M firstOutput = createRandomMessage();
        M secondInput = createRandomMessage();
        M secondOutput = createRandomMessage();
        // Identity comparison picks the replacement that belongs to the incoming message.
        MessageStream<M> mapped = completedTestSubject(List.of(firstInput, secondInput))
                .map(entry -> entry.map(message -> message == firstInput ? firstOutput : secondOutput));

        StepVerifier.create(mapped.asFlux())
                    .expectNextMatches(entry -> entry.message().equals(firstOutput))
                    .expectNextMatches(entry -> entry.message().equals(secondOutput))
                    .verifyComplete();
    }

    /**
     * Maps each of two messages to its own replacement and expects the resulting flux to emit the replacements in the
     * original order before completing.
     */
    @Test
    void shouldMapMultipleMessages_asFlux() {
        M firstInput = createRandomMessage();
        M firstOutput = createRandomMessage();
        M secondInput = createRandomMessage();
        M secondOutput = createRandomMessage();
        // Identity comparison picks the replacement that belongs to the incoming message.
        MessageStream<M> mapped = completedTestSubject(List.of(firstInput, secondInput))
                .mapMessage(message -> message == firstInput ? firstOutput : secondOutput);

        StepVerifier.create(mapped.asFlux())
                    .expectNextMatches(entry -> entry.message().equals(firstOutput))
                    .expectNextMatches(entry -> entry.message().equals(secondOutput))
                    .verifyComplete();
    }

    /**
     * Maps the entries of a stream that emits one message and then fails, and expects the resulting flux to emit the
     * mapped message followed by an error of the original exception type.
     * <p>
     * The failure is routed through {@code onErrorContinue(MessageStream::failed)}, so the error observed on the flux
     * comes from the replacement failed stream built from the original cause.
     */
    @Test
    void shouldMapEntriesUntilFailure_asFlux() {
        M input = createRandomMessage();
        M output = createRandomMessage();
        MessageStream<M> mapped = failingTestSubject(List.of(input), new MockException())
                .map(entry -> entry.map(message -> output))
                .onErrorContinue(MessageStream::failed);

        StepVerifier.create(mapped.asFlux())
                    .expectNextMatches(entry -> entry.message().equals(output))
                    .expectErrorMatches(MockException.class::isInstance)
                    .verify();
    }

    /**
     * Maps the messages of a stream that emits one message and then fails, and expects the resulting flux to emit the
     * mapped message followed by an error of the original exception type.
     * <p>
     * The failure is routed through {@code onErrorContinue(MessageStream::failed)}, so the error observed on the flux
     * comes from the replacement failed stream built from the original cause.
     */
    @Test
    void shouldMapMessagesUntilFailure_asFlux() {
        M input = createRandomMessage();
        M output = createRandomMessage();
        MessageStream<M> mapped = failingTestSubject(List.of(input), new MockException())
                .mapMessage(message -> output)
                .onErrorContinue(MessageStream::failed);

        StepVerifier.create(mapped.asFlux())
                    .expectNextMatches(entry -> entry.message().equals(output))
                    .expectErrorMatches(MockException.class::isInstance)
                    .verify();
    }

    /**
     * Expects an entry mapper attached to a stream without messages never to be called, and the first entry's future
     * to yield {@code null}.
     */
    @Test
    void shouldNotCallMapperForEmptyStream_asCompletableFuture() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> mapped = completedTestSubject(List.of()).map(entry -> {
            invoked.set(true);
            return entry;
        });

        MessageStream.Entry<M> result = mapped.first().asCompletableFuture().join();

        Assertions.assertFalse(invoked.get(), "Mapper function should not be invoked for empty streams");
        Assertions.assertNull(result, "Expected null value from empty stream");
    }

    /**
     * Expects a message mapper attached to a stream without messages never to be called, and the first entry's future
     * to yield {@code null}.
     */
    @Test
    void shouldNotCallMessageMapperForEmptyStream_asCompletableFuture() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> mapped = completedTestSubject(List.of()).mapMessage(message -> {
            invoked.set(true);
            return message;
        });

        MessageStream.Entry<M> result = mapped.first().asCompletableFuture().join();

        Assertions.assertFalse(invoked.get(), "Mapper function should not be invoked for empty streams");
        Assertions.assertNull(result, "Expected null value from empty stream");
    }

    /**
     * Expects an entry mapper attached to the explicitly empty stream never to be called, and the first entry's future
     * to yield {@code null}.
     */
    @Test
    void shouldNotCallMapperForEmptyStream_asCompletableFuture_ExplicitlyDeclaredEmpty() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> mapped = completedEmptyStreamTestSubject().map(entry -> {
            invoked.set(true);
            return entry;
        });

        MessageStream.Entry<M> result = mapped.first().asCompletableFuture().join();

        Assertions.assertFalse(invoked.get(), "Mapper function should not be invoked for empty streams");
        Assertions.assertNull(result, "Expected null value from empty stream");
    }

    /**
     * Expects the flux of a mapped stream without messages to complete without emissions and without calling the entry
     * mapper.
     */
    @Test
    void shouldNotCallMapperForEmptyStream_asFlux() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> mapped = completedTestSubject(List.of()).map(entry -> {
            invoked.set(true);
            return entry;
        });

        StepVerifier.create(mapped.asFlux())
                    .verifyComplete();

        Assertions.assertFalse(invoked.get(), "Mapper function should not be invoked for empty streams");
    }

    /**
     * Expects the flux of the mapped, explicitly empty stream to complete without emissions and without calling the
     * entry mapper.
     */
    @Test
    void shouldNotCallMapperForEmptyStream_asFlux_ExplicitlyDeclaredEmpty() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> mapped = completedEmptyStreamTestSubject().map(entry -> {
            invoked.set(true);
            return entry;
        });

        StepVerifier.create(mapped.asFlux())
                    .verifyComplete();

        Assertions.assertFalse(invoked.get(), "Mapper function should not be invoked for empty streams");
    }

    /**
     * Expects the mono of the mapped, explicitly empty stream to complete without emissions and without calling the
     * entry mapper.
     */
    @Test
    void shouldNotCallMapperForEmptyStream_asMono_ExplicitlyDeclaredEmpty() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream.Single<M> mapped = completedEmptyStreamTestSubject().map(entry -> {
            invoked.set(true);
            return entry;
        });

        StepVerifier.create(mapped.asMono())
                    .verifyComplete();

        Assertions.assertFalse(invoked.get(), "Mapper function should not be invoked for empty streams");
    }

    /**
     * Expects the flux of a message-mapped stream without messages to complete without emissions and without calling
     * the message mapper.
     */
    @Test
    void shouldNotCallMessageMapperForEmptyStream_asFlux() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> mapped = completedTestSubject(List.of()).mapMessage(message -> {
            invoked.set(true);
            return message;
        });

        StepVerifier.create(mapped.asFlux())
                    .verifyComplete();

        Assertions.assertFalse(invoked.get(), "Mapper function should not be invoked for empty streams");
    }

    /**
     * Expects the flux of the message-mapped, explicitly empty stream to complete without emissions and without
     * calling the message mapper.
     */
    @Test
    void shouldNotCallMessageMapperForEmptyStream_asFlux_ExplicitlyDeclaredEmpty() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> mapped = completedEmptyStreamTestSubject().mapMessage(message -> {
            invoked.set(true);
            return message;
        });

        StepVerifier.create(mapped.asFlux())
                    .verifyComplete();

        Assertions.assertFalse(invoked.get(), "Mapper function should not be invoked for empty streams");
    }

    /**
     * Expects the mono of the message-mapped, explicitly empty stream to complete without emissions and without
     * calling the message mapper.
     */
    @Test
    void shouldNotCallMessageMapperForEmptyStream_asMono_ExplicitlyDeclaredEmpty() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream.Single<M> mapped = completedEmptyStreamTestSubject().mapMessage(message -> {
            invoked.set(true);
            return message;
        });

        StepVerifier.create(mapped.asMono())
                    .verifyComplete();

        Assertions.assertFalse(invoked.get(), "Mapper function should not be invoked for empty streams");
    }

    /**
     * Expects an entry mapper attached to a stream that failed without emitting messages never to be called, and the
     * first entry's future to be completed exceptionally.
     */
    @Test
    void shouldNotCallMapperForFailedStream() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> mapped = failingTestSubject(List.of(), new MockException()).map(entry -> {
            invoked.set(true);
            return entry;
        });

        Assertions.assertTrue(mapped.first().asCompletableFuture().isCompletedExceptionally());
        Assertions.assertFalse(invoked.get(), "Mapper function should not be invoked for failed streams");
    }

    /**
     * Expects a message mapper attached to a stream that failed without emitting messages never to be called, and the
     * first entry's future to be completed exceptionally.
     */
    @Test
    void shouldNotCallMessageMapperForFailedStream() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> mapped = failingTestSubject(List.of(), new MockException()).mapMessage(message -> {
            invoked.set(true);
            return message;
        });

        Assertions.assertTrue(mapped.first().asCompletableFuture().isCompletedExceptionally());
        Assertions.assertFalse(invoked.get(), "Mapper function should not be invoked for failed streams");
    }

    /**
     * Reduces a stream holding the same message twice by appending each payload's string form to the accumulated
     * value, and expects the payload string twice in a row.
     * <p>
     * The accumulator builds on the value passed in, so a {@code reduce} that drops the intermediate result between
     * entries would produce only a single payload string and fail the assertion.
     */
    @Test
    void shouldReduceToExpectedResult() {
        M message = createRandomMessage();
        String payload = message.getPayload().toString();
        String expected = payload + payload;

        CompletableFuture<String> result = completedTestSubject(List.of(message, message))
                .reduce("", (accumulated, entry) -> accumulated + entry.message().getPayload().toString());

        Assertions.assertTrue(result.isDone());
        Assertions.assertEquals(expected, result.join());
    }

    /**
     * Expects an exception thrown by the accumulator to complete the reduce future exceptionally with that very
     * exception, after the accumulator has been called.
     */
    @Test
    void errorInReduceFunctionLeadsToFailedStream() {
        AtomicBoolean invoked = new AtomicBoolean();
        RuntimeException expected = new RuntimeException("oops");
        MessageStream<M> testSubject = completedTestSubject(List.of(createRandomMessage(), createRandomMessage()));

        CompletableFuture<String> result = testSubject.reduce("", (accumulated, entry) -> {
            invoked.set(true);
            throw expected;
        });

        Assertions.assertTrue(result.isCompletedExceptionally());
        Assertions.assertEquals(expected, result.exceptionNow());
        Assertions.assertTrue(invoked.get());
    }

    /**
     * Expects reducing a stream without messages to yield the identity value without calling the accumulator.
     */
    @Test
    void shouldReturnIdentityWhenReducingEmptyStream() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> testSubject = completedTestSubject(List.of());

        CompletableFuture<String> result = testSubject.reduce("42", (accumulated, entry) -> {
            invoked.set(true);
            return entry.message().getPayload().toString() + entry.message().getPayload().toString();
        });

        Assertions.assertTrue(result.isDone());
        Assertions.assertEquals("42", result.join());
        Assertions.assertFalse(invoked.get());
    }

    /**
     * Expects reducing a stream that failed without emitting messages to complete exceptionally with the original
     * exception, without calling the accumulator.
     */
    @Test
    void shouldNotReduceForEmptyFailingStream() {
        AtomicBoolean invoked = new AtomicBoolean();
        RuntimeException expected = new RuntimeException("oops");
        MessageStream<M> testSubject = failingTestSubject(List.of(), expected);

        CompletableFuture<String> result = testSubject.reduce("", (accumulated, entry) -> {
            invoked.set(true);
            return entry.message().getPayload().toString() + entry.message().getPayload().toString();
        });

        Assertions.assertTrue(result.isCompletedExceptionally());
        Assertions.assertEquals(expected, result.exceptionNow());
        Assertions.assertFalse(invoked.get());
    }

    /**
     * Expects reducing a stream that emits two messages and then fails to call the accumulator and still complete
     * exceptionally with the original exception.
     */
    @Test
    void shouldCompleteExceptionallyAfterReducingForFailedStream() {
        AtomicBoolean invoked = new AtomicBoolean();
        RuntimeException expected = new RuntimeException("oops");
        MessageStream<M> testSubject =
                failingTestSubject(List.of(createRandomMessage(), createRandomMessage()), expected);

        CompletableFuture<String> result = testSubject.reduce("", (accumulated, entry) -> {
            invoked.set(true);
            return entry.message().getPayload().toString() + entry.message().getPayload().toString();
        });

        Assertions.assertTrue(result.isCompletedExceptionally());
        Assertions.assertEquals(expected, result.exceptionNow());
        Assertions.assertTrue(invoked.get());
    }

    /**
     * Expects an {@code onNext} handler to be called when the first entry is obtained as a future, and the entry to
     * carry the original message.
     */
    @Test
    void shouldInvokeOnNextHandler_asCompletableFuture() {
        AtomicBoolean invoked = new AtomicBoolean(false);
        M expected = createRandomMessage();
        MessageStream<M> testSubject = completedTestSubject(List.of(expected))
                .onNext(entry -> invoked.set(true));

        CompletableFuture<MessageStream.Entry<M>> result = testSubject.first().asCompletableFuture();

        Assertions.assertTrue(result.isDone());
        Assertions.assertEquals(expected, result.join().message());
        Assertions.assertTrue(invoked.get());
    }

    /**
     * Expects an {@code onNext} handler to be called when the stream is consumed as a flux, and the flux to emit the
     * original message before completing.
     */
    @Test
    void shouldInvokeOnNextHandler_asFlux() {
        AtomicBoolean invoked = new AtomicBoolean(false);
        M expected = createRandomMessage();
        MessageStream<M> testSubject = completedTestSubject(List.of(expected))
                .onNext(entry -> invoked.set(true));

        StepVerifier.create(testSubject.asFlux())
                    .expectNextMatches(entry -> entry.message().equals(expected))
                    .verifyComplete();

        Assertions.assertTrue(invoked.get());
    }

    /**
     * Expects a stream that fails without emitting messages to hand over to the stream supplied by
     * {@code onErrorContinue}, so that the first entry comes from that fallback stream.
     * <p>
     * The handler also verifies that the (unwrapped) throwable it receives is the original exception.
     */
    @Test
    void shouldReturnFirstEntryFromOnErrorStream_asCompletableFuture() {
        RuntimeException expectedFailure = new RuntimeException("oops");
        M expected = createRandomMessage();
        MessageStream<M> fallback = completedTestSubject(List.of(expected));
        MessageStream<M> testSubject = failingTestSubject(List.of(), expectedFailure).onErrorContinue(error -> {
            Assertions.assertEquals(expectedFailure, FutureUtils.unwrap(error));
            return fallback;
        });

        CompletableFuture<MessageStream.Entry<M>> result = testSubject.first().asCompletableFuture();

        Assertions.assertTrue(result.isDone());
        Assertions.assertEquals(expected, result.join().message());
    }

    /**
     * Expects a stream that emits one message and then fails to hand over to the stream supplied by
     * {@code onErrorContinue}, so that the flux emits the first stream's message, then the fallback's, and completes.
     * <p>
     * The handler also verifies that the (unwrapped) throwable it receives is the original exception.
     */
    @Test
    void shouldContinueOnSecondStreamOnError_asFlux() {
        RuntimeException expectedFailure = new RuntimeException("oops");
        M beforeFailure = createRandomMessage();
        M afterFailure = createRandomMessage();
        MessageStream<M> fallback = completedTestSubject(List.of(afterFailure));
        MessageStream<M> testSubject = failingTestSubject(List.of(beforeFailure), expectedFailure)
                .onErrorContinue(error -> {
                    Assertions.assertEquals(expectedFailure, FutureUtils.unwrap(error));
                    return fallback;
                });

        StepVerifier.create(testSubject.asFlux())
                    .expectNextMatches(entry -> entry.message().equals(beforeFailure))
                    .expectNextMatches(entry -> entry.message().equals(afterFailure))
                    .verifyComplete();
    }

    /**
     * Concatenates a one-message stream with a stream without messages and expects the first entry to come from the
     * leading stream.
     */
    @Test
    void shouldMoveToConcatWithStream_asCompletableFuture_returnFirstEntryFromFirstStream() {
        M expected = createRandomMessage();
        MessageStream<M> leading = completedTestSubject(List.of(expected));
        MessageStream<M> trailing = completedTestSubject(List.of());

        CompletableFuture<MessageStream.Entry<M>> result =
                leading.concatWith(trailing).first().asCompletableFuture();

        Assertions.assertTrue(result.isDone());
        Assertions.assertEquals(expected, result.join().message());
    }

    /**
     * Concatenates a stream without messages with a one-message stream and expects the first entry to come from the
     * trailing stream.
     */
    @Test
    void shouldMoveToConcatWithStream_asCompletableFuture_returnFirstEntryFromSecondStream() {
        M expected = createRandomMessage();
        MessageStream<M> leading = completedTestSubject(List.of());
        MessageStream<M> trailing = completedTestSubject(List.of(expected));

        CompletableFuture<MessageStream.Entry<M>> result =
                leading.concatWith(trailing).first().asCompletableFuture();

        Assertions.assertTrue(result.isDone());
        Assertions.assertEquals(expected, result.join().message());
    }

    /**
     * Concatenates two one-message streams and expects the flux to emit the leading stream's message, then the
     * trailing stream's, and complete.
     */
    @Test
    void shouldMoveToConcatWithStream_asFlux() {
        M first = createRandomMessage();
        M second = createRandomMessage();
        MessageStream<M> leading = completedTestSubject(List.of(first));
        MessageStream<M> trailing = completedTestSubject(List.of(second));

        StepVerifier.create(leading.concatWith(trailing).asFlux())
                    .expectNextMatches(entry -> entry.message().equals(first))
                    .expectNextMatches(entry -> entry.message().equals(second))
                    .verifyComplete();
    }

    /**
     * Expects the {@code whenComplete} callback of a stream without messages to have run once the first entry's future
     * has been joined.
     */
    @Test
    void shouldInvokeCompletionCallback_asCompletableFuture() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> testSubject = completedTestSubject(List.of())
                .whenComplete(() -> invoked.set(true));

        testSubject.first().asCompletableFuture().join();

        Assertions.assertTrue(invoked.get());
    }

    /**
     * Expects the {@code whenComplete} callback not to run for a failed stream, and the first entry's future to be
     * completed exceptionally with the original exception.
     */
    @Test
    void shouldNotInvokeCompletionCallbackForFailedStream_asCompletableFuture() {
        AtomicBoolean invoked = new AtomicBoolean();
        RuntimeException expected = new RuntimeException("oops");
        MessageStream<M> testSubject = failingTestSubject(List.of(), expected)
                .whenComplete(() -> invoked.set(true));

        CompletableFuture<MessageStream.Entry<M>> result = testSubject.first().asCompletableFuture();

        Assertions.assertTrue(result.isCompletedExceptionally());
        Assertions.assertEquals(expected, result.exceptionNow());
        Assertions.assertFalse(invoked.get());
    }

    /**
     * Expects the {@code whenComplete} callback of a stream without messages to have run once its flux has completed.
     */
    @Test
    void shouldInvokeCompletionCallback_asFlux() {
        AtomicBoolean invoked = new AtomicBoolean();
        MessageStream<M> testSubject = completedTestSubject(List.of())
                .whenComplete(() -> invoked.set(true));

        StepVerifier.create(testSubject.asFlux())
                    .verifyComplete();

        Assertions.assertTrue(invoked.get());
    }

    /**
     * Registers a completion callback on a completed single-entry stream and verifies, through the Mono view, that
     * exactly one element is emitted, the Mono completes, and the callback has run.
     */
    @Test
    void shouldInvokeCompletionCallback_asMono() {
        AtomicBoolean callbackInvoked = new AtomicBoolean(false);

        StepVerifier.create(
                            completedSingleStreamTestSubject(createRandomMessage())
                                    .whenComplete(() -> callbackInvoked.set(true))
                                    .asMono()
                    )
                    .expectNextCount(1L)
                    .verifyComplete();

        Assertions.assertTrue(callbackInvoked.get());
    }

    /**
     * Registers a completion callback on a failing stream and verifies that its Flux view terminates with the very
     * same exception instance while the callback stays untouched.
     */
    @Test
    void shouldNotInvokeCompletionCallbackForFailedStream_asFlux() {
        AtomicBoolean callbackInvoked = new AtomicBoolean(false);
        RuntimeException expected = new RuntimeException("oops");

        StepVerifier.create(
                            failingTestSubject(List.of(), expected)
                                    .whenComplete(() -> callbackInvoked.set(true))
                                    .asFlux()
                    )
                    // Identity comparison on purpose: the original exception instance must be propagated.
                    .verifyErrorMatches(error -> error == expected);

        Assertions.assertFalse(callbackInvoked.get());
    }

    /**
     * Registers a completion callback on a failing stream and verifies that its Mono view terminates with the very
     * same exception instance while the callback stays untouched.
     * <p>
     * The Mono conversion below is invoked through {@link MessageStream.Single}, so the test is skipped (via a JUnit
     * assumption) for test subjects that are not a {@code Single}.
     */
    @Test
    void shouldNotInvokeCompletionCallbackForFailedStream_asMono() {
        AtomicBoolean callbackInvoked = new AtomicBoolean(false);
        RuntimeException expected = new RuntimeException("oops");

        // Keep the general stream type here; declaring the local as Single would make the assumption below vacuous.
        MessageStream<M> testSubject = failingTestSubject(List.of(), expected);
        Assumptions.assumeTrue(testSubject instanceof MessageStream.Single);
        MessageStream.Single<M> singleSubject = (MessageStream.Single<M>) testSubject;

        StepVerifier.create(
                            singleSubject.whenComplete(() -> callbackInvoked.set(true))
                                         .asMono()
                    )
                    // Identity comparison on purpose: the original exception instance must be propagated.
                    .verifyErrorMatches(error -> error == expected);

        Assertions.assertFalse(callbackInvoked.get());
    }

    /**
     * Registers a completion callback that throws on a completed single-message stream and verifies, through the
     * Flux view, that the message is still emitted and the Flux then terminates with an error equal to the exception
     * thrown by the callback.
     */
    @Test
    void shouldResultInFailedStreamWhenCompletionCallbackThrowsAnException_asFlux() {
        RuntimeException expected = new RuntimeException("oops");
        M expectedMessage = createRandomMessage();

        StepVerifier.create(
                            completedTestSubject(List.of(expectedMessage))
                                    .whenComplete(() -> {
                                        throw expected;
                                    })
                                    .asFlux()
                    )
                    .expectNextMatches(entry -> entry.message().equals(expectedMessage))
                    // Bound method reference replaces the former lambda, which referenced an undefined variable.
                    .verifyErrorMatches(expected::equals);
    }

    /**
     * Registers an availability callback on an empty, uncompleted stream and verifies the callback has not run
     * before, and has run after, an additional message is published to that stream.
     */
    @Test
    void shouldInvokeCallbackOnceAdditionalMessagesBecomeAvailable() {
        AtomicBoolean callbackInvoked = new AtomicBoolean(false);
        MessageStream<M> testSubject = uncompletedTestSubject(List.of(), new CompletableFuture<>());

        testSubject.onAvailable(() -> callbackInvoked.set(true));
        Assertions.assertFalse(callbackInvoked.get());

        publishAdditionalMessage(testSubject, createRandomMessage());
        Assertions.assertTrue(callbackInvoked.get());
    }

    /**
     * Takes the {@code first()} view of a two-message completed stream concatenated with a mocked stream, consumes
     * a single entry, and verifies that the view then reports no further entries, no error, and completion, and
     * that {@code close()} was invoked on the mocked trailing stream.
     */
    @Test
    void shouldCallCloseWhenConsumingOnlyTheFirstMessage() {
        // Reified mock: the mocked type is derived from the assignment target, no explicit array or class needed.
        MessageStream<M> trailingStream = Mockito.mock();

        MessageStream.Single<M> first =
                completedTestSubject(List.of(createRandomMessage(), createRandomMessage()))
                        .concatWith(trailingStream)
                        .first();

        // Exactly one entry is available through the first() view...
        Assertions.assertTrue(first.next().isPresent());
        Assertions.assertFalse(first.hasNextAvailable());
        Assertions.assertFalse(first.error().isPresent());
        // ...and repeated polling keeps yielding nothing.
        Assertions.assertFalse(first.next().isPresent());
        Assertions.assertFalse(first.hasNextAvailable());
        Assertions.assertFalse(first.next().isPresent());
        Assertions.assertTrue(first.isCompleted());

        Mockito.verify(trailingStream).close();
    }

    /**
     * Takes the {@code first()} view of a two-message completed stream concatenated with a mocked stream, converts
     * it to a CompletableFuture, and verifies that the future is done with a non-null value and that
     * {@code close()} was invoked on the mocked trailing stream.
     */
    @Test
    void shouldCallCloseWhenConsumingFirstAsCompletableFuture() {
        // Reified mock: the mocked type is derived from the assignment target, no explicit array or class needed.
        MessageStream<M> trailingStream = Mockito.mock();

        CompletableFuture<?> result =
                completedTestSubject(List.of(createRandomMessage(), createRandomMessage()))
                        .concatWith(trailingStream)
                        .first()
                        .asCompletableFuture();

        Assertions.assertTrue(result.isDone());
        Assertions.assertNotNull(result.getNow(null));

        Mockito.verify(trailingStream).close();
    }
}
