package io.vertx.core;

import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.WorkerExecutor;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.test.core.VertxTestBase;
import io.vertx.tests.deployment.VirtualThreadDeploymentTest;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/core/VirtualThreadContextTest.class */
/**
 * Tests for contexts created with {@code VertxInternal#createVirtualThreadContext()}.
 *
 * <p>Every test except {@link #testVirtualThreadsNotAvailable()} and
 * {@link #testAwaitFromWorkerExecuteBlocking()} is skipped (via {@link Assume}) when
 * {@code isVirtualThreadAvailable()} reports {@code false}.
 */
public class VirtualThreadContextTest extends VertxTestBase {

    /** Delay, in milliseconds, used by helper threads, timers and {@link #sleep(AtomicInteger)}. */
    private static final long SHORT_DELAY_MS = 100L;

    /** Upper bound, in seconds, on waiting for a context close future to complete. */
    private static final long CLOSE_TIMEOUT_SECONDS = 20L;

    /** Internal view of the Vert.x instance set up by the base class; hides the inherited field. */
    VertxInternal vertx;

    /**
     * Runs the base class set-up and then captures its Vert.x instance as a {@link VertxInternal}.
     *
     * @throws Exception if the base class set-up fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();
        // Explicit cast: required if the inherited field is declared with the public Vertx type,
        // and harmless if it is already a VertxInternal.
        this.vertx = (VertxInternal) super.vertx;
    }

    /**
     * Starts a plain thread that waits {@link #SHORT_DELAY_MS} and then runs {@code action}.
     *
     * @param action the work to run once the delay has elapsed
     */
    private static void runAfterShortDelay(Runnable action) {
        new Thread(() -> {
            try {
                Thread.sleep(SHORT_DELAY_MS);
            } catch (InterruptedException ignored) {
                // Deliberately ignored: the action must still run so the awaiting test is released.
            }
            action.run();
        }).start();
    }

    /**
     * Polls until the thread published in {@code ref} is non-null and in state
     * {@link Thread.State#WAITING}.
     *
     * @param ref holder that the task under test fills with its own thread
     */
    private void awaitThreadWaiting(AtomicReference<Thread> ref) {
        assertWaitUntil(() -> {
            // Read the reference once so the null check and the state check see the same thread.
            Thread thread = ref.get();
            return thread != null && thread.getState() == Thread.State.WAITING;
        });
    }

    /**
     * Blocks until {@code closing} completes, for at most {@link #CLOSE_TIMEOUT_SECONDS}.
     *
     * @param closing the future returned by closing a context
     * @throws Exception if the future fails, the wait is interrupted or the timeout elapses
     */
    private static void awaitClosed(Future<?> closing) throws Exception {
        closing.toCompletionStage().toCompletableFuture().get(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Checks that a task runs on a virtual thread, that the current context uses a
     * {@link WorkerExecutor}, and that re-scheduling on that context observes the same context.
     */
    @Test
    public void testContext() {
        Assume.assumeTrue(isVirtualThreadAvailable());
        vertx.createVirtualThreadContext().runOnContext(v -> {
            assertTrue(VirtualThreadDeploymentTest.isVirtual(Thread.currentThread()));
            ContextInternal current = vertx.getOrCreateContext();
            assertTrue(current.executor() instanceof WorkerExecutor);
            current.runOnContext(v2 -> {
                assertSame(current, vertx.getOrCreateContext());
                testComplete();
            });
        });
        await();
    }

    /** Checks that {@code Future.await} returns the value a promise is completed with from another thread. */
    @Test
    public void testAwaitFutureSuccess() {
        Assume.assumeTrue(isVirtualThreadAvailable());
        Object expected = new Object();
        vertx.createVirtualThreadContext().runOnContext(v -> {
            PromiseInternal<Object> promise = vertx.getOrCreateContext().promise();
            runAfterShortDelay(() -> promise.complete(expected));
            assertSame(expected, Future.await(promise.future()));
            testComplete();
        });
        await();
    }

    /** Checks that {@code Future.await} throws the very exception instance a promise is failed with. */
    @Test
    public void testAwaitFutureFailure() {
        Assume.assumeTrue(isVirtualThreadAvailable());
        Exception failure = new Exception();
        vertx.createVirtualThreadContext().runOnContext(v -> {
            PromiseInternal<Object> promise = vertx.getOrCreateContext().promise();
            runAfterShortDelay(() -> promise.fail(failure));
            try {
                Future.await(promise.future());
                fail();
            } catch (Exception e) {
                assertSame(failure, e);
                testComplete();
            }
        });
        await();
    }

    /** Checks that {@code Future.await} works on a future derived with {@code map}. */
    @Test
    public void testAwaitCompoundFuture() {
        Assume.assumeTrue(isVirtualThreadAvailable());
        Object result = new Object();
        vertx.createVirtualThreadContext().runOnContext(v -> {
            PromiseInternal<Object> promise = vertx.getOrCreateContext().promise();
            runAfterShortDelay(() -> promise.complete(result));
            assertSame("HELLO", Future.await(promise.future().map(res -> "HELLO")));
            testComplete();
        });
        await();
    }

    /** Schedules one task on each of 1000 duplicates of a virtual thread context and waits for all of them. */
    @Test
    public void testDuplicateUseSameThread() {
        Assume.assumeTrue(isVirtualThreadAvailable());
        final int num = 1000;
        waitFor(num);
        vertx.createVirtualThreadContext().runOnContext(v -> {
            ContextInternal context = vertx.getOrCreateContext();
            for (int i = 0; i < num; i++) {
                context.duplicate().runOnContext(v2 -> complete());
            }
        });
        await();
    }

    /**
     * Has 1000 duplicated contexts each await their own promise. The task that registers the
     * last promise schedules, on the original context, the completion of all of them.
     */
    @Test
    public void testDuplicateConcurrentAwait() {
        Assume.assumeTrue(isVirtualThreadAvailable());
        final int num = 1000;
        waitFor(num);
        vertx.createVirtualThreadContext().runOnContext(v -> {
            ContextInternal context = vertx.getOrCreateContext();
            Object lock = new Object();
            List<Promise<Object>> promises = new ArrayList<>();
            for (int i = 0; i < num; i++) {
                ContextInternal duplicate = context.duplicate();
                duplicate.runOnContext(v2 -> {
                    PromiseInternal<Object> promise = duplicate.promise();
                    boolean last;
                    synchronized (lock) {
                        promises.add(promise);
                        last = promises.size() == num;
                    }
                    if (last) {
                        // Every task has registered its promise: release them all at once.
                        context.runOnContext(v3 -> {
                            synchronized (lock) {
                                promises.forEach(p -> p.complete(null));
                            }
                        });
                    }
                    Future.await(promise.future());
                    complete();
                });
            }
        });
        await();
    }

    /** Checks that a promise completed from a Vert.x timer can be awaited. */
    @Test
    public void testTimer() {
        Assume.assumeTrue(isVirtualThreadAvailable());
        vertx.createVirtualThreadContext().runOnContext(v -> {
            PromiseInternal<String> promise = vertx.getOrCreateContext().promise();
            vertx.setTimer(SHORT_DELAY_MS, id -> promise.complete("foo"));
            assertEquals("foo", Future.await(promise));
            testComplete();
        });
        await();
    }

    /**
     * Checks that {@code inThread()} is {@code true} inside a task of the context and
     * {@code false} when queried from an unrelated thread.
     */
    @Test
    public void testInThread() {
        Assume.assumeTrue(isVirtualThreadAvailable());
        vertx.createVirtualThreadContext().runOnContext(v -> {
            ContextInternal context = vertx.getOrCreateContext();
            assertTrue(context.inThread());
            new Thread(() -> {
                // Sampled on the foreign thread, asserted later from within the context.
                boolean outsideContext = !context.inThread();
                context.runOnContext(v2 -> {
                    assertTrue(outsideContext);
                    assertTrue(context.inThread());
                    testComplete();
                });
            }).start();
        });
        await();
    }

    /**
     * Asserts that no other caller is currently inside this method, then blocks for
     * {@link #SHORT_DELAY_MS}. The counter is decremented again on every exit path.
     *
     * @param concurrency number of callers currently inside this method
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is interrupted
     */
    private void sleep(AtomicInteger concurrency) {
        assertEquals(0, concurrency.getAndIncrement());
        try {
            Thread.sleep(SHORT_DELAY_MS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            concurrency.decrementAndGet();
        }
    }

    /**
     * Queues ten blocking tasks on the same context; each one asserts, through
     * {@link #sleep(AtomicInteger)}, that it does not overlap with another.
     *
     * @throws Exception declared for parity with the other tests; not thrown by the visible code
     */
    @Test
    public void testSerializeBlocking() throws Exception {
        Assume.assumeTrue(isVirtualThreadAvailable());
        AtomicInteger concurrency = new AtomicInteger();
        vertx.createVirtualThreadContext().runOnContext(v -> {
            ContextInternal context = vertx.getOrCreateContext();
            for (int i = 0; i < 10; i++) {
                context.runOnContext(v2 -> sleep(concurrency));
            }
            context.runOnContext(v2 -> testComplete());
        });
        await();
    }

    /** Checks that creating a virtual thread context fails when virtual threads are unavailable. */
    @Test
    public void testVirtualThreadsNotAvailable() {
        Assume.assumeFalse(isVirtualThreadAvailable());
        try {
            vertx.createVirtualThreadContext();
            fail();
        } catch (IllegalStateException expected) {
            // Expected outcome: nothing further to check.
        }
    }

    /**
     * Checks that closing the context makes a task parked in {@code Future.await} fail with
     * an {@link InterruptedException}.
     *
     * @throws Exception if the context does not close in time
     */
    @Test
    public void testVirtualThreadInterruptOnClose() throws Exception {
        Assume.assumeTrue(isVirtualThreadAvailable());
        ContextInternal ctx = vertx.createVirtualThreadContext();
        // The task below rethrows whatever it catches; this handler intentionally discards it.
        ctx.exceptionHandler(err -> {
        });
        PromiseInternal<Object> promise = ctx.promise();
        AtomicReference<Thread> taskThread = new AtomicReference<>();
        AtomicBoolean interrupted = new AtomicBoolean();
        ctx.runOnContext(v -> {
            try {
                taskThread.set(Thread.currentThread());
                Future.await(promise.future());
                fail();
            } catch (Throwable t) {
                if (t instanceof InterruptedException) {
                    interrupted.set(true);
                }
                throw t;
            }
        });
        awaitThreadWaiting(taskThread);
        awaitClosed(ctx.close());
        assertWaitUntil(interrupted::get);
    }

    /**
     * Same as {@link #testVirtualThreadInterruptOnClose()} but the task is parked on a
     * {@link CountDownLatch} that is never counted down.
     *
     * @throws Exception if the context does not close in time
     */
    @Test
    public void testVirtualThreadInterruptOnClose2() throws Exception {
        Assume.assumeTrue(isVirtualThreadAvailable());
        ContextInternal ctx = vertx.createVirtualThreadContext();
        AtomicReference<Thread> taskThread = new AtomicReference<>();
        AtomicBoolean interrupted = new AtomicBoolean();
        CountDownLatch neverReleased = new CountDownLatch(1);
        ctx.runOnContext(v -> {
            try {
                taskThread.set(Thread.currentThread());
                neverReleased.await();
                fail();
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
        });
        awaitThreadWaiting(taskThread);
        awaitClosed(ctx.close());
        assertWaitUntil(interrupted::get);
    }

    /**
     * Parks four tasks of one context in {@code Future.await}, closes the context, and checks
     * that each task sees an {@link InterruptedException} and that their recovery sections
     * never overlap (the concurrency counter must be exactly 1 on entry). The test thread
     * releases the recovery sections one barrier at a time.
     *
     * @throws Exception if the context does not close in time
     */
    @Test
    public void testContextCloseContextSerialization() throws Exception {
        Assume.assumeTrue(isVirtualThreadAvailable());
        final int num = 4;
        ContextInternal ctx = vertx.createVirtualThreadContext();
        Thread[] threads = new Thread[num];
        List<Promise<Object>> promises = IntStream.range(0, num)
            .mapToObj(n -> Promise.<Object>promise())
            .collect(Collectors.toList());
        ConcurrentLinkedDeque<CyclicBarrier> pending = new ConcurrentLinkedDeque<>();
        CyclicBarrier[] barriers = new CyclicBarrier[num];
        AtomicInteger concurrency = new AtomicInteger();
        for (int i = 0; i < num; i++) {
            int idx = i;
            CyclicBarrier barrier = new CyclicBarrier(2);
            barriers[i] = barrier;
            pending.add(barrier);
            ctx.runOnContext(v -> {
                threads[idx] = Thread.currentThread();
                try {
                    Future.await(promises.get(idx).future());
                    fail();
                } catch (Exception e) {
                    assertTrue(e instanceof InterruptedException);
                    // Tasks take barriers in queue order, matching the order the test thread awaits them.
                    CyclicBarrier next = pending.removeFirst();
                    assertEquals(1, concurrency.incrementAndGet());
                    try {
                        next.await();
                    } catch (Exception ex) {
                        throw new RuntimeException(ex);
                    } finally {
                        concurrency.decrementAndGet();
                    }
                }
            });
        }
        assertWaitUntil(() -> {
            for (Thread thread : threads) {
                if (thread == null || thread.getState() != Thread.State.WAITING) {
                    return false;
                }
            }
            return true;
        });
        Future<?> closing = ctx.close();
        for (CyclicBarrier gate : barriers) {
            try {
                gate.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                throw new RuntimeException(e);
            }
        }
        awaitClosed(closing);
    }

    /**
     * Checks that, after the context is closed, a blocked task is interrupted with its
     * interrupt flag cleared, and that a subsequent {@code Future.await} in the same task
     * fails with an {@link InterruptedException}.
     *
     * @throws Exception if waiting for the task to start fails
     */
    @Test
    public void testAwaitWhenClosed() throws Exception {
        Assume.assumeTrue(isVirtualThreadAvailable());
        ContextInternal ctx = vertx.createVirtualThreadContext();
        CountDownLatch started = new CountDownLatch(1);
        ctx.runOnContext(v -> {
            started.countDown();
            try {
                // Never counted down: only an interrupt can end this wait.
                new CountDownLatch(1).await();
                fail();
            } catch (InterruptedException e) {
                assertFalse(Thread.currentThread().isInterrupted());
            }
            try {
                Future.await(Promise.promise().future());
                fail();
            } catch (Exception e) {
                assertEquals(InterruptedException.class, e.getClass());
                testComplete();
            }
        });
        awaitLatch(started);
        ctx.close();
        await();
    }

    /** Checks that a task submitted after {@code close()} has been called is still run. */
    @Test
    public void testSubmitAfterClose() {
        Assume.assumeTrue(isVirtualThreadAvailable());
        ContextInternal ctx = vertx.createVirtualThreadContext();
        ctx.close();
        ctx.runOnContext(v -> testComplete());
        await();
    }

    /** Checks that {@code Future.await} succeeds inside {@code executeBlocking} of a virtual thread context. */
    @Test
    public void testAwaitFromVirtualThreadExecuteBlocking() {
        Assume.assumeTrue(isVirtualThreadAvailable());
        vertx.createVirtualThreadContext().executeBlocking(() -> {
            Future.await(vertx.timer(20L));
            return "done";
        }).onComplete(onSuccess(res -> {
            assertEquals("done", res);
            testComplete();
        }));
        await();
    }

    /**
     * Calls {@code Future.await} inside {@code executeBlocking} of the context returned by
     * {@code getOrCreateContext()} (no virtual thread context is created here) and expects
     * the resulting future to fail.
     */
    @Test
    public void testAwaitFromWorkerExecuteBlocking() {
        vertx.getOrCreateContext().executeBlocking(() -> {
            Future.await(vertx.timer(20L));
            return "done";
        }).onComplete(onFailure(err -> testComplete()));
        await();
    }
}
