package org.axonframework.messaging;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.axonframework.messaging.MessageStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/axonframework/messaging/FluxMessageStreamTest.class */
class FluxMessageStreamTest extends MessageStreamTest<Message<String>> {
    FluxMessageStreamTest() {
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    MessageStream<Message<String>> completedTestSubject(List<Message<String>> list) {
        return MessageStream.fromFlux(Flux.fromIterable(list));
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    MessageStream.Single<Message<String>> completedSingleStreamTestSubject(Message<String> message) {
        Assumptions.abort("FluxMessageStream doesn't support explicit single-value streams");
        return null;
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    MessageStream.Empty<Message<String>> completedEmptyStreamTestSubject() {
        Assumptions.abort("FluxMessageStream doesn't support explicitly empty streams");
        return null;
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    protected MessageStream<Message<String>> uncompletedTestSubject(List<Message<String>> list, CompletableFuture<Void> completableFuture) {
        return MessageStream.fromFlux(Flux.fromIterable(list).concatWith(Flux.create(fluxSink -> {
            completableFuture.whenComplete((r4, th) -> {
                if (th != null) {
                    fluxSink.error(th);
                } else {
                    fluxSink.complete();
                }
            });
        })));
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    MessageStream<Message<String>> failingTestSubject(List<Message<String>> list, Exception exc) {
        return MessageStream.fromFlux(Flux.fromIterable(list).concatWith(Mono.error(exc)));
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    Message<String> createRandomMessage() {
        return new GenericMessage(new MessageType("message"), "test-" + ThreadLocalRandom.current().nextInt(10000));
    }

    @Test
    void testCallingCloseReleasesFluxSubscription() {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        MessageStream fromFlux = MessageStream.fromFlux(Flux.fromIterable(List.of(createRandomMessage(), createRandomMessage(), createRandomMessage())).doOnCancel(() -> {
            atomicBoolean.set(true);
        }));
        Assertions.assertTrue(fromFlux.next().isPresent());
        Assertions.assertFalse(atomicBoolean.get());
        fromFlux.close();
        Assertions.assertTrue(atomicBoolean.get());
    }
}
