package org.axonframework.messaging;

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.GenericEventMessage;
import org.axonframework.messaging.MessageStream;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/axonframework/messaging/QueueMessageStreamTest.class */
class QueueMessageStreamTest extends MessageStreamTest<EventMessage<String>> {
    QueueMessageStreamTest() {
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    MessageStream<EventMessage<String>> completedTestSubject(List<EventMessage<String>> list) {
        QueueMessageStream queueMessageStream = new QueueMessageStream();
        list.forEach(eventMessage -> {
            queueMessageStream.offer(eventMessage, Context.empty());
        });
        queueMessageStream.complete();
        return queueMessageStream;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.axonframework.messaging.MessageStreamTest
    public MessageStream.Single<EventMessage<String>> completedSingleStreamTestSubject(EventMessage<String> eventMessage) {
        Assumptions.abort("QueueMessageStream does not support explicit single-item streams");
        return null;
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    MessageStream.Empty<EventMessage<String>> completedEmptyStreamTestSubject() {
        Assumptions.abort("QueueMessageStream does not support explicit zero-item streams");
        return null;
    }

    /* renamed from: uncompletedTestSubject, reason: avoid collision after fix types in other method */
    protected QueueMessageStream<EventMessage<String>> uncompletedTestSubject2(List<EventMessage<String>> list, CompletableFuture<Void> completableFuture) {
        QueueMessageStream<EventMessage<String>> queueMessageStream = new QueueMessageStream<>();
        list.forEach(eventMessage -> {
            queueMessageStream.offer(eventMessage, Context.empty());
        });
        completableFuture.whenComplete((r4, th) -> {
            if (th != null) {
                queueMessageStream.completeExceptionally(th);
            } else {
                queueMessageStream.complete();
            }
        });
        return queueMessageStream;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.axonframework.messaging.MessageStreamTest
    public void publishAdditionalMessage(MessageStream<EventMessage<String>> messageStream, EventMessage<String> eventMessage) {
        ((QueueMessageStream) messageStream).offer(eventMessage, Context.empty());
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    MessageStream<EventMessage<String>> failingTestSubject(List<EventMessage<String>> list, Exception exc) {
        QueueMessageStream queueMessageStream = new QueueMessageStream();
        list.forEach(eventMessage -> {
            queueMessageStream.offer(eventMessage, Context.empty());
        });
        queueMessageStream.completeExceptionally(exc);
        return queueMessageStream;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.axonframework.messaging.MessageStreamTest
    public EventMessage<String> createRandomMessage() {
        return new GenericEventMessage(new MessageType("message"), "test-" + ThreadLocalRandom.current().nextInt(10000));
    }

    @Test
    void shouldInvokeConsumeCallbackWhenMessageIsConsumed() {
        QueueMessageStream<EventMessage<String>> uncompletedTestSubject2 = uncompletedTestSubject2(List.of(createRandomMessage()), new CompletableFuture<>());
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        uncompletedTestSubject2.onConsumeCallback(() -> {
            atomicBoolean.set(true);
        });
        Assertions.assertFalse(atomicBoolean.get());
        Assertions.assertTrue(uncompletedTestSubject2.next().isPresent());
        Assertions.assertTrue(atomicBoolean.get());
    }

    @Test
    void shouldRefuseNewItemWhenCapacityHasBeenReached() {
        QueueMessageStream queueMessageStream = new QueueMessageStream(new ArrayBlockingQueue(2));
        EventMessage<String> createRandomMessage = createRandomMessage();
        EventMessage<String> createRandomMessage2 = createRandomMessage();
        Assertions.assertTrue(queueMessageStream.offer(createRandomMessage, Context.empty()));
        Assertions.assertTrue(queueMessageStream.offer(createRandomMessage2, Context.empty()));
        Assertions.assertFalse(queueMessageStream.offer(createRandomMessage(), Context.empty()));
        Assertions.assertSame(createRandomMessage, queueMessageStream.next().map((v0) -> {
            return v0.message();
        }).orElse(null));
        Assertions.assertTrue(queueMessageStream.offer(createRandomMessage(), Context.empty()));
        Assertions.assertFalse(queueMessageStream.offer(createRandomMessage(), Context.empty()));
        Assertions.assertSame(createRandomMessage2, queueMessageStream.next().map((v0) -> {
            return v0.message();
        }).orElse(null));
    }

    @Override // org.axonframework.messaging.MessageStreamTest
    protected /* bridge */ /* synthetic */ MessageStream<EventMessage<String>> uncompletedTestSubject(List<EventMessage<String>> list, CompletableFuture completableFuture) {
        return uncompletedTestSubject2(list, (CompletableFuture<Void>) completableFuture);
    }
}
