package io.vertx.tests.context;

import io.vertx.core.impl.TaskQueue;
import io.vertx.core.impl.WorkerExecutor;
import io.vertx.test.core.AsyncTestBase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:io/vertx/tests/context/TaskQueueTest.class */
public class TaskQueueTest extends AsyncTestBase {
    private TaskQueue taskQueue;
    private Executor executor;
    private List<Thread> threads = Collections.synchronizedList(new ArrayList());

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.vertx.test.core.AsyncTestBase
    public void setUp() throws Exception {
        super.setUp();
        this.taskQueue = new TaskQueue();
        AtomicInteger atomicInteger = new AtomicInteger();
        this.executor = runnable -> {
            new Thread(runnable, "vert.x-" + atomicInteger.getAndIncrement()).start();
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.vertx.test.core.AsyncTestBase
    public void tearDown() throws Exception {
        for (int i = 0; i < this.threads.size(); i++) {
            try {
                this.threads.get(i).join();
            } finally {
                this.threads.clear();
            }
        }
        super.tearDown();
    }

    private void suspendAndAwaitResume(CountDownLatch countDownLatch) {
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            fail(e);
        }
    }

    @Test
    public void testCreateThread() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        this.taskQueue.execute(() -> {
            atomicReference.set(Thread.currentThread());
        }, this.executor);
        waitUntil(() -> {
            return atomicReference.get() != null;
        });
        Thread.sleep(10L);
        this.taskQueue.execute(() -> {
            assertNotSame(atomicReference.get(), Thread.currentThread());
            testComplete();
        }, this.executor);
        await();
    }

    @Test
    public void testAwaitSchedulesOnNewThread() {
        this.taskQueue.execute(() -> {
            Thread currentThread = Thread.currentThread();
            this.taskQueue.execute(() -> {
                assertNotSame(currentThread, Thread.currentThread());
                testComplete();
            }, this.executor);
            suspendAndAwaitResume(this.taskQueue.current().trySuspend());
        }, this.executor);
        await();
    }

    @Test
    public void testResumeFromAnotherThread() {
        this.taskQueue.execute(() -> {
            WorkerExecutor.Execution current = this.taskQueue.current();
            new Thread(() -> {
                try {
                    Thread.sleep(100L);
                    current.resume();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }).start();
            suspendAndAwaitResume(current.trySuspend());
            testComplete();
        }, this.executor);
        await();
    }

    @Test
    public void testResumeFromContextThread() {
        this.taskQueue.execute(() -> {
            WorkerExecutor.Execution current = this.taskQueue.current();
            CountDownLatch trySuspend = current.trySuspend();
            this.taskQueue.execute(() -> {
                try {
                    Thread.sleep(100L);
                    current.resume();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, this.executor);
            suspendAndAwaitResume(trySuspend);
            testComplete();
        }, this.executor);
        await();
    }

    @Test
    public void testResumeWhenIdle() {
        this.taskQueue.execute(() -> {
            AtomicReference atomicReference = new AtomicReference();
            WorkerExecutor.Execution current = this.taskQueue.current();
            new Thread(() -> {
                Thread thread;
                while (true) {
                    thread = (Thread) atomicReference.get();
                    if (thread == null) {
                        try {
                            Thread.sleep(1L);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        try {
                            break;
                        } catch (InterruptedException e2) {
                            e2.printStackTrace(System.out);
                        }
                    }
                }
                thread.join(2000L);
                current.resume();
            }).start();
            CountDownLatch trySuspend = current.trySuspend();
            this.taskQueue.execute(() -> {
                atomicReference.set(Thread.currentThread());
            }, this.executor);
            suspendAndAwaitResume(trySuspend);
            testComplete();
        }, this.executor);
        await();
    }

    @Test
    public void testUnscheduleRace2() {
        AtomicInteger atomicInteger = new AtomicInteger();
        this.taskQueue.execute(() -> {
            assertEquals("vert.x-0", Thread.currentThread().getName());
            CompletableFuture completableFuture = new CompletableFuture();
            this.taskQueue.execute(() -> {
                assertEquals("vert.x-0", Thread.currentThread().getName());
                assertEquals(0L, atomicInteger.getAndIncrement());
                WorkerExecutor.Execution current = this.taskQueue.current();
                completableFuture.whenComplete((r7, th) -> {
                    current.resume(() -> {
                        assertEquals("vert.x-1", Thread.currentThread().getName());
                        assertEquals(2L, atomicInteger.getAndIncrement());
                    });
                });
                suspendAndAwaitResume(current.trySuspend());
                assertEquals(3L, atomicInteger.getAndIncrement());
            }, this.executor);
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            this.taskQueue.execute(() -> {
                assertEquals("vert.x-1", Thread.currentThread().getName());
                assertEquals(1L, atomicInteger.getAndIncrement());
                do {
                } while (!atomicBoolean.get());
                completableFuture.complete(null);
            }, this.executor);
            this.taskQueue.execute(() -> {
                assertEquals("vert.x-0", Thread.currentThread().getName());
                assertEquals(4L, atomicInteger.getAndIncrement());
                testComplete();
            }, this.executor);
            atomicBoolean.set(true);
        }, this.executor);
        await();
    }

    @Test
    public void shouldNotHaveTaskInQueueWhenTaskHasBeenRejected() {
        Executor executor = runnable -> {
            throw new RejectedExecutionException();
        };
        TaskQueue taskQueue = new TaskQueue();
        Assertions.assertThatThrownBy(() -> {
            taskQueue.execute(this::fail, executor);
        }).isInstanceOf(RejectedExecutionException.class);
        Assertions.assertThat(taskQueue.isEmpty()).isTrue();
    }

    @Test
    public void testCloseSuspendedTasks() {
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        Objects.requireNonNull(concurrentLinkedDeque);
        Executor executor = (v1) -> {
            r0.add(v1);
        };
        Runnable runnable = () -> {
            taskQueue.current().trySuspend();
        };
        taskQueue.execute(runnable, executor);
        assertEquals(1L, concurrentLinkedDeque.size());
        ((Runnable) concurrentLinkedDeque.pop()).run();
        TaskQueue.CloseResult close = taskQueue.close();
        assertEquals(1L, close.suspendedTasks().size());
        assertEquals(1L, close.suspendedThreads().size());
        assertSame(runnable, close.suspendedTasks().get(0));
    }

    @Test
    public void testCloseResumingTasks() {
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        Objects.requireNonNull(concurrentLinkedDeque);
        Executor executor = (v1) -> {
            r0.add(v1);
        };
        AtomicReference atomicReference = new AtomicReference();
        Runnable runnable = () -> {
            WorkerExecutor.Execution current = taskQueue.current();
            atomicReference.set(current);
            current.trySuspend();
        };
        taskQueue.execute(runnable, executor);
        assertEquals(1L, concurrentLinkedDeque.size());
        taskQueue.execute(() -> {
        }, runnable2 -> {
        });
        ((Runnable) concurrentLinkedDeque.pop()).run();
        ((WorkerExecutor.Execution) atomicReference.get()).resume();
        TaskQueue.CloseResult close = taskQueue.close();
        assertEquals(1L, close.suspendedTasks().size());
        assertEquals(1L, close.suspendedThreads().size());
        assertSame(runnable, close.suspendedTasks().get(0));
    }

    @Test
    public void testCloseBeforeSuspend() {
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        Objects.requireNonNull(concurrentLinkedDeque);
        Executor executor = (v1) -> {
            r0.add(v1);
        };
        AtomicReference atomicReference = new AtomicReference();
        taskQueue.execute(() -> {
            Thread thread = new Thread(() -> {
                atomicReference.set(taskQueue.close());
            });
            thread.start();
            try {
                thread.join();
                assertNull(taskQueue.current().trySuspend());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, executor);
        ((Runnable) concurrentLinkedDeque.pop()).run();
        assertTrue(((TaskQueue.CloseResult) atomicReference.get()).suspendedThreads().isEmpty());
        assertNotNull(((TaskQueue.CloseResult) atomicReference.get()).activeThread());
    }

    @Test
    public void testCloseBeforeResumeExecution() {
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        Objects.requireNonNull(concurrentLinkedDeque);
        taskQueue.execute(() -> {
            WorkerExecutor.Execution current = taskQueue.current();
            current.resume();
            assertNull(current.trySuspend());
            taskQueue.close();
        }, (v1) -> {
            r0.add(v1);
        });
        ((Runnable) concurrentLinkedDeque.pop()).run();
        assertEquals(0L, concurrentLinkedDeque.size());
    }

    @Test
    public void testCloseBetweenSuspendAndAwait() {
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        Objects.requireNonNull(concurrentLinkedDeque);
        Executor executor = (v1) -> {
            r0.add(v1);
        };
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        taskQueue.execute(() -> {
            CountDownLatch trySuspend = taskQueue.current().trySuspend();
            AtomicBoolean atomicBoolean2 = new AtomicBoolean();
            new Thread(() -> {
                ((Thread) taskQueue.close().suspendedThreads().get(0)).interrupt();
                atomicBoolean2.set(true);
            }).start();
            while (!atomicBoolean2.get()) {
                Thread.yield();
            }
            try {
                trySuspend.await();
            } catch (InterruptedException e) {
                atomicBoolean.set(true);
            }
        }, executor);
        ((Runnable) concurrentLinkedDeque.pop()).run();
        assertTrue(atomicBoolean.get());
    }

    @Test
    public void testSubmitAfterClose() {
        TaskQueue taskQueue = new TaskQueue();
        taskQueue.close();
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        Objects.requireNonNull(concurrentLinkedDeque);
        taskQueue.execute(() -> {
        }, (v1) -> {
            r0.add(v1);
        });
        assertEquals(1L, concurrentLinkedDeque.size());
    }

    @Test
    public void testSuspendAfterResume() {
        AtomicInteger atomicInteger = new AtomicInteger();
        TaskQueue taskQueue = new TaskQueue();
        ConcurrentLinkedDeque concurrentLinkedDeque = new ConcurrentLinkedDeque();
        Objects.requireNonNull(concurrentLinkedDeque);
        taskQueue.execute(() -> {
            assertEquals(0L, atomicInteger.getAndIncrement());
            taskQueue.execute(() -> {
                assertEquals(2L, atomicInteger.getAndIncrement());
            }, this.executor);
            WorkerExecutor.Execution current = taskQueue.current();
            assertEquals(1L, atomicInteger.getAndIncrement());
            current.resume();
            assertNull(current.trySuspend());
        }, (v1) -> {
            r0.add(v1);
        });
        ((Runnable) concurrentLinkedDeque.poll()).run();
        assertEquals(2L, atomicInteger.get());
    }
}
