package io.vertx.rxjava3.test;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
import io.vertx.rxjava3.ContextScheduler;
import io.vertx.rxjava3.RxHelper;
import io.vertx.test.core.VertxTestBase;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.junit.Test;

/* loaded from: input_file:io/vertx/rxjava3/test/SchedulerTest.class */
public class SchedulerTest extends VertxTestBase {
    private WorkerExecutor workerExecutor;

    public void setUp() throws Exception {
        super.setUp();
        this.workerExecutor = this.vertx.createSharedWorkerExecutor(this.name.getMethodName());
    }

    protected void tearDown() throws Exception {
        this.workerExecutor.close();
        super.tearDown();
        RxJavaPlugins.setScheduleHandler((Function) null);
    }

    private void assertEventLoopThread(Thread thread) {
        String name = thread.getName();
        assertTrue("Was expecting event loop thread instead of " + name, name.startsWith("vert.x-eventloop-thread"));
    }

    private void assertWorkerThread(Thread thread) {
        String name = thread.getName();
        assertTrue("Was expecting worker thread instead of " + name, name.startsWith("vert.x-worker-thread"));
    }

    private void assertWorkerExecutorThread(Thread thread) {
        String name = thread.getName();
        assertTrue("Was expecting worker executor thread instead of " + name, name.startsWith(this.name.getMethodName()));
    }

    @Test
    public void testScheduleImmediately() throws Exception {
        testScheduleImmediately(() -> {
            return new ContextScheduler(this.vertx, false);
        }, this::assertEventLoopThread);
    }

    @Test
    public void testScheduleImmediatelyBlocking() throws Exception {
        testScheduleImmediately(() -> {
            return new ContextScheduler(this.vertx, true);
        }, this::assertWorkerThread);
    }

    @Test
    public void testScheduleImmediatelyWorkerExecutor() throws Exception {
        testScheduleImmediately(() -> {
            return new ContextScheduler(this.workerExecutor);
        }, this::assertWorkerExecutorThread);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void testScheduleImmediately(Supplier<ContextScheduler> supplier, Consumer<Thread> consumer) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        ContextScheduler.ContextWorker createWorker = supplier.get().createWorker();
        AtomicReference atomicReference = new AtomicReference();
        createWorker.schedule(() -> {
            atomicReference.set(Thread.currentThread());
            countDownLatch.countDown();
        }, 0L, TimeUnit.MILLISECONDS);
        awaitLatch(countDownLatch);
        consumer.accept(atomicReference.get());
    }

    @Test
    public void testScheduleObserveOnReturnsOnTheCorrectThread() {
        Context orCreateContext = this.vertx.getOrCreateContext();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        orCreateContext.runOnContext(r8 -> {
            Observable doOnNext = Observable.create(observableEmitter -> {
                atomicBoolean.set(Context.isOnVertxThread());
                observableEmitter.onNext("expected");
                observableEmitter.onComplete();
            }).observeOn(new ContextScheduler(orCreateContext, false)).doOnNext(str -> {
                assertEquals(Vertx.currentContext(), orCreateContext);
            });
            new Thread(() -> {
                doOnNext.subscribe(str2 -> {
                    assertEquals("expected", str2);
                }, this::fail, this::testComplete);
            }).start();
        });
        await();
        assertFalse(atomicBoolean.get());
    }

    @Test
    public void testScheduleWithDelayObserveOnReturnsOnTheCorrectThread() {
        Context orCreateContext = this.vertx.getOrCreateContext();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        orCreateContext.runOnContext(r9 -> {
            Observable doOnNext = Observable.create(observableEmitter -> {
                atomicBoolean.set(Context.isOnVertxThread());
                observableEmitter.onNext("expected");
                observableEmitter.onComplete();
            }).delay(10L, TimeUnit.MILLISECONDS, new ContextScheduler(orCreateContext, false)).doOnNext(str -> {
                assertEquals(Vertx.currentContext(), orCreateContext);
            });
            new Thread(() -> {
                doOnNext.subscribe(str2 -> {
                    assertEquals("expected", str2);
                }, this::fail, this::testComplete);
            }).start();
        });
        assertFalse(atomicBoolean.get());
        await();
    }

    @Test
    public void testScheduleDelayed() throws Exception {
        testScheduleDelayed(() -> {
            return new ContextScheduler(this.vertx, false);
        }, this::assertEventLoopThread);
    }

    @Test
    public void testScheduleDelayedBlocking() throws Exception {
        testScheduleDelayed(() -> {
            return new ContextScheduler(this.vertx, true);
        }, this::assertWorkerThread);
    }

    @Test
    public void testScheduleDelayedWorkerExecutor() throws Exception {
        testScheduleDelayed(() -> {
            return new ContextScheduler(this.workerExecutor);
        }, this::assertWorkerExecutorThread);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void testScheduleDelayed(Supplier<ContextScheduler> supplier, Consumer<Thread> consumer) throws Exception {
        ContextScheduler.ContextWorker createWorker = supplier.get().createWorker();
        long currentTimeMillis = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        AtomicLong atomicLong = new AtomicLong();
        createWorker.schedule(() -> {
            atomicReference.set(Thread.currentThread());
            atomicLong.set(System.currentTimeMillis() - currentTimeMillis);
            countDownLatch.countDown();
        }, 40L, TimeUnit.MILLISECONDS);
        awaitLatch(countDownLatch);
        consumer.accept(atomicReference.get());
        assertTrue(atomicLong.get() >= 40);
    }

    @Test
    public void testSchedulePeriodic() {
        testSchedulePeriodic(() -> {
            return new ContextScheduler(this.vertx, false);
        }, this::assertEventLoopThread);
    }

    @Test
    public void testSchedulePeriodicBlocking() {
        testSchedulePeriodic(() -> {
            return new ContextScheduler(this.vertx, true);
        }, this::assertWorkerThread);
    }

    @Test
    public void testSchedulePeriodicWorkerExecutor() {
        testSchedulePeriodic(() -> {
            return new ContextScheduler(this.workerExecutor);
        }, this::assertWorkerExecutorThread);
    }

    private void testSchedulePeriodic(Supplier<ContextScheduler> supplier, Consumer<Thread> consumer) {
        disableThreadChecks();
        ContextScheduler.ContextWorker createWorker = supplier.get().createWorker();
        AtomicLong atomicLong = new AtomicLong(System.currentTimeMillis() - 40);
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(createWorker.schedulePeriodically(() -> {
            consumer.accept(Thread.currentThread());
            if (atomicInteger.incrementAndGet() > 2) {
                ((Disposable) atomicReference.get()).dispose();
                testComplete();
            } else {
                long currentTimeMillis = System.currentTimeMillis();
                long j = currentTimeMillis - atomicLong.get();
                assertTrue("" + j, j >= 40);
                atomicLong.set(currentTimeMillis);
            }
        }, 0L, 40L, TimeUnit.MILLISECONDS));
        await();
    }

    @Test
    public void testUnsubscribeBeforeExecute() throws Exception {
        testUnsubscribeBeforeExecute(() -> {
            return new ContextScheduler(this.vertx, false);
        });
    }

    @Test
    public void testUnsubscribeBeforeExecuteBlocking() throws Exception {
        testUnsubscribeBeforeExecute(() -> {
            return new ContextScheduler(this.vertx, true);
        });
    }

    @Test
    public void testUnsubscribeBeforeExecuteWorkerExecutor() throws Exception {
        testUnsubscribeBeforeExecute(() -> {
            return new ContextScheduler(this.workerExecutor);
        });
    }

    private void testUnsubscribeBeforeExecute(Supplier<ContextScheduler> supplier) throws Exception {
        ContextScheduler.ContextWorker createWorker = supplier.get().createWorker();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.getClass();
        createWorker.schedule(countDownLatch::countDown, 20L, TimeUnit.MILLISECONDS).dispose();
        assertFalse(countDownLatch.await(40L, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testUnsubscribeDuringExecute() throws Exception {
        testUnsubscribeDuringExecute(() -> {
            return new ContextScheduler(this.vertx, false);
        });
    }

    @Test
    public void testUnsubscribeDuringExecuteBlocking() throws Exception {
        testUnsubscribeDuringExecute(() -> {
            return new ContextScheduler(this.vertx, true);
        });
    }

    @Test
    public void testUnsubscribeDuringExecuteWorkerExecutor() throws Exception {
        testUnsubscribeDuringExecute(() -> {
            return new ContextScheduler(this.workerExecutor);
        });
    }

    private void testUnsubscribeDuringExecute(Supplier<ContextScheduler> supplier) throws Exception {
        ContextScheduler.ContextWorker createWorker = supplier.get().createWorker();
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(createWorker.schedulePeriodically(() -> {
            if (atomicInteger.getAndIncrement() == 0) {
                ((Disposable) atomicReference.get()).dispose();
            }
        }, 0L, 5L, TimeUnit.MILLISECONDS));
        Thread.sleep(60L);
        assertEquals(1L, atomicInteger.get());
    }

    @Test
    public void testUnsubscribeBetweenActions() throws Exception {
        testUnsubscribeBetweenActions(() -> {
            return new ContextScheduler(this.vertx, false);
        });
    }

    @Test
    public void testUnsubscribeBetweenActionsBlocking() throws Exception {
        testUnsubscribeBetweenActions(() -> {
            return new ContextScheduler(this.vertx, true);
        });
    }

    @Test
    public void testUnsubscribeBetweenActionsWorkerExecutor() throws Exception {
        testUnsubscribeBetweenActions(() -> {
            return new ContextScheduler(this.workerExecutor);
        });
    }

    private void testUnsubscribeBetweenActions(Supplier<ContextScheduler> supplier) throws Exception {
        ContextScheduler.ContextWorker createWorker = supplier.get().createWorker();
        AtomicInteger atomicInteger = new AtomicInteger();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(createWorker.schedulePeriodically(() -> {
            if (atomicInteger.incrementAndGet() == 4) {
                countDownLatch.countDown();
            }
        }, 0L, 20L, TimeUnit.MILLISECONDS));
        awaitLatch(countDownLatch);
        ((Disposable) atomicReference.get()).dispose();
        Thread.sleep(60L);
        assertEquals(4L, atomicInteger.get());
    }

    @Test
    public void testWorkerUnsubscribe() throws Exception {
        testWorkerUnsubscribe(() -> {
            return new ContextScheduler(this.vertx, false);
        });
    }

    @Test
    public void testWorkerUnsubscribeBlocking() throws Exception {
        testWorkerUnsubscribe(() -> {
            return new ContextScheduler(this.vertx, true);
        });
    }

    @Test
    public void testWorkerUnsubscribeWorkerExecutor() throws Exception {
        testWorkerUnsubscribe(() -> {
            return new ContextScheduler(this.workerExecutor);
        });
    }

    private void testWorkerUnsubscribe(Supplier<ContextScheduler> supplier) throws Exception {
        ContextScheduler.ContextWorker createWorker = supplier.get().createWorker();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        countDownLatch.getClass();
        Disposable schedule = createWorker.schedule(countDownLatch::countDown, 40L, TimeUnit.MILLISECONDS);
        countDownLatch.getClass();
        Disposable schedule2 = createWorker.schedule(countDownLatch::countDown, 40L, TimeUnit.MILLISECONDS);
        createWorker.dispose();
        assertTrue(schedule.isDisposed());
        assertTrue(schedule2.isDisposed());
        assertFalse(countDownLatch.await(40L, TimeUnit.MILLISECONDS));
        assertEquals(2L, countDownLatch.getCount());
    }

    @Test
    public void testPeriodicRescheduleAfterActionBlocking() {
        ContextScheduler.ContextWorker createWorker = new ContextScheduler(this.vertx, true).createWorker();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        long nanoTime = System.nanoTime();
        createWorker.schedulePeriodically(() -> {
            if (atomicBoolean.compareAndSet(false, true)) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                    fail();
                }
            } else {
                assertTrue(System.nanoTime() - nanoTime > TimeUnit.NANOSECONDS.convert(50L, TimeUnit.MILLISECONDS));
                createWorker.dispose();
                testComplete();
            }
        }, 20L, 20L, TimeUnit.MILLISECONDS);
        await();
    }

    @Test
    public void testSchedulerHook() throws Exception {
        testSchedulerHook(() -> {
            return new ContextScheduler(this.vertx, false);
        });
    }

    @Test
    public void testSchedulerHookBlocking() throws Exception {
        testSchedulerHook(() -> {
            return new ContextScheduler(this.vertx, true);
        });
    }

    @Test
    public void testSchedulerHookWorkerExecutor() throws Exception {
        testSchedulerHook(() -> {
            return new ContextScheduler(this.workerExecutor);
        });
    }

    private void testSchedulerHook(Supplier<ContextScheduler> supplier) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        RxJavaPlugins.setScheduleHandler(runnable -> {
            atomicInteger.incrementAndGet();
            return () -> {
                runnable.run();
                atomicInteger2.getAndIncrement();
                countDownLatch.countDown();
            };
        });
        ContextScheduler.ContextWorker createWorker = supplier.get().createWorker();
        assertEquals(0L, atomicInteger.get());
        assertEquals(0L, atomicInteger2.get());
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        AtomicInteger atomicInteger3 = new AtomicInteger();
        AtomicInteger atomicInteger4 = new AtomicInteger();
        createWorker.schedule(() -> {
            atomicInteger3.set(atomicInteger.get());
            atomicInteger4.set(atomicInteger2.get());
            countDownLatch2.countDown();
        }, 0L, TimeUnit.SECONDS);
        awaitLatch(countDownLatch2);
        awaitLatch(countDownLatch);
        assertEquals(1L, atomicInteger.get());
        assertEquals(1L, atomicInteger2.get());
        assertEquals(1L, atomicInteger3.get());
        assertEquals(0L, atomicInteger4.get());
    }

    @Test
    public void testRemovedFromContextAfterRun() throws Exception {
        ContextScheduler.ContextWorker createWorker = RxHelper.blockingScheduler(this.vertx).createWorker();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.getClass();
        createWorker.schedule(countDownLatch::countDown);
        awaitLatch(countDownLatch);
        waitUntil(() -> {
            return createWorker.countActions() == 0;
        });
    }

    @Test
    public void testRemovedFromContextAfterDelay() throws Exception {
        ContextScheduler.ContextWorker createWorker = RxHelper.blockingScheduler(this.vertx).createWorker();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        countDownLatch.getClass();
        createWorker.schedule(countDownLatch::countDown, 10L, TimeUnit.MILLISECONDS);
        awaitLatch(countDownLatch);
        waitUntil(() -> {
            return createWorker.countActions() == 0;
        });
    }

    @Test
    public void testUnsubscribePeriodicInTask() throws Exception {
        ContextScheduler.ContextWorker createWorker = RxHelper.blockingScheduler(this.vertx).createWorker();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(createWorker.schedulePeriodically(() -> {
            while (true) {
                Disposable disposable = (Disposable) atomicReference.get();
                if (disposable != null) {
                    disposable.dispose();
                    countDownLatch.countDown();
                    return;
                }
                Thread.yield();
            }
        }, 10L, 10L, TimeUnit.MILLISECONDS));
        awaitLatch(countDownLatch);
        waitUntil(() -> {
            return createWorker.countActions() == 0;
        });
    }

    @Test
    public void testTimeoutDoesNotFireAfterSubscriptionIsDisposed() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        Flowable.fromArray(new String[]{"tick"}).timeout(500L, TimeUnit.MILLISECONDS, RxHelper.scheduler(this.vertx)).doOnError(th -> {
            countDownLatch.countDown();
        }).firstOrError().subscribe(str -> {
            countDownLatch.countDown();
        });
        assertFalse("doOnError should not have been invoked", countDownLatch.await(1L, TimeUnit.SECONDS));
    }
}
