package org.axonframework.messaging;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.OnNextMessageStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.test.StepVerifier;

/**
 * Test class validating the {@link OnNextMessageStream} through the shared {@link MessageStreamTest} suite,
 * extended with tests that check when the {@code onNext} callback is (and is not) invoked.
 */
class OnNextMessageStreamTest extends MessageStreamTest<Message<String>> {

    /** Callback that ignores every entry; used where the suite only needs a wrapped stream. */
    private static final Consumer<MessageStream.Entry<Message<String>>> NO_OP_ON_NEXT = entry -> {
    };

    /**
     * Wraps a stream over the given {@code list} in an {@link OnNextMessageStream} with a no-op callback.
     *
     * @param list the messages the resulting stream is built from
     * @return the wrapped stream
     */
    @Override
    MessageStream<Message<String>> completedTestSubject(List<Message<String>> list) {
        return new OnNextMessageStream<>(MessageStream.fromIterable(list), NO_OP_ON_NEXT);
    }

    /**
     * Wraps a single-entry stream of the given {@code message} in an {@link OnNextMessageStream.Single} with a
     * no-op callback.
     *
     * @param message the only message of the resulting stream
     * @return the wrapped single-entry stream
     */
    @Override
    MessageStream.Single<Message<String>> completedSingleStreamTestSubject(Message<String> message) {
        return new OnNextMessageStream.Single<>(MessageStream.just(message), NO_OP_ON_NEXT);
    }

    /**
     * Aborts the calling test, since there is no empty variant of the {@link OnNextMessageStream} to return.
     *
     * @return never returns normally; {@code Assumptions.abort} throws to abort the test
     */
    @Override
    MessageStream.Empty<Message<String>> completedEmptyStreamTestSubject() {
        Assumptions.abort("OnNextMessageStream does not support empty streams");
        // Not reached at runtime; only present because the compiler requires a return statement.
        return null;
    }

    /**
     * Wraps a stream that is built from the given {@code list} concatenated with a stream failed with
     * {@code exc} in an {@link OnNextMessageStream} with a no-op callback.
     *
     * @param list the messages placed in front of the failed stream
     * @param exc  the exception the trailing stream is failed with
     * @return the wrapped stream
     */
    @Override
    MessageStream<Message<String>> failingTestSubject(List<Message<String>> list, Exception exc) {
        return new OnNextMessageStream<>(
                MessageStream.fromIterable(list).concatWith(MessageStream.failed(exc)), NO_OP_ON_NEXT
        );
    }

    /**
     * Creates a message of type "message" whose payload is {@code "test-"} followed by a random integer in the
     * range {@code [0, 10000)}.
     *
     * @return the newly created message
     */
    @Override
    Message<String> createRandomMessage() {
        return new GenericMessage<>(new MessageType("message"), "test-" + ThreadLocalRandom.current().nextInt(10000));
    }

    // The handler is deliberately raw: the element type of MessageStream.empty() is irrelevant to this test,
    // as the handler must never receive an entry.
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    void onNextNotInvokedOnEmptyStream() {
        Consumer handler = Mockito.mock(Consumer.class);

        // The outcome of isDone() is not asserted; the chain only requests the first entry of the stream.
        MessageStream.empty().onNext(handler).first().asCompletableFuture().isDone();

        Mockito.verify(handler, Mockito.never()).accept(Mockito.any());
    }

    @Test
    void verifyOnNextInvokedForFirstElementWhenUsingOnCompletableFuture() {
        List<MessageStream.Entry<Message<String>>> seen = new ArrayList<>();
        Message<String> first = createRandomMessage();
        MessageStream<Message<String>> source = MessageStream.fromIterable(List.of(first, createRandomMessage()));

        boolean done = source.onNext(seen::add)
                             .first()
                             .asCompletableFuture()
                             .thenApply(MessageStream.Entry::message)
                             .isDone();

        Assertions.assertTrue(done);
        // Only the first entry was requested, so the callback must have seen exactly that one.
        Assertions.assertEquals(1, seen.size());
        Assertions.assertEquals(first, seen.getFirst().message());
    }

    @Test
    void verifyOnNextInvokedForAllElementsWhenUsingAsFlux() {
        List<MessageStream.Entry<Message<String>>> seen = new ArrayList<>();
        Message<String> first = createRandomMessage();
        Message<String> second = createRandomMessage();
        MessageStream<Message<String>> source = MessageStream.fromIterable(List.of(first, second));

        StepVerifier.create(source.onNext(seen::add).asFlux())
                    .expectNextCount(2L)
                    .verifyComplete();

        // The callback must have seen both entries, in stream order.
        Assertions.assertEquals(2, seen.size());
        Assertions.assertEquals(first, seen.getFirst().message());
        Assertions.assertEquals(second, seen.get(1).message());
    }
}
