package io.vertx.tests.concurrent;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.VertxOptions;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.test.core.VertxTestBase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;
import org.junit.Test;

/* loaded from: input_file:io/vertx/tests/concurrent/InboundMessageQueueTest.class */
public abstract class InboundMessageQueueTest extends VertxTestBase {
    private volatile Thread producerThread;
    private volatile Thread consumerThread;
    Context context;
    TestChannel queue;
    final AtomicInteger sequence = new AtomicInteger();

    /**
     * Test double around {@link InboundMessageQueue} used by every test in this class.
     *
     * <p>Delivered messages are forwarded to an {@link IntConsumer}; pause/resume callbacks are
     * mirrored into a {@code writable} flag; and {@code size} tracks how many messages were
     * emitted through this channel but not yet handed to {@link #handleMessage(Integer)}.
     */
    public class TestChannel extends InboundMessageQueue<Integer> {
        /** Receives the unboxed value of each message passed to {@link #handleMessage(Integer)}. */
        final IntConsumer consumer;
        /** Optional callback invoked from {@link #handleResume()}; {@code null} until set. */
        private Handler<Void> drainHandler;
        /** Set by {@link #handleResume()}, cleared by {@link #handlePause()}; starts {@code true}. */
        private volatile boolean writable;
        /** Incremented by the emit/fill methods and decremented in {@link #handleMessage(Integer)}. */
        private int size;

        /**
         * Creates a channel on the enclosing test's context using the two-argument superclass
         * constructor.
         *
         * @param intConsumer receives each delivered message
         */
        public TestChannel(IntConsumer intConsumer) {
            super(InboundMessageQueueTest.this.context.eventLoop(), InboundMessageQueueTest.this.context.executor());
            this.consumer = intConsumer;
            this.writable = true;
        }

        /**
         * Creates a channel on the enclosing test's context, forwarding {@code i} and {@code i2}
         * to the four-argument superclass constructor.
         *
         * @param intConsumer receives each delivered message
         * @param i  third superclass constructor argument (presumably the low water mark; confirm
         *           against {@code InboundMessageQueue})
         * @param i2 fourth superclass constructor argument (presumably the high water mark; confirm
         *           against {@code InboundMessageQueue})
         */
        public TestChannel(IntConsumer intConsumer, int i, int i2) {
            super(InboundMessageQueueTest.this.context.eventLoop(), InboundMessageQueueTest.this.context.executor(), i, i2);
            this.consumer = intConsumer;
            this.writable = true;
        }

        /** Returns the current value of the emitted-but-not-delivered counter. */
        public int size() {
            return this.size;
        }

        /** Decrements the counter and hands the unboxed message to the consumer. */
        public void handleMessage(Integer num) {
            this.size--;
            this.consumer.accept(num.intValue());
        }

        /** Marks the channel writable again and notifies the drain handler, if one is set. */
        protected void handleResume() {
            this.writable = true;
            Handler<Void> handler = this.drainHandler;
            if (handler != null) {
                // Handler<Void> accepts only null; the decompiled "(Object) null" cast does not type-check.
                handler.handle(null);
            }
        }

        /** Returns the writable flag as last set by the pause/resume callbacks. */
        public boolean isWritable() {
            return this.writable;
        }

        /** Marks the channel as not writable. */
        protected void handlePause() {
            this.writable = false;
        }

        /** Requests {@link Long#MAX_VALUE} messages via {@code fetch}. */
        final void resume() {
            fetch(Long.MAX_VALUE);
        }

        /**
         * Adds consecutive sequence numbers for as long as the channel reports writable, then
         * calls {@code drain()} if any {@code add} call returned {@code true}.
         *
         * @return the number of messages added
         */
        public final int fill() {
            int added = 0;
            boolean drainRequested = false;
            while (this.writable) {
                drainRequested |= add(Integer.valueOf(InboundMessageQueueTest.this.sequence.getAndIncrement()));
                added++;
            }
            this.size += added;
            if (drainRequested) {
                drain();
            }
            return added;
        }

        /**
         * Writes the next sequence number through {@code write}.
         *
         * @return the writable flag after the write
         */
        public final boolean emit() {
            this.size++;
            write(Integer.valueOf(InboundMessageQueueTest.this.sequence.getAndIncrement()));
            return this.writable;
        }

        /**
         * Adds {@code i} consecutive sequence numbers through {@code add}, then calls
         * {@code drain()} if any {@code add} call returned {@code true}.
         *
         * @param i number of messages to add
         * @return the writable flag after the batch
         */
        public final boolean emit(int i) {
            this.size += i;
            boolean drainRequested = false;
            for (int n = 0; n < i; n++) {
                drainRequested |= add(Integer.valueOf(InboundMessageQueueTest.this.sequence.getAndIncrement()));
            }
            if (drainRequested) {
                drain();
            }
            return this.writable;
        }

        /**
         * Sets the callback invoked from {@link #handleResume()}.
         *
         * @param handler the callback, may be {@code null}
         * @return this channel, for chaining
         */
        public TestChannel drainHandler(Handler<Void> handler) {
            this.drainHandler = handler;
            return this;
        }
    }

    /**
     * Supplies the context that the queue under test runs on; called once from {@code setUp()}.
     *
     * @param vertxInternal the Vert.x instance of the running test, cast to its internal type
     * @return the context to use for the test
     */
    protected abstract Context createContext(VertxInternal vertxInternal);

    /**
     * Creates the context under test, resets the message sequence, and records which thread runs
     * context tasks and which runs Netty event-loop tasks. Waits until both threads are captured.
     */
    @Override // io.vertx.test.core.VertxTestBase, io.vertx.test.core.AsyncTestBase
    public void setUp() throws Exception {
        super.setUp();
        context = createContext((VertxInternal) vertx);
        sequence.set(0);
        // Capture the thread that executes tasks scheduled on the context.
        context.runOnContext(ignored -> consumerThread = Thread.currentThread());
        // Capture the thread that executes tasks scheduled on the Netty event loop.
        context.nettyEventLoop().execute(() -> producerThread = Thread.currentThread());
        waitUntil(() -> consumerThread != null && producerThread != null);
    }

    /**
     * Returns the superclass options with the worker pool size set to one.
     */
    @Override // io.vertx.test.core.VertxTestBase
    public VertxOptions getOptions() {
        VertxOptions options = super.getOptions();
        return options.setWorkerPoolSize(1);
    }

    /**
     * Delegates to the superclass tear-down; this class adds no cleanup of its own.
     */
    @Override // io.vertx.test.core.VertxTestBase, io.vertx.test.core.AsyncTestBase
    public void tearDown() throws Exception {
        super.tearDown();
    }

    /**
     * Asserts that the calling thread is the one recorded in {@code setUp()} as running context
     * tasks.
     */
    public final void assertConsumer() {
        Thread expected = consumerThread;
        assertSame(expected, Thread.currentThread());
    }

    /**
     * Asserts that the calling thread is the one recorded in {@code setUp()} as running Netty
     * event-loop tasks.
     */
    protected final void assertProducer() {
        Thread expected = producerThread;
        assertSame(expected, Thread.currentThread());
    }

    /**
     * Schedules {@code runnable} on the Netty event loop of the test context.
     *
     * @param runnable the task to execute
     */
    public final void producerTask(Runnable runnable) {
        this.context.nettyEventLoop().execute(runnable);
    }

    /**
     * Schedules {@code runnable} on the test context via {@code runOnContext}.
     *
     * @param runnable the task to execute
     */
    protected final void consumerTask(Runnable runnable) {
        context.runOnContext(ignored -> runnable.run());
    }

    /**
     * Creates a {@link TestChannel} with the single-argument constructor.
     *
     * @param intConsumer receives each message delivered by the channel
     * @return the new channel
     */
    protected final TestChannel buffer(IntConsumer intConsumer) {
        return new TestChannel(intConsumer);
    }

    /**
     * Creates a {@link TestChannel} with the three-argument constructor.
     *
     * @param intConsumer receives each message delivered by the channel
     * @param i  forwarded unchanged as the second constructor argument
     *           (NOTE(review): presumably the low water mark; confirm)
     * @param i2 forwarded unchanged as the third constructor argument
     *           (NOTE(review): presumably the high water mark; confirm)
     * @return the new channel
     */
    public final TestChannel buffer(IntConsumer intConsumer, int i, int i2) {
        return new TestChannel(intConsumer, i, i2);
    }

    /**
     * Emits one message from the producer thread on a queue that was never paused and expects it
     * to arrive, as the first delivery, on the consumer thread.
     */
    @Test
    public void testFlowing() {
        AtomicInteger deliveries = new AtomicInteger();
        IntConsumer onMessage = value -> {
            assertConsumer();
            assertEquals(0L, value);
            assertEquals(0L, deliveries.getAndIncrement());
            testComplete();
        };
        queue = buffer(onMessage);
        producerTask(() -> assertTrue(queue.emit()));
        await();
    }

    /**
     * Pauses the queue and fetches a single message on the consumer thread, then emits one message
     * from the producer thread and expects it to be delivered as the first delivery.
     */
    @Test
    public void testTake() {
        AtomicInteger deliveries = new AtomicInteger();
        IntConsumer onMessage = value -> {
            assertConsumer();
            assertEquals(0L, value);
            assertEquals(0L, deliveries.getAndIncrement());
            testComplete();
        };
        queue = buffer(onMessage);
        Runnable emitOne = () -> queue.emit();
        consumerTask(() -> {
            queue.pause();
            queue.fetch(1L);
            producerTask(emitOne);
        });
        await();
    }

    /**
     * Emits two messages one after the other from the producer thread, waiting after each emit
     * until the consumer has seen it, and expects the queue to report writable both times.
     */
    @Test
    public void testFlowingAdd() {
        AtomicInteger deliveries = new AtomicInteger();
        queue = buffer(value -> {
            assertConsumer();
            deliveries.getAndIncrement();
        });
        producerTask(() -> {
            for (int round = 1; round <= 2; round++) {
                int expectedDeliveries = round;
                assertTrue(queue.emit());
                assertWaitUntil(() -> deliveries.get() == expectedDeliveries);
            }
            testComplete();
        });
        await();
    }

    /**
     * Pauses a (5, 5) queue, emits eight messages expecting only the first four emits to report
     * writable, then resumes and expects the drain handler to run on the producer thread after all
     * eight messages were delivered.
     */
    @Test
    public void testFlowingRefill() {
        AtomicInteger deliveries = new AtomicInteger();
        IntConsumer onMessage = value -> {
            assertConsumer();
            deliveries.getAndIncrement();
        };
        Handler<Void> onDrain = ignored -> {
            assertProducer();
            assertEquals(8L, deliveries.get());
            testComplete();
        };
        queue = buffer(onMessage, 5, 5).drainHandler(onDrain);
        queue.pause();
        producerTask(() -> {
            for (int index = 0; index < 8; index++) {
                boolean expectedWritable = index < 4;
                assertEquals("Expected " + index + " to be bilto", Boolean.valueOf(expectedWritable), Boolean.valueOf(queue.emit()));
            }
            queue.resume();
        });
        await();
    }

    /**
     * Pauses a (5, 5) queue, emits five messages expecting only the first four emits to report
     * writable, then fetches one message and expects it to be delivered before any drain callback.
     */
    @Test
    public void testPauseWhenFull() {
        AtomicInteger deliveries = new AtomicInteger();
        AtomicInteger drains = new AtomicInteger();
        IntConsumer onMessage = value -> {
            assertConsumer();
            assertEquals(0L, drains.get());
            assertEquals(0L, deliveries.getAndIncrement());
            testComplete();
        };
        Handler<Void> onDrain = ignored -> {
            assertProducer();
            assertEquals(0L, drains.getAndIncrement());
        };
        queue = buffer(onMessage, 5, 5).drainHandler(onDrain);
        producerTask(() -> {
            queue.pause();
            for (int index = 0; index < 5; index++) {
                boolean expectedWritable = index < 4;
                assertEquals(Boolean.valueOf(expectedWritable), Boolean.valueOf(queue.emit()));
            }
            queue.fetch(1L);
        });
        await();
    }

    /**
     * Pauses a (5, 5) queue, fills it, then resumes. Expects a single drain callback on the
     * producer thread once five messages have been delivered.
     */
    @Test
    public void testPausedResume() {
        AtomicInteger drains = new AtomicInteger();
        AtomicInteger deliveries = new AtomicInteger();
        TestChannel channel = buffer(value -> {
            assertConsumer();
            deliveries.getAndIncrement();
        }, 5, 5);
        queue = channel.drainHandler(ignored -> {
            assertProducer();
            assertEquals(0L, drains.getAndIncrement());
            assertEquals(5L, deliveries.get());
            testComplete();
        });
        producerTask(() -> {
            queue.pause();
            queue.fill();
            queue.resume();
        });
        await();
    }

    /**
     * Pauses a (5, 5) queue and fills it, checks that nothing was delivered or drained yet, then
     * resumes. Expects all five deliveries to precede the single drain callback.
     */
    @Test
    public void testPausedDrain() {
        waitFor(2);
        AtomicInteger drains = new AtomicInteger();
        AtomicInteger deliveries = new AtomicInteger();
        IntConsumer onMessage = value -> {
            assertConsumer();
            assertEquals(0L, drains.get());
            deliveries.getAndIncrement();
        };
        Handler<Void> onDrain = ignored -> {
            assertProducer();
            assertEquals(0L, drains.getAndIncrement());
            assertEquals(5L, deliveries.get());
            complete();
        };
        queue = buffer(onMessage, 5, 5);
        queue.drainHandler(onDrain);
        producerTask(() -> {
            queue.pause();
            queue.fill();
            // Nothing may have been delivered or drained while the queue is still paused.
            assertEquals(0L, drains.get());
            assertEquals(0L, deliveries.get());
            queue.resume();
            complete();
        });
        await();
    }

    /**
     * On a paused (3, 3) queue with a single message requested: the first emit is delivered, the
     * next two emits still report writable without being delivered, and the fourth emit reports
     * not writable. The drain handler must never have run by the end.
     */
    @Test
    public void testPausedRequestLimited() {
        AtomicInteger deliveries = new AtomicInteger();
        AtomicInteger drains = new AtomicInteger();
        IntConsumer onMessage = value -> {
            assertConsumer();
            deliveries.getAndIncrement();
        };
        Handler<Void> onDrain = ignored -> {
            assertProducer();
            assertEquals(0L, drains.getAndIncrement());
        };
        queue = buffer(onMessage, 3, 3).drainHandler(onDrain);
        producerTask(() -> {
            queue.pause();
            queue.fetch(1L);
            assertEquals(0L, drains.get());
            assertEquals(0L, deliveries.get());

            // First emit: consumed by the single outstanding request.
            assertTrue(queue.emit());
            assertEquals(0L, drains.get());
            waitUntilEquals(1, deliveries::get);

            // Second emit: still writable, no further delivery.
            assertTrue(queue.emit());
            assertEquals(0L, drains.get());
            assertEquals(1L, deliveries.get());

            // Third emit: still writable, no further delivery.
            assertTrue(queue.emit());
            assertEquals(0L, drains.get());
            assertEquals(1L, deliveries.get());

            // Fourth emit: the queue reports not writable.
            assertFalse(queue.emit());
            assertEquals(0L, drains.get());
            assertEquals(1L, deliveries.get());
            testComplete();
        });
        await();
    }

    /**
     * On a paused (2, 2) queue with a single message requested: the first emit is delivered, the
     * second emit still reports writable, and the third emit reports not writable.
     */
    @Test
    public void testPushReturnsTrueUntilHighWatermark() {
        AtomicInteger deliveries = new AtomicInteger();
        IntConsumer onMessage = value -> deliveries.incrementAndGet();
        queue = buffer(onMessage, 2, 2);
        producerTask(() -> {
            queue.pause();
            queue.fetch(1L);
            assertTrue(queue.emit());
            assertWaitUntil(() -> deliveries.get() == 1);
            assertTrue(queue.emit());
            assertFalse(queue.emit());
            testComplete();
        });
        await();
    }

    /**
     * Fills a paused (5, 5) queue and expects {@code fill()} to have drawn exactly five sequence
     * numbers before the queue stopped reporting writable.
     */
    @Test
    public void testHighWaterMark() {
        IntConsumer discard = value -> {
        };
        queue = buffer(discard, 5, 5);
        producerTask(() -> {
            queue.pause();
            queue.fill();
            assertEquals(5L, sequence.get());
            testComplete();
        });
        await();
    }

    /**
     * Adds a batch of five messages to a paused (4, 4) queue and expects the batch emit to report
     * not writable. The consumer then pulls messages one at a time, and the drain handler is
     * expected to run only after all five were delivered.
     */
    @Test
    public void testAddAllWhenPaused() {
        waitFor(2);
        AtomicInteger deliveries = new AtomicInteger();
        // Never incremented in this test; only asserted to remain zero.
        AtomicInteger emptied = new AtomicInteger();
        AtomicInteger drains = new AtomicInteger();
        IntConsumer onMessage = value -> {
            deliveries.incrementAndGet();
            assertEquals(0L, drains.get());
            assertEquals(0L, emptied.get());
            queue.fetch(1L);
        };
        Handler<Void> onDrain = ignored -> {
            assertEquals(5L, deliveries.get());
            drains.incrementAndGet();
            complete();
        };
        queue = buffer(onMessage, 4, 4).drainHandler(onDrain);
        producerTask(() -> {
            queue.pause();
            assertFalse(queue.emit(5));
            queue.fetch(1L);
            complete();
        });
        await();
    }

    /**
     * Adds a batch of four messages to a (4, 4) queue that was never paused, then waits until the
     * drain handler has run once and four messages have been delivered.
     */
    @Test
    public void testAddAllWhenFlowing() {
        AtomicInteger deliveries = new AtomicInteger();
        AtomicInteger drains = new AtomicInteger();
        IntConsumer onMessage = value -> deliveries.incrementAndGet();
        Handler<Void> onDrain = ignored -> drains.incrementAndGet();
        queue = buffer(onMessage, 4, 4).drainHandler(onDrain);
        producerTask(() -> queue.emit(4));
        waitUntilEquals(1, drains::get);
        waitUntilEquals(4, deliveries::get);
    }

    /**
     * Fills a paused (4, 4) queue, then on the consumer thread resumes and immediately pauses
     * again. After a 20 ms timer, no message may have been delivered.
     */
    @Test
    public void testCheckThatPauseAfterResumeWontDoAnyEmission() {
        AtomicInteger deliveries = new AtomicInteger();
        IntConsumer onMessage = value -> deliveries.incrementAndGet();
        queue = buffer(onMessage, 4, 4);
        Runnable resumeThenPause = () -> {
            queue.resume();
            queue.pause();
            vertx.setTimer(20L, timerId -> {
                assertEquals(0L, deliveries.get());
                testComplete();
            });
        };
        producerTask(() -> {
            queue.pause();
            queue.fill();
            consumerTask(resumeThenPause);
        });
        await();
    }

    /**
     * Exercises a (1, 1) queue across two drain callbacks.
     *
     * <p>The producer emits one message, waits for its delivery, then pauses the queue. The first
     * drain callback schedules a producer task that emits again (expected to report not writable)
     * and resumes. The second drain callback expects both messages delivered in order and
     * completes the test.
     */
    @Test
    public void testBufferSignalingFullImmediately() {
        List<Integer> emitted = Collections.synchronizedList(new ArrayList<>());
        AtomicInteger drains = new AtomicInteger();
        // The decompiler had turned this method reference into a lambda over an undefined variable.
        queue = buffer(emitted::add, 1, 1);
        producerTask(() -> {
            queue.drainHandler(ignored -> {
                switch (drains.getAndIncrement()) {
                    case 0:
                        producerTask(() -> {
                            assertFalse(queue.emit());
                            queue.resume();
                        });
                        break;
                    case 1:
                        assertEquals(Arrays.asList(0, 1), emitted);
                        testComplete();
                        break;
                    default:
                        // Any later drain callback is ignored.
                        break;
                }
            });
            queue.emit();
            assertWaitUntil(() -> emitted.size() == 1);
            assertEquals(Collections.singletonList(0), emitted);
            queue.pause();
        });
        await();
    }

    /**
     * Checks what happens to queued messages when the queue is closed.
     *
     * <p>The producer pauses a (1, 1) queue, adds five messages and closes the producer side. The
     * consumer then closes its side and expects that nothing was delivered and that all five
     * messages were passed to {@code handleDispose} in order. Finally the producer writes once
     * more after the close and the test completes.
     */
    @Test
    public void testClose() {
        List<Integer> emitted = Collections.synchronizedList(new ArrayList<>());
        List<Integer> disposed = Collections.synchronizedList(new ArrayList<>());
        // The decompiler had turned this method reference into a lambda over an undefined variable.
        queue = new TestChannel(emitted::add, 1, 1) {
            public void handleDispose(Integer num) {
                disposed.add(num);
            }
        };
        producerTask(() -> {
            queue.pause();
            queue.emit(5);
            queue.closeProducer();
            consumerTask(() -> {
                queue.closeConsumer();
                assertEquals(Collections.emptyList(), emitted);
                assertEquals(Arrays.asList(0, 1, 2, 3, 4), disposed);
                producerTask(() -> {
                    // Write after close; the test only requires that this call returns.
                    queue.write(5);
                    testComplete();
                });
            });
        });
        await();
    }
}
