package io.vertx.tests.streams;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.streams.ReadStreamIterator;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.test.core.Repeat;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.fakestream.FakeStream;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.Assume;
import org.junit.Test;

/* loaded from: input_file:io/vertx/tests/streams/IteratorTest.class */
public class IteratorTest extends VertxTestBase {

    /* renamed from: io.vertx.tests.streams.IteratorTest$1Consumer, reason: invalid class name */
    /* loaded from: input_file:io/vertx/tests/streams/IteratorTest$1Consumer.class */
    class C1Consumer extends Thread {
        final List<Integer> consumed = new ArrayList();
        final /* synthetic */ CyclicBarrier val$barrier;
        final /* synthetic */ Iterator val$iterator;

        C1Consumer(CyclicBarrier cyclicBarrier, Iterator it) {
            this.val$barrier = cyclicBarrier;
            this.val$iterator = it;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.val$barrier.await();
                while (true) {
                    try {
                        this.consumed.add((Integer) this.val$iterator.next());
                    } catch (NoSuchElementException e) {
                        return;
                    }
                }
            } catch (Exception e2) {
            }
        }
    }

    @Test
    public void testIteratorResuming() {
        FakeStream fakeStream = new FakeStream();
        fakeStream.m39setWriteQueueMaxSize(0);
        Iterator it = ReadStreamIterator.iterator(fakeStream);
        for (int i = 0; i < 16; i++) {
            assertFalse(fakeStream.writeQueueFull());
            fakeStream.write(Integer.valueOf(i));
        }
        fakeStream.write(17);
        assertTrue(fakeStream.writeQueueFull());
        for (int i2 = 0; i2 < 16; i2++) {
            it.next();
        }
        assertFalse(fakeStream.writeQueueFull());
    }

    @Test
    public void testEnd() {
        FakeStream fakeStream = new FakeStream();
        Iterator it = ReadStreamIterator.iterator(fakeStream);
        for (int i = 0; i < 15; i++) {
            fakeStream.write(Integer.valueOf(i));
        }
        fakeStream.end();
        for (int i2 = 0; i2 < 15; i2++) {
            assertTrue(it.hasNext());
            it.next();
        }
        assertFalse(it.hasNext());
        try {
            it.next();
            fail();
        } catch (NoSuchElementException e) {
        }
    }

    @Test
    public void testFail() {
        FakeStream fakeStream = new FakeStream();
        Iterator it = ReadStreamIterator.iterator(fakeStream);
        for (int i = 0; i < 15; i++) {
            fakeStream.write(Integer.valueOf(i));
        }
        Throwable th = new Throwable();
        fakeStream.fail(th);
        for (int i2 = 0; i2 < 15; i2++) {
            assertTrue(it.hasNext());
            it.next();
        }
        assertTrue(it.hasNext());
        try {
            it.next();
            fail();
        } catch (Throwable th2) {
            assertSame(th, th2);
        }
    }

    @Test
    public void testHasNextSignal() throws Exception {
        FakeStream fakeStream = new FakeStream();
        Iterator it = ReadStreamIterator.iterator(fakeStream);
        Thread[] threadArr = new Thread[4];
        for (int i = 0; i < 4; i++) {
            Objects.requireNonNull(it);
            Thread thread = new Thread(it::hasNext);
            threadArr[i] = thread;
            thread.start();
            assertWaitUntil(() -> {
                return thread.getState() == Thread.State.WAITING;
            });
        }
        fakeStream.end();
        for (Thread thread2 : threadArr) {
            thread2.join();
        }
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [io.vertx.tests.streams.IteratorTest$1Stream, io.vertx.core.streams.ReadStream] */
    @Repeat(times = 100)
    @Test
    public void testConcurrentReads() throws Exception {
        ?? r0 = new ReadStream<Integer>() { // from class: io.vertx.tests.streams.IteratorTest.1Stream
            private Handler<Integer> handler;
            private Handler<Void> endHandler;
            private long demand = Long.MAX_VALUE;
            private final Lock lock = new ReentrantLock();
            private final Condition producerSignal = this.lock.newCondition();

            public ReadStream<Integer> exceptionHandler(Handler<Throwable> handler) {
                return this;
            }

            public void write(Integer num) throws InterruptedException {
                long j;
                this.lock.lock();
                while (true) {
                    try {
                        j = this.demand;
                        if (j > 0) {
                            break;
                        } else {
                            this.producerSignal.await();
                        }
                    } finally {
                        this.lock.unlock();
                    }
                }
                if (j != Long.MAX_VALUE) {
                    this.demand = j - 1;
                }
                Handler<Integer> handler = this.handler;
                if (handler != null) {
                    handler.handle(num);
                }
            }

            public void end() throws InterruptedException {
                this.lock.lock();
                while (this.demand <= 0) {
                    try {
                        this.producerSignal.await();
                    } finally {
                        this.lock.unlock();
                    }
                }
                Handler<Void> handler = this.endHandler;
                if (handler != null) {
                    handler.handle((Object) null);
                }
            }

            public ReadStream<Integer> handler(Handler<Integer> handler) {
                this.lock.lock();
                try {
                    this.handler = handler;
                    return this;
                } finally {
                    this.lock.unlock();
                }
            }

            public ReadStream<Integer> endHandler(Handler<Void> handler) {
                this.lock.lock();
                try {
                    this.endHandler = handler;
                    return this;
                } finally {
                    this.lock.unlock();
                }
            }

            public ReadStream<Integer> pause() {
                this.lock.lock();
                try {
                    this.demand = 0L;
                    return this;
                } finally {
                    this.lock.unlock();
                }
            }

            public ReadStream<Integer> resume() {
                return fetch(Long.MAX_VALUE);
            }

            public ReadStream<Integer> fetch(long j) {
                if (j < 0) {
                    throw new IllegalArgumentException();
                }
                if (j > 0) {
                    this.lock.lock();
                    try {
                        long j2 = this.demand + j;
                        if (j2 < 0) {
                            j2 = Long.MAX_VALUE;
                        }
                        this.demand = j2;
                        this.producerSignal.signal();
                        this.lock.unlock();
                    } catch (Throwable th) {
                        this.lock.unlock();
                        throw th;
                    }
                }
                return this;
            }

            /* renamed from: exceptionHandler, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ StreamBase m121exceptionHandler(Handler handler) {
                return exceptionHandler((Handler<Throwable>) handler);
            }
        };
        Iterator it = ReadStreamIterator.iterator((ReadStream) r0);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(8 + 1);
        C1Consumer[] c1ConsumerArr = new C1Consumer[16384];
        for (int i = 0; i < 8; i++) {
            C1Consumer c1Consumer = new C1Consumer(cyclicBarrier, it);
            c1Consumer.start();
            c1ConsumerArr[i] = c1Consumer;
        }
        cyclicBarrier.await();
        for (int i2 = 0; i2 < 16384; i2++) {
            r0.write(Integer.valueOf(i2));
        }
        r0.end();
        ArrayList arrayList = new ArrayList();
        for (int i3 = 0; i3 < 8; i3++) {
            C1Consumer c1Consumer2 = c1ConsumerArr[i3];
            c1Consumer2.join(1000L);
            if (c1Consumer2.getState() != Thread.State.TERMINATED) {
                System.out.println("Could not join timely " + c1Consumer2 + ":");
                Exception exc = new Exception();
                exc.setStackTrace(c1Consumer2.getStackTrace());
                exc.printStackTrace(System.out);
                fail();
            }
            arrayList.addAll(c1Consumer2.consumed);
        }
        assertEquals(arrayList.size(), 16384);
    }

    @Test
    public void testVirtualThread() {
        VertxInternal vertxInternal = (VertxInternal) this.vertx;
        Assume.assumeTrue(vertxInternal.isVirtualThreadAvailable());
        doTestVirtualThread(vertxInternal);
    }

    private void doTestVirtualThread(VertxInternal vertxInternal) {
        FakeStream fakeStream = new FakeStream();
        Iterator it = ReadStreamIterator.iterator(fakeStream);
        ContextInternal createVirtualThreadContext = vertxInternal.createVirtualThreadContext();
        AtomicInteger atomicInteger = new AtomicInteger();
        createVirtualThreadContext.runOnContext(r11 -> {
            createVirtualThreadContext.runOnContext(r9 -> {
                assertEquals(0L, atomicInteger.getAndIncrement());
                fakeStream.write(0);
            });
            assertEquals(0L, atomicInteger.get());
            it.next();
            assertEquals(1L, atomicInteger.getAndIncrement());
            testComplete();
        });
        await();
    }

    @Test
    public void testBlockingStreamFromVirtualThread() {
        VertxInternal vertxInternal = this.vertx;
        Assume.assumeTrue(vertxInternal.isVirtualThreadAvailable());
        ContextInternal createVirtualThreadContext = vertxInternal.createVirtualThreadContext();
        testBlockingStream(runnable -> {
            createVirtualThreadContext.runOnContext(r3 -> {
                runnable.run();
            });
        });
    }

    @Test
    public void testBlockingStreamFromVanillaThread() {
        testBlockingStream(runnable -> {
            new Thread(runnable).start();
        });
    }

    @Test
    public void testBlockingStreamFromVertxThread() {
        VertxInternal vertxInternal = this.vertx;
        FakeStream fakeStream = new FakeStream();
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        Stream blockingStream = fakeStream.blockingStream();
        ContextInternal createEventLoopContext = vertxInternal.createEventLoopContext();
        Objects.requireNonNull(synchronizedList);
        createEventLoopContext.exceptionHandler((v1) -> {
            r1.add(v1);
        });
        createEventLoopContext.runOnContext(r5 -> {
            blockingStream.forEach(num -> {
                fail();
            });
        });
        assertWaitUntil(() -> {
            return synchronizedList.size() == 1;
        });
        assertEquals(IllegalStateException.class, ((Throwable) synchronizedList.get(0)).getClass());
    }

    private void testBlockingStream(Executor executor) {
        FakeStream fakeStream = new FakeStream();
        Stream blockingStream = fakeStream.blockingStream();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 4; i++) {
            arrayList.add(Integer.valueOf(i));
        }
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        executor.execute(() -> {
            synchronizedList.addAll((Collection) blockingStream.collect(Collectors.toList()));
        });
        new Thread(() -> {
            try {
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    fakeStream.write(Integer.valueOf(((Integer) it.next()).intValue()));
                    Thread.sleep(10L);
                }
                fakeStream.end();
            } catch (InterruptedException e) {
            }
        }).start();
        assertWaitUntil(() -> {
            return synchronizedList.equals(arrayList);
        });
    }

    @Test
    public void testBlockingStreamInterleavingFromVirtualThread() {
        VertxInternal vertxInternal = this.vertx;
        Assume.assumeTrue(vertxInternal.isVirtualThreadAvailable());
        ContextInternal createVirtualThreadContext = vertxInternal.createVirtualThreadContext();
        FakeStream fakeStream = new FakeStream();
        Stream blockingStream = fakeStream.blockingStream();
        int i = 32;
        createVirtualThreadContext.runOnContext(r16 -> {
            AtomicInteger atomicInteger = new AtomicInteger();
            vertxInternal.setPeriodic(5L, l -> {
                assertSame(createVirtualThreadContext, Vertx.currentContext().unwrap());
                int andIncrement = atomicInteger.getAndIncrement();
                if (andIncrement != i) {
                    fakeStream.write(Integer.valueOf(andIncrement));
                } else {
                    vertxInternal.cancelTimer(l.longValue());
                    fakeStream.end();
                }
            });
            ArrayList arrayList = new ArrayList();
            Objects.requireNonNull(arrayList);
            blockingStream.forEach((v1) -> {
                r1.add(v1);
            });
            assertEquals(i, arrayList.size());
            testComplete();
        });
        await();
    }

    @Test
    public void testStreamFailure() {
        RuntimeException runtimeException = new RuntimeException();
        FakeStream fakeStream = new FakeStream();
        Stream blockingStream = fakeStream.blockingStream();
        fakeStream.fail(runtimeException);
        try {
            blockingStream.collect(Collectors.toList());
        } catch (Exception e) {
            assertSame(runtimeException, e);
        }
    }

    @Test
    public void testDeadlockFromVirtualStream() {
        VertxInternal vertxInternal = this.vertx;
        Assume.assumeTrue(vertxInternal.isVirtualThreadAvailable());
        ContextInternal createVirtualThreadContext = vertxInternal.createVirtualThreadContext();
        FakeStream fakeStream = new FakeStream();
        Stream blockingStream = fakeStream.blockingStream();
        int i = 32;
        List list = (List) IntStream.range(0, 32).boxed().collect(Collectors.toList());
        AtomicInteger atomicInteger = new AtomicInteger();
        createVirtualThreadContext.setPeriodic(1L, l -> {
            int incrementAndGet = atomicInteger.incrementAndGet();
            fakeStream.write(Integer.valueOf(incrementAndGet - 1));
            if (incrementAndGet == i) {
                vertxInternal.cancelTimer(l.longValue());
                fakeStream.end();
            }
        });
        createVirtualThreadContext.runOnContext(r7 -> {
            assertEquals(list, (List) blockingStream.collect(Collectors.toList()));
            testComplete();
        });
        await();
    }
}
