package org.axonframework.eventhandling;

import jakarta.annotation.Nonnull;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/axonframework/eventhandling/EventSinkTest.class */
class EventSinkTest {
    private static final String TEST_CONTEXT = "some-context";
    private AtomicReference<List<EventMessage<?>>> publishedEventsReference;
    private EventSink testSubject;

    /* loaded from: input_file:org/axonframework/eventhandling/EventSinkTest$DefaultEventSink.class */
    static class DefaultEventSink implements EventSink {
        private final AtomicReference<List<EventMessage<?>>> publishedEventsReference;

        public DefaultEventSink(AtomicReference<List<EventMessage<?>>> atomicReference) {
            this.publishedEventsReference = atomicReference;
        }

        public CompletableFuture<Void> publish(@Nonnull List<EventMessage<?>> list) {
            this.publishedEventsReference.set(list);
            return CompletableFuture.completedFuture(null);
        }
    }

    EventSinkTest() {
    }

    @BeforeEach
    void setUp() {
        this.publishedEventsReference = new AtomicReference<>();
        this.testSubject = new DefaultEventSink(this.publishedEventsReference);
    }

    @Test
    void publishWithContextInvokesPublishInPostInvocationPhase() {
        EventMessage<?> eventMessage = eventMessage(0);
        EventMessage<?> eventMessage2 = eventMessage(1);
        UnitOfWork unitOfWork = new UnitOfWork();
        unitOfWork.runOnPreInvocation(processingContext -> {
            this.testSubject.publish(processingContext, new EventMessage[]{eventMessage, eventMessage2});
        }).runOnInvocation(processingContext2 -> {
            Assertions.assertNull(this.publishedEventsReference.get());
        }).runOnCommit(processingContext3 -> {
            Assertions.assertFalse(this.publishedEventsReference.get().isEmpty());
        });
        awaitCompletion(unitOfWork.execute());
        Assertions.assertEquals(Arrays.asList(eventMessage, eventMessage2), this.publishedEventsReference.get());
    }

    private static <R> R awaitCompletion(CompletableFuture<R> completableFuture) {
        Awaitility.await().atMost(Duration.ofMillis(500L)).pollDelay(Duration.ofMillis(25L)).untilAsserted(() -> {
            Assertions.assertFalse(completableFuture.isCompletedExceptionally(), () -> {
                return completableFuture.exceptionNow().toString();
            });
        });
        return completableFuture.join();
    }

    private static EventMessage<?> eventMessage(int i) {
        return new GenericEventMessage(new MessageType("test", "event", "0.0.1"), "Event[" + i + "]");
    }
}
