package org.axonframework.messaging.retry;

import jakarta.annotation.Nonnull;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.axonframework.common.infra.ComponentDescriptor;
import org.axonframework.messaging.GenericMessage;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageStream;
import org.axonframework.messaging.MessageType;
import org.axonframework.messaging.retry.RetryPolicy;
import org.axonframework.messaging.retry.RetryScheduler;
import org.axonframework.messaging.unitofwork.ProcessingContext;
import org.axonframework.utils.MockException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.test.StepVerifier;

/* loaded from: input_file:org/axonframework/messaging/retry/AsyncRetrySchedulerTest.class */
class AsyncRetrySchedulerTest {
    private static final MessageType TEST_TYPE = new MessageType("message");
    private AsyncRetryScheduler testSubject;
    private RetryPolicy retryPolicy;
    private ScheduledExecutorService executor;
    private AtomicReference<RetryPolicy.Outcome> policyOutcome;
    private Queue<ScheduledTask> scheduledTasks;

    /* loaded from: input_file:org/axonframework/messaging/retry/AsyncRetrySchedulerTest$ScheduledTask.class */
    private static final class ScheduledTask extends Record {
        private final Runnable task;
        private final long delay;
        private final TimeUnit unit;

        private ScheduledTask(Runnable runnable, long j, TimeUnit timeUnit) {
            this.task = runnable;
            this.delay = j;
            this.unit = timeUnit;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, ScheduledTask.class), ScheduledTask.class, "task;delay;unit", "FIELD:Lorg/axonframework/messaging/retry/AsyncRetrySchedulerTest$ScheduledTask;->task:Ljava/lang/Runnable;", "FIELD:Lorg/axonframework/messaging/retry/AsyncRetrySchedulerTest$ScheduledTask;->delay:J", "FIELD:Lorg/axonframework/messaging/retry/AsyncRetrySchedulerTest$ScheduledTask;->unit:Ljava/util/concurrent/TimeUnit;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, ScheduledTask.class), ScheduledTask.class, "task;delay;unit", "FIELD:Lorg/axonframework/messaging/retry/AsyncRetrySchedulerTest$ScheduledTask;->task:Ljava/lang/Runnable;", "FIELD:Lorg/axonframework/messaging/retry/AsyncRetrySchedulerTest$ScheduledTask;->delay:J", "FIELD:Lorg/axonframework/messaging/retry/AsyncRetrySchedulerTest$ScheduledTask;->unit:Ljava/util/concurrent/TimeUnit;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, ScheduledTask.class, Object.class), ScheduledTask.class, "task;delay;unit", "FIELD:Lorg/axonframework/messaging/retry/AsyncRetrySchedulerTest$ScheduledTask;->task:Ljava/lang/Runnable;", "FIELD:Lorg/axonframework/messaging/retry/AsyncRetrySchedulerTest$ScheduledTask;->delay:J", "FIELD:Lorg/axonframework/messaging/retry/AsyncRetrySchedulerTest$ScheduledTask;->unit:Ljava/util/concurrent/TimeUnit;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public Runnable task() {
            return this.task;
        }

        public long delay() {
            return this.delay;
        }

        public TimeUnit unit() {
            return this.unit;
        }
    }

    /* loaded from: input_file:org/axonframework/messaging/retry/AsyncRetrySchedulerTest$TestRetryPolicy.class */
    private static class TestRetryPolicy implements RetryPolicy {
        private final AtomicReference<RetryPolicy.Outcome> policyOutcome;

        private TestRetryPolicy(AtomicReference<RetryPolicy.Outcome> atomicReference) {
            this.policyOutcome = atomicReference;
        }

        public RetryPolicy.Outcome defineFor(@Nonnull Message<?> message, @Nonnull Throwable th, @Nonnull List<Class<? extends Throwable>[]> list) {
            return this.policyOutcome.get();
        }

        public void describeTo(@Nonnull ComponentDescriptor componentDescriptor) {
        }

        public AtomicReference<RetryPolicy.Outcome> policyOutcome() {
            return this.policyOutcome;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null || obj.getClass() != getClass()) {
                return false;
            }
            return Objects.equals(this.policyOutcome, ((TestRetryPolicy) obj).policyOutcome);
        }

        public int hashCode() {
            return Objects.hash(this.policyOutcome);
        }

        public String toString() {
            return "TestRetryPolicy[policyOutcome=" + String.valueOf(this.policyOutcome) + "]";
        }
    }

    AsyncRetrySchedulerTest() {
    }

    @BeforeEach
    void setup() {
        this.policyOutcome = new AtomicReference<>(RetryPolicy.Outcome.doNotReschedule());
        this.scheduledTasks = new LinkedList();
        this.retryPolicy = (RetryPolicy) Mockito.spy(new TestRetryPolicy(this.policyOutcome));
        this.executor = (ScheduledExecutorService) Mockito.mock(ScheduledExecutorService.class);
        Mockito.when(this.executor.schedule((Runnable) Mockito.any(Runnable.class), Mockito.anyLong(), (TimeUnit) Mockito.any())).thenAnswer(invocationOnMock -> {
            this.scheduledTasks.add(new ScheduledTask((Runnable) invocationOnMock.getArgument(0), ((Long) invocationOnMock.getArgument(1)).longValue(), (TimeUnit) invocationOnMock.getArgument(2)));
            return null;
        });
        this.testSubject = new AsyncRetryScheduler(this.retryPolicy, this.executor);
    }

    @Test
    void shouldReturnFailedStreamIfPolicyOutcomeIsNoRetry() {
        GenericMessage genericMessage = new GenericMessage(TEST_TYPE, "stub");
        RetryScheduler.Dispatcher dispatcher = (RetryScheduler.Dispatcher) Mockito.mock(new RetryScheduler.Dispatcher[0]);
        Assertions.assertTrue(this.testSubject.scheduleRetry(genericMessage, (ProcessingContext) null, new MockException("Simulating exception"), dispatcher).first().asCompletableFuture().isCompletedExceptionally());
        ((RetryScheduler.Dispatcher) Mockito.verify(dispatcher, Mockito.never())).dispatch((Message) Mockito.any(), (ProcessingContext) Mockito.any());
        ((ScheduledExecutorService) Mockito.verify(this.executor, Mockito.never())).schedule((Runnable) Mockito.any(Runnable.class), Mockito.anyLong(), (TimeUnit) Mockito.any());
    }

    @Test
    void shouldScheduleRetryIfPolicyOutcomeIsRetry() {
        GenericMessage genericMessage = new GenericMessage(TEST_TYPE, "stub");
        this.policyOutcome.set(RetryPolicy.Outcome.rescheduleIn(1L, TimeUnit.SECONDS));
        RetryScheduler.Dispatcher dispatcher = (RetryScheduler.Dispatcher) Mockito.mock(new RetryScheduler.Dispatcher[0]);
        Mockito.when(dispatcher.dispatch((Message) Mockito.any(), (ProcessingContext) Mockito.any())).thenReturn(MessageStream.empty().cast());
        MessageStream scheduleRetry = this.testSubject.scheduleRetry(genericMessage, (ProcessingContext) null, new MockException("Simulating exception"), dispatcher);
        ((RetryPolicy) Mockito.verify(this.retryPolicy)).defineFor((Message) Mockito.eq(genericMessage), (Throwable) Mockito.isA(MockException.class), (List) Mockito.argThat((v0) -> {
            return v0.isEmpty();
        }));
        Assertions.assertFalse(scheduleRetry.first().asCompletableFuture().isDone());
        ScheduledTask poll = this.scheduledTasks.poll();
        Assertions.assertNotNull(poll, "Expected task to have been scheduled");
        Assertions.assertEquals(1L, poll.delay);
        Assertions.assertEquals(TimeUnit.SECONDS, poll.unit);
        poll.task.run();
        ((RetryScheduler.Dispatcher) Mockito.verify(dispatcher)).dispatch((Message) Mockito.eq(genericMessage), (ProcessingContext) Mockito.any());
        Assertions.assertTrue(scheduleRetry.first().asCompletableFuture().isDone());
        Mockito.verifyNoMoreInteractions(new Object[]{this.retryPolicy});
    }

    @Test
    void shouldRescheduleAgainWhenRetryReturnsFailedStream() {
        GenericMessage genericMessage = new GenericMessage(TEST_TYPE, "stub");
        this.policyOutcome.set(RetryPolicy.Outcome.rescheduleIn(1L, TimeUnit.SECONDS));
        RetryScheduler.Dispatcher dispatcher = (RetryScheduler.Dispatcher) Mockito.mock(new RetryScheduler.Dispatcher[0]);
        Mockito.when(dispatcher.dispatch((Message) Mockito.any(), (ProcessingContext) Mockito.any())).thenReturn(MessageStream.failed(new MockException("Repeated failure"))).thenReturn(MessageStream.empty().cast());
        MessageStream scheduleRetry = this.testSubject.scheduleRetry(genericMessage, (ProcessingContext) null, new MockException("Simulating exception"), dispatcher);
        ((RetryPolicy) Mockito.verify(this.retryPolicy)).defineFor((Message) Mockito.eq(genericMessage), (Throwable) Mockito.isA(MockException.class), (List) Mockito.argThat((v0) -> {
            return v0.isEmpty();
        }));
        Assertions.assertFalse(scheduleRetry.first().asCompletableFuture().isDone());
        ScheduledTask poll = this.scheduledTasks.poll();
        Assertions.assertNotNull(poll, "Expected task to have been scheduled");
        Assertions.assertEquals(1L, poll.delay);
        Assertions.assertEquals(TimeUnit.SECONDS, poll.unit);
        poll.task.run();
        ((RetryPolicy) Mockito.verify(this.retryPolicy)).defineFor((Message) Mockito.eq(genericMessage), (Throwable) Mockito.isA(MockException.class), (List) Mockito.argThat(list -> {
            return list.size() == 1;
        }));
        ((RetryScheduler.Dispatcher) Mockito.verify(dispatcher, Mockito.times(1))).dispatch((Message) Mockito.eq(genericMessage), (ProcessingContext) Mockito.any());
        Assertions.assertFalse(scheduleRetry.first().asCompletableFuture().isDone());
        this.scheduledTasks.remove().task.run();
        Assertions.assertTrue(scheduleRetry.first().asCompletableFuture().isDone());
    }

    @Test
    void shouldReturnFailedStreamIfFailureIsNotFirstItemInStream() {
        GenericMessage genericMessage = new GenericMessage(TEST_TYPE, "stub");
        GenericMessage genericMessage2 = new GenericMessage(TEST_TYPE, "OK");
        this.policyOutcome.set(RetryPolicy.Outcome.rescheduleIn(1L, TimeUnit.SECONDS));
        RetryScheduler.Dispatcher dispatcher = (RetryScheduler.Dispatcher) Mockito.mock(new RetryScheduler.Dispatcher[0]);
        Mockito.when(dispatcher.dispatch((Message) Mockito.any(), (ProcessingContext) Mockito.any())).thenAnswer(invocationOnMock -> {
            return MessageStream.just(genericMessage2).concatWith(MessageStream.failed(new MockException("Streaming error")));
        });
        MessageStream scheduleRetry = this.testSubject.scheduleRetry(genericMessage, (ProcessingContext) null, new MockException("Simulating exception"), dispatcher);
        Assertions.assertFalse(scheduleRetry.hasNextAvailable());
        ((RetryScheduler.Dispatcher) Mockito.verify(dispatcher, Mockito.never())).dispatch((Message) Mockito.any(), (ProcessingContext) Mockito.any());
        this.scheduledTasks.remove().task.run();
        Assertions.assertTrue(scheduleRetry.hasNextAvailable());
        StepVerifier.create(scheduleRetry.asFlux()).expectNextCount(1L).verifyError(MockException.class);
        Assertions.assertTrue(this.scheduledTasks.isEmpty());
        ((RetryScheduler.Dispatcher) Mockito.verify(dispatcher, Mockito.times(1))).dispatch((Message) Mockito.any(), (ProcessingContext) Mockito.any());
    }

    @Test
    void shouldNotScheduleAnotherRetryWhenPolicyIndicatesSo() {
        GenericMessage genericMessage = new GenericMessage(TEST_TYPE, "stub");
        this.policyOutcome.set(RetryPolicy.Outcome.rescheduleIn(1L, TimeUnit.SECONDS));
        RetryScheduler.Dispatcher dispatcher = (RetryScheduler.Dispatcher) Mockito.mock(new RetryScheduler.Dispatcher[0]);
        Mockito.when(dispatcher.dispatch((Message) Mockito.any(), (ProcessingContext) Mockito.any())).thenAnswer(invocationOnMock -> {
            return MessageStream.failed(new MockException("Retry error"));
        });
        MessageStream scheduleRetry = this.testSubject.scheduleRetry(genericMessage, (ProcessingContext) null, new MockException("Simulating exception"), dispatcher);
        Assertions.assertFalse(scheduleRetry.first().asCompletableFuture().isDone());
        ((RetryScheduler.Dispatcher) Mockito.verify(dispatcher, Mockito.never())).dispatch((Message) Mockito.any(), (ProcessingContext) Mockito.any());
        this.policyOutcome.set(RetryPolicy.Outcome.doNotReschedule());
        this.scheduledTasks.remove().task.run();
        Assertions.assertTrue(scheduleRetry.first().asCompletableFuture().isDone());
        Assertions.assertInstanceOf(MockException.class, scheduleRetry.first().asCompletableFuture().exceptionNow());
        Assertions.assertEquals("Retry error", scheduleRetry.first().asCompletableFuture().exceptionNow().getMessage());
        Assertions.assertTrue(this.scheduledTasks.isEmpty());
        ((RetryScheduler.Dispatcher) Mockito.verify(dispatcher, Mockito.times(1))).dispatch((Message) Mockito.any(), (ProcessingContext) Mockito.any());
    }

    @Test
    void shouldDescribeItsProperties() {
        ComponentDescriptor componentDescriptor = (ComponentDescriptor) Mockito.mock(new ComponentDescriptor[0]);
        this.testSubject.describeTo(componentDescriptor);
        ((ComponentDescriptor) Mockito.verify(componentDescriptor)).describeProperty("retryPolicy", this.retryPolicy);
        ((ComponentDescriptor) Mockito.verify(componentDescriptor)).describeProperty("executor", this.executor);
    }
}
