package io.vertx.tests.net;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.buffer.BufferInternal;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.VertxConnection;
import io.vertx.core.net.impl.VertxHandler;
import io.vertx.core.transport.Transport;
import io.vertx.test.core.TestUtils;
import io.vertx.test.core.VertxTestBase;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import org.junit.Assume;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:io/vertx/tests/net/VertxConnectionTest.class */
public class VertxConnectionTest extends VertxTestBase {
    // Client used by the tests to connect to the local server; assigned in before(), cleared in tearDown().
    private NetClient client;
    // Server listening on localhost:1234; assigned and started in before(), cleared in tearDown().
    private NetServer server;
    // Per-test hook invoked by the server for each accepted socket. It is assigned by the test
    // methods and read inside the server's connect handler; when it is null the socket is closed.
    private volatile Handler<NetSocketInternal> connectHandler;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/vertx/tests/net/VertxConnectionTest$Message.class */
    /** Simple value holder identified by a string id; {@code TestConnection} casts inbound objects to it. */
    public static class Message {
        /** Identifier given at construction time, stored without validation. */
        final String id;

        /**
         * @param id the identifier to store (may be {@code null}; no check is performed)
         */
        Message(String id) {
            this.id = id;
        }
    }

    /* loaded from: input_file:io/vertx/tests/net/VertxConnectionTest$MessageFactory.class */
    /** Produces {@link Message} instances with sequential ids: {@code msg-0}, {@code msg-1}, ... */
    static class MessageFactory {
        /** Sequence number that the next created message will carry. */
        int seq;

        /** Creates a message named after the current sequence number, then advances the sequence. */
        Message next() {
            return new Message("msg-" + this.seq++);
        }

        /**
         * Creates a batch of consecutive messages.
         *
         * @param count how many messages to create
         * @return an array of {@code count} messages in creation order
         */
        Message[] next(int count) {
            Message[] batch = new Message[count];
            int filled = 0;
            while (filled < count) {
                batch[filled++] = next();
            }
            return batch;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/tests/net/VertxConnectionTest$TestConnection.class */
    /**
     * {@link VertxConnection} used by the tests: inbound messages and read-complete notifications
     * are forwarded to optional handlers, and pause/resume are exposed publicly.
     */
    public class TestConnection extends VertxConnection {
        /** Receives every inbound object cast to {@link Message}; nothing happens while it is {@code null}. */
        Handler<Message> handler;
        /** Invoked with {@code null} on read completion; nothing happens while it is {@code null}. */
        Handler<Void> readCompleteHandler;

        /**
         * Builds the connection with a context created from the enclosing test's Vert.x instance,
         * using the executor of the given channel handler context as its event loop.
         *
         * @param channelHandlerContext the Netty handler context this connection is bound to
         */
        public TestConnection(ChannelHandlerContext channelHandlerContext) {
            super(VertxConnectionTest.this.vertx.contextBuilder().withEventLoop(channelHandlerContext.executor()).build(), channelHandlerContext);
        }

        /** Forwards the message to {@link #handler} when one is set. */
        protected void handleMessage(Object obj) {
            // Read the field once so the null check and the call see the same handler.
            Handler<Message> current = this.handler;
            if (current != null) {
                current.handle((Message) obj);
            }
        }

        /** Notifies {@link #readCompleteHandler} when one is set. */
        protected void handleReadComplete() {
            Handler<Void> current = this.readCompleteHandler;
            if (current != null) {
                // A Handler<Void> only accepts an untyped null; an (Object) cast does not type-check.
                current.handle(null);
            }
        }

        /** Pauses the connection by delegating to {@code doPause()}. */
        public void pause() {
            doPause();
        }

        /** Schedules {@code doResume()} on the executor of the channel handler context. */
        public void resume() {
            this.chctx.executor().execute(this::doResume);
        }
    }

    /**
     * Creates the client and starts a server on localhost:1234 whose connect handler delegates
     * to the current {@code connectHandler}, closing the socket when no handler is installed.
     */
    @Override
    public void before() throws Exception {
        super.before();
        this.client = this.vertx.createNetClient();
        this.server = this.vertx.createNetServer().connectHandler(accepted -> {
            // Snapshot the hook so the null check and the call use the same value.
            Handler<NetSocketInternal> hook = this.connectHandler;
            if (hook == null) {
                accepted.close();
                return;
            }
            hook.handle((NetSocketInternal) accepted);
        });
        awaitFuture(this.server.listen(1234, "localhost"));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Runs the base-class teardown, then drops the references to the fixtures created in before(). */
    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        this.server = null;
        this.client = null;
    }

    /**
     * Writes "msg-1" and "msg-2" from a non event-loop thread while a pipeline handler records
     * string writes. The first flush must have seen only "msg-1"; it then releases the writer
     * and blocks until "msg-2" has been submitted. The second flush must have seen both messages.
     */
    @Test
    public void testQueueMessagesMissMessage() throws Exception {
        disableThreadChecks();
        CountDownLatch firstFlushSeen = new CountDownLatch(1);
        CountDownLatch secondWriteSubmitted = new CountDownLatch(1);
        this.connectHandler = socket -> {
            ChannelPipeline pipeline = socket.channelHandlerContext().pipeline();
            final List<String> written = new ArrayList<>();
            pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                int flushCount;

                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    // Only string messages are recorded (and swallowed); everything else passes through.
                    if (!(msg instanceof String)) {
                        super.write(ctx, msg, promise);
                        return;
                    }
                    written.add((String) msg);
                }

                public void flush(ChannelHandlerContext ctx) throws Exception {
                    super.flush(ctx);
                    final int ordinal = ++this.flushCount;
                    if (ordinal == 1) {
                        VertxConnectionTest.this.assertEquals(List.of("msg-1"), written);
                        firstFlushSeen.countDown();
                        // Hold this flush until the writer thread has submitted the second message.
                        VertxConnectionTest.this.awaitLatch(secondWriteSubmitted);
                    } else if (ordinal == 2) {
                        VertxConnectionTest.this.assertEquals(List.of("msg-1", "msg-2"), written);
                        VertxConnectionTest.this.testComplete();
                    }
                }
            });
            executeAsyncTask(() -> {
                socket.writeMessage("msg-1");
                try {
                    awaitLatch(firstFlushSeen);
                } catch (InterruptedException e) {
                    fail(e);
                }
                socket.writeMessage("msg-2");
                secondWriteSubmitted.countDown();
            });
        };
        awaitFuture(this.client.connect(1234, "localhost"));
        await();
    }

    /**
     * Writes "msg1" and "msg2" from a non event-loop thread; the pipeline handler issues "msg3"
     * while it is handling "msg1". Once three strings were recorded, the recorded order must be
     * msg1, msg2, msg3.
     */
    @Test
    public void testQueueMessageFromInnerWrite() throws Exception {
        this.connectHandler = socket -> {
            ChannelPipeline pipeline = socket.channelHandlerContext().pipeline();
            final List<String> seen = new ArrayList<>();
            pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    if (msg instanceof String) {
                        String text = (String) msg;
                        seen.add(text);
                        if ("msg1".equals(text)) {
                            // Nested write issued from inside the pipeline's write callback.
                            socket.writeMessage("msg3");
                        }
                        if (seen.size() == 3) {
                            VertxConnectionTest.this.vertx.runOnContext(v -> {
                                VertxConnectionTest.this.assertEquals(Arrays.asList("msg1", "msg2", "msg3"), seen);
                                VertxConnectionTest.this.testComplete();
                            });
                        }
                    } else {
                        super.write(ctx, msg, promise);
                    }
                }
            });
            executeAsyncTaskAndAwait(() -> {
                socket.writeMessage("msg1");
                socket.writeMessage("msg2");
            });
        };
        awaitFuture(this.client.connect(1234, "localhost"));
        await();
    }

    /**
     * Writes "msg1" and "msg2" from a non event-loop thread; the pipeline handler calls
     * {@code flush()} on the socket while handling "msg1". The first flush reaching the pipeline
     * is recorded, and the recorded sequence must be msg1, msg2, flush.
     */
    @Test
    public void testQueueFlushFromEventLoop() throws Exception {
        this.connectHandler = socket -> {
            ChannelPipeline pipeline = socket.channelHandlerContext().pipeline();
            final List<String> events = new ArrayList<>();
            final Runnable verify = () -> {
                this.vertx.runOnContext(v -> {
                    assertEquals(Arrays.asList("msg1", "msg2", "flush"), events);
                    testComplete();
                });
            };
            pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                int flushes;

                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    if (msg instanceof String) {
                        String text = (String) msg;
                        events.add(text);
                        if ("msg1".equals(text)) {
                            socket.flush();
                        }
                    } else {
                        super.write(ctx, msg, promise);
                    }
                }

                public void flush(ChannelHandlerContext ctx) throws Exception {
                    // Only the first flush is recorded and triggers the verification.
                    int previous = this.flushes++;
                    if (previous < 1) {
                        events.add("flush");
                        if (this.flushes == 1) {
                            verify.run();
                        }
                    }
                    super.flush(ctx);
                }
            });
            CountDownLatch bothWritten = new CountDownLatch(1);
            executeAsyncTaskAndAwait(() -> {
                socket.writeMessage("msg1");
                socket.writeMessage("msg2");
                bothWritten.countDown();
            });
            try {
                bothWritten.await(20L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };
        awaitFuture(this.client.connect(1234, "localhost"));
        await();
    }

    /**
     * With the client paused, the server writes 16 KiB chunks until {@code writeToChannel}
     * returns {@code false}, then writes an empty buffer with a promise. The client is resumed
     * afterwards, and the test completes when that promise succeeds.
     */
    @Test
    public void testOverflowDrain() throws Exception {
        BufferInternal chunk = BufferInternal.buffer(TestUtils.randomAlphaString(16384));
        CompletableFuture<Void> serverDone = new CompletableFuture<>();
        this.connectHandler = socket -> {
            ContextInternal context = this.vertx.getOrCreateContext();
            VertxConnection connection = (VertxConnection) socket;
            while (connection.writeToChannel(chunk.getByteBuf())) {
                // keep writing until writeToChannel reports false
            }
            Future.future(promise -> {
                connection.writeToChannel(Unpooled.EMPTY_BUFFER, promise);
            }).onComplete(onSuccess(written -> {
                context.emit(v -> {
                    testComplete();
                });
            }));
            serverDone.complete(null);
        };
        this.client.connect(1234, "localhost").onComplete(onSuccess(clientSocket -> {
            clientSocket.pause();
            // Start reading again only once the server side has queued all its writes.
            serverDone.whenComplete((v, err) -> {
                clientSocket.resume();
            });
        }));
        await();
    }

    /**
     * Writes the buffer to the socket repeatedly until {@code writeToChannel} returns
     * {@code false}, then waits one second for a drain event.
     * <ul>
     *   <li>If the drain handler fires first and the timer can still be cancelled, the whole
     *       procedure starts over.</li>
     *   <li>If the timer fires first, the socket is filled once more and {@code handler} is
     *       invoked with {@code null}.</li>
     *   <li>If the drain handler fires after the timer already ran, the test fails.</li>
     * </ul>
     *
     * @param netSocketInternal the socket to write to (cast to {@link VertxConnection})
     * @param bufferInternal the chunk written on every iteration
     * @param handler called once no drain event arrived within the one-second window
     */
    private void fill(NetSocketInternal netSocketInternal, BufferInternal bufferInternal, Handler<Void> handler) {
        Runnable saturate = () -> {
            while (((VertxConnection) netSocketInternal).writeToChannel(bufferInternal.getByteBuf())) {
                // keep writing until writeToChannel reports false
            }
        };
        long timer = this.vertx.setTimer(1000L, id -> {
            saturate.run();
            // A Handler<Void> only accepts an untyped null; an (Object) cast does not type-check.
            handler.handle(null);
        });
        saturate.run();
        netSocketInternal.drainHandler(v -> {
            if (this.vertx.cancelTimer(timer)) {
                fill(netSocketInternal, bufferInternal, handler);
            } else {
                fail();
            }
        });
    }

    /**
     * Skipped on the io_uring transport. With the client paused, the server fills the socket via
     * {@code fill}, then issues one more write; the client socket is closed afterwards and the
     * test completes when that last write fails.
     */
    @Test
    public void testFailedQueueMessages() throws Exception {
        boolean ioUring = TRANSPORT == Transport.IO_URING;
        Assume.assumeFalse(ioUring);
        BufferInternal chunk = BufferInternal.buffer(TestUtils.randomAlphaString(16384));
        CompletableFuture<Void> lastWriteIssued = new CompletableFuture<>();
        this.connectHandler = socket -> {
            fill(socket, chunk, v -> {
                socket.write(chunk).onComplete(onFailure(err -> {
                    testComplete();
                }));
                lastWriteIssued.complete(null);
            });
        };
        NetSocket clientSocket = (NetSocket) awaitFuture(this.client.connect(1234, "localhost"));
        clientSocket.pause();
        lastWriteIssued.whenComplete((v, err) -> {
            clientSocket.close();
        });
        await();
    }

    /**
     * Queues "msg1" and "msg2" from a non event-loop thread and checks, through a counter, that
     * the pipeline's {@code write} callback is never entered while a previous invocation is still
     * running. While handling "msg1" the handler makes the channel unwritable and flushes, which
     * must make it writable again; the test completes when "msg2" is handled.
     */
    @Test
    public void testDrainReentrancy() throws Exception {
        this.connectHandler = netSocketInternal -> {
            netSocketInternal.channelHandlerContext().pipeline().addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                /** Number of write invocations currently on the stack; must stay at 0 on entry. */
                int reentrant;

                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                    VertxConnectionTest.this.assertEquals(0L, this.reentrant++);
                    try {
                        switch (msg.toString()) {
                            case "msg1":
                                VertxConnectionTest.this.assertTrue(ctx.channel().isWritable());
                                // Write exactly as many bytes as it takes to make the channel unwritable.
                                ctx.write(BufferInternal.buffer(TestUtils.randomAlphaString((int) ctx.channel().bytesBeforeUnwritable())).getByteBuf());
                                VertxConnectionTest.this.assertFalse(ctx.channel().isWritable());
                                ctx.flush();
                                VertxConnectionTest.this.assertTrue(ctx.channel().isWritable());
                                break;
                            case "msg2":
                                VertxConnectionTest.this.testComplete();
                                break;
                            default:
                                break;
                        }
                    } finally {
                        this.reentrant--;
                    }
                }
            });
            VertxConnection connection = (VertxConnection) netSocketInternal;
            CountDownLatch submitted = new CountDownLatch(1);
            executeAsyncTask(() -> {
                connection.writeToChannel("msg1");
                connection.writeToChannel("msg2");
                submitted.countDown();
            });
            try {
                awaitLatch(submitted);
            } catch (InterruptedException e) {
                fail(e);
            }
        };
        // The connect handler only runs for an accepted connection, so a client must connect
        // (as in the sibling tests); otherwise await() can only time out.
        awaitFuture(this.client.connect(1234, "localhost"));
        await();
    }

    /**
     * Writes two messages from a non event-loop thread and checks that, at the flush observed
     * after both writes reached the pipeline, exactly one flush has happened.
     */
    @Test
    public void testConsolidateFlushInDrain() throws Exception {
        this.connectHandler = socket -> {
            ChannelPipeline pipeline = socket.channelHandlerContext().pipeline();
            pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                int writes;
                int flushes;

                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                    // Writes are only counted, not forwarded.
                    this.writes += 1;
                }

                public void flush(ChannelHandlerContext ctx) throws Exception {
                    this.flushes += 1;
                    if (this.writes != 2) {
                        return;
                    }
                    VertxConnectionTest.this.assertEquals(1L, this.flushes);
                    VertxConnectionTest.this.testComplete();
                }
            });
            CountDownLatch bothWritten = new CountDownLatch(1);
            executeAsyncTaskAndAwait(() -> {
                socket.writeMessage("msg1");
                socket.writeMessage("msg2");
                bothWritten.countDown();
            });
            try {
                bothWritten.await(20L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        };
        awaitFuture(this.client.connect(1234, "localhost"));
        await();
    }

    /**
     * Server side: on "inbound-1" the socket is paused, "inbound-2" is fired into the pipeline
     * and "outbound-1" is written from a separate thread; the pipeline handler resumes the socket
     * while "outbound-1" is being written, and "inbound-2" is answered with "outbound-2". Every
     * flush appends a "flush" marker and closes the channel once that marker is written.
     * Client side: the data received before close must be exactly "outbound-1outbound-2flush".
     */
    @Test
    public void testConsolidateFlushInDrainWhenResume() throws Exception {
        this.connectHandler = netSocketInternal -> {
            ChannelPipeline pipeline = netSocketInternal.channelHandlerContext().pipeline();
            pipeline.addBefore("handler", "myhandler", new ChannelDuplexHandler() {
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                    String text = ((ByteBuf) msg).toString(StandardCharsets.UTF_8);
                    if ("outbound-1".equals(text)) {
                        // Resume from inside the write path, while the first outbound message is in flight.
                        netSocketInternal.resume();
                    }
                    ctx.write(msg);
                }

                public void flush(ChannelHandlerContext ctx) {
                    ChannelFuture marker = ctx.write(Unpooled.copiedBuffer("flush", StandardCharsets.UTF_8));
                    marker.addListener(done -> {
                        netSocketInternal.channelHandlerContext().close();
                    });
                    ctx.flush();
                }
            });
            netSocketInternal.messageHandler(obj -> {
                String text = ((ByteBuf) obj).toString(StandardCharsets.UTF_8);
                switch (text) {
                    case "inbound-1":
                        netSocketInternal.pause();
                        this.vertx.runOnContext(v -> {
                            pipeline.fireChannelRead(Unpooled.copiedBuffer("inbound-2", StandardCharsets.UTF_8));
                            pipeline.fireChannelReadComplete();
                            new Thread(() -> {
                                netSocketInternal.writeMessage(Unpooled.copiedBuffer("outbound-1", StandardCharsets.UTF_8));
                            }).start();
                        });
                        break;
                    case "inbound-2":
                        netSocketInternal.writeMessage(Unpooled.copiedBuffer("outbound-2", StandardCharsets.UTF_8));
                        break;
                    default:
                        break;
                }
            });
        };
        NetSocket netSocket = (NetSocket) awaitFuture(this.client.connect(1234, "localhost"));
        Buffer received = Buffer.buffer();
        netSocket.handler(received::appendBuffer);
        netSocket.closeHandler(v -> {
            assertEquals("outbound-1outbound-2flush", received.toString());
            testComplete();
        });
        netSocket.write("inbound-1").await();
        await();
    }

    /**
     * The server's message handler fires "inbound-2" into the pipeline while it is still handling
     * "inbound-1" and asserts, through a counter, that it is never entered while a previous
     * invocation is running. "inbound-2" ends the socket with "outbound-1", which is exactly what
     * the client must have received when its socket closes.
     */
    @Test
    public void testReentrantRead() throws Exception {
        this.connectHandler = netSocketInternal -> {
            ChannelPipeline pipeline = netSocketInternal.channelHandlerContext().pipeline();
            // Number of message-handler invocations currently running; must be 0 on entry.
            AtomicInteger depth = new AtomicInteger();
            netSocketInternal.messageHandler(obj -> {
                assertEquals(0L, depth.getAndIncrement());
                String text = ((ByteBuf) obj).toString(StandardCharsets.UTF_8);
                switch (text) {
                    case "inbound-1":
                        pipeline.fireChannelRead(Unpooled.copiedBuffer("inbound-2", StandardCharsets.UTF_8));
                        break;
                    case "inbound-2":
                        netSocketInternal.end(Buffer.buffer("outbound-1"));
                        break;
                    default:
                        break;
                }
                depth.decrementAndGet();
            });
        };
        NetSocket netSocket = (NetSocket) awaitFuture(this.client.connect(1234, "localhost"));
        Buffer received = Buffer.buffer();
        netSocket.handler(received::appendBuffer);
        netSocket.closeHandler(v -> {
            assertEquals("outbound-1", received.toString());
            testComplete();
        });
        netSocket.write("inbound-1").await();
        await();
    }

    /**
     * While handling "inbound-1" the server pauses the socket, fires "inbound-2" into the
     * pipeline and resumes. "inbound-2" ends the socket with "outbound-1", which is exactly what
     * the client must have received when its socket closes.
     */
    @Test
    public void testResumeWhenRead() throws Exception {
        this.connectHandler = netSocketInternal -> {
            ChannelPipeline pipeline = netSocketInternal.channelHandlerContext().pipeline();
            netSocketInternal.messageHandler(obj -> {
                String text = ((ByteBuf) obj).toString(StandardCharsets.UTF_8);
                switch (text) {
                    case "inbound-1":
                        netSocketInternal.pause();
                        pipeline.fireChannelRead(Unpooled.copiedBuffer("inbound-2", StandardCharsets.UTF_8));
                        netSocketInternal.resume();
                        break;
                    case "inbound-2":
                        netSocketInternal.end(Buffer.buffer("outbound-1"));
                        break;
                    default:
                        break;
                }
            });
        };
        NetSocket netSocket = (NetSocket) awaitFuture(this.client.connect(1234, "localhost"));
        Buffer received = Buffer.buffer();
        netSocket.handler(received::appendBuffer);
        netSocket.closeHandler(v -> {
            assertEquals("outbound-1", received.toString());
            testComplete();
        });
        netSocket.write("inbound-1").await();
        await();
    }

    /**
     * The paused client sends "ping"; on receiving data the server fills the socket via
     * {@code fill} and then installs a drain handler. The client resumes once the fill has
     * finished, and the test completes when that drain handler fires.
     */
    @Test
    public void testWriteQueueDrain() throws Exception {
        BufferInternal chunk = BufferInternal.buffer(TestUtils.randomAlphaString(1024));
        CountDownLatch filled = new CountDownLatch(1);
        this.connectHandler = socket -> {
            socket.handler(received -> {
                fill(socket, chunk, v -> {
                    filled.countDown();
                    socket.drainHandler(drained -> {
                        testComplete();
                    });
                });
            });
        };
        NetSocket clientSocket = (NetSocket) awaitFuture(this.client.connect(1234, "localhost"));
        clientSocket.pause();
        clientSocket.write("ping");
        awaitLatch(filled);
        clientSocket.resume();
        await();
    }

    /**
     * After a write has succeeded, registers two callbacks from a freshly started thread: one on
     * the Vert.x future, which must run on that same thread, and one on the Netty channel
     * promise, which must run on the channel's event loop. Both must fire (two completions).
     */
    @Test
    public void testChannelPromisePiggiesBackOnEventLoop() throws Exception {
        waitFor(2);
        disableThreadChecks();
        this.connectHandler = socket -> {
            Promise promise = Promise.promise();
            ByteBuf payload = Unpooled.copiedBuffer("outbound-1", StandardCharsets.UTF_8);
            ChannelPromise channelPromise = ((VertxConnection) socket).write(payload, false, promise);
            Future future = promise.future();
            future.onComplete(onSuccess(written -> {
                Thread registrar = new Thread(() -> {
                    Thread self = Thread.currentThread();
                    future.onComplete(ar -> {
                        assertSame(self, Thread.currentThread());
                        complete();
                    });
                    channelPromise.addListener(done -> {
                        assertTrue(channelPromise.channel().eventLoop().inEventLoop());
                        complete();
                    });
                });
                registrar.start();
            }));
        };
        awaitFuture(this.client.connect(1234, "localhost"));
        await();
    }

    /**
     * Runs the task on a newly started thread. Must be called from an event-loop thread
     * (asserted on entry).
     *
     * @param runnable the task to run off the event loop
     * @return a latch that is counted down after the task has returned
     */
    private CountDownLatch executeAsyncTask(Runnable runnable) {
        assertTrue(Context.isOnEventLoopThread());
        CountDownLatch finished = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            runnable.run();
            finished.countDown();
        });
        worker.start();
        return finished;
    }

    /**
     * Runs the task via {@code executeAsyncTask} and waits up to 20 seconds for it to finish.
     * The outcome of the wait is not checked; an interruption restores the thread's interrupt flag.
     *
     * @param runnable the task to run off the event loop
     */
    private void executeAsyncTaskAndAwait(Runnable runnable) {
        CountDownLatch finished = executeAsyncTask(runnable);
        try {
            finished.await(20L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Calling {@code close()} from an event handler must reach {@code handleClose} without going
     * through {@code handleShutdown}.
     */
    @Test
    public void testClose() {
        AtomicBoolean shutdownHandled = new AtomicBoolean();
        AtomicBoolean closeHandled = new AtomicBoolean();
        EmbeddedChannel embedded = channel((vertxContext, handlerContext) -> new VertxConnection(vertxContext, handlerContext) {
            protected void handleEvent(Object event) {
                if ("test".equals(event)) {
                    close();
                }
            }

            protected void handleShutdown(Object obj, long timeout, TimeUnit unit, ChannelPromise promise) {
                shutdownHandled.set(true);
            }

            protected void handleClose(Object obj, ChannelPromise promise) {
                closeHandled.set(true);
                promise.setSuccess();
            }
        });
        embedded.pipeline().fireUserEventTriggered("test");
        assertFalse(shutdownHandled.get());
        assertTrue(closeHandled.get());
    }

    /**
     * A shutdown with a zero timeout must reach {@code handleClose} without invoking
     * {@code handleShutdown}.
     */
    @Test
    public void testShutdownZeroDoesClose() {
        AtomicBoolean shutdownHandled = new AtomicBoolean();
        AtomicBoolean closeHandled = new AtomicBoolean();
        EmbeddedChannel embedded = channel((vertxContext, handlerContext) -> new VertxConnection(vertxContext, handlerContext) {
            protected void handleEvent(Object event) {
                if ("test".equals(event)) {
                    shutdown(0L, TimeUnit.SECONDS);
                }
            }

            protected void handleShutdown(Object obj, long timeout, TimeUnit unit, ChannelPromise promise) {
                shutdownHandled.set(true);
            }

            protected void handleClose(Object obj, ChannelPromise promise) {
                closeHandled.set(true);
            }
        });
        embedded.pipeline().fireUserEventTriggered("test");
        assertFalse(shutdownHandled.get());
        assertTrue(closeHandled.get());
    }

    /**
     * Currently ignored. Calling {@code close(obj)} from inside {@code handleShutdown} is expected
     * to have invoked {@code handleClose} by the time the call returns; both callbacks must have
     * run once the event has been processed.
     */
    @Test
    @Ignore
    public void testShutdownReentrantClose() {
        AtomicBoolean shutdownHandled = new AtomicBoolean();
        AtomicBoolean closeHandled = new AtomicBoolean();
        EmbeddedChannel embedded = channel((vertxContext, handlerContext) -> new VertxConnection(vertxContext, handlerContext) {
            protected void handleEvent(Object event) {
                if ("test".equals(event)) {
                    shutdown(10L, TimeUnit.SECONDS);
                }
            }

            protected void handleShutdown(Object obj, long timeout, TimeUnit unit, ChannelPromise promise) {
                shutdownHandled.set(true);
                close(obj);
                VertxConnectionTest.this.assertTrue(closeHandled.get());
            }

            protected void handleClose(Object obj, ChannelPromise promise) {
                closeHandled.set(true);
            }
        });
        embedded.pipeline().fireUserEventTriggered("test");
        assertTrue(shutdownHandled.get());
        assertTrue(closeHandled.get());
    }

    /**
     * A shutdown with a 100 ms timeout invokes {@code handleShutdown} first; no close has happened
     * at that point. Advancing the embedded channel's clock by 100 ms and running its pending
     * tasks must then trigger {@code handleClose} exactly once.
     */
    @Test
    public void testShutdownTimeout() {
        AtomicInteger shutdownCount = new AtomicInteger();
        AtomicInteger closeCount = new AtomicInteger();
        channel((vertxContext, handlerContext) -> {
            return new VertxConnection(vertxContext, handlerContext) {
                protected void handleEvent(Object event) {
                    if ("test".equals(event)) {
                        shutdown(100L, TimeUnit.MILLISECONDS);
                    }
                }

                protected void handleShutdown(Object obj, long timeout, TimeUnit unit, ChannelPromise promise) {
                    shutdownCount.getAndIncrement();
                    VertxConnectionTest.this.assertEquals(0L, closeCount.get());
                    // ChannelHandlerContext.channel() is typed as Channel; the time-control methods
                    // used below are declared on EmbeddedChannel, hence the downcast.
                    EmbeddedChannel embedded = (EmbeddedChannel) this.chctx.channel();
                    embedded.advanceTimeBy(100L, TimeUnit.MILLISECONDS);
                    embedded.runPendingTasks();
                    VertxConnectionTest.this.assertEquals(1L, closeCount.get());
                }

                protected void handleClose(Object obj, ChannelPromise promise) {
                    closeCount.getAndIncrement();
                }
            };
        }).pipeline().fireUserEventTriggered("test");
        assertEquals(1L, shutdownCount.get());
        assertEquals(1L, closeCount.get());
    }

    /**
     * Creates an {@link EmbeddedChannel} whose only handler is a {@link VertxHandler} wrapping
     * the connection produced by {@code biFunction}.
     *
     * @param biFunction factory receiving a context built on the channel's event loop and the
     *                   handler context, and returning the connection under test
     * @return the embedded channel with the Vert.x handler installed
     */
    private <C extends VertxConnection> EmbeddedChannel channel(BiFunction<ContextInternal, ChannelHandlerContext, C> biFunction) {
        ChannelHandler vertxHandler = VertxHandler.create(handlerContext -> {
            // Bind the Vert.x context to the event loop of the channel being created.
            ContextInternal connectionContext = this.vertx.contextBuilder()
                .withEventLoop(handlerContext.channel().eventLoop())
                .build();
            return biFunction.apply(connectionContext, handlerContext);
        });
        return new EmbeddedChannel(vertxHandler);
    }

    /**
     * Verifies that inbound messages written while the connection is paused are not delivered
     * to the connection's handler and that the channel's auto-read flag is switched off.
     */
    @Test
    public void testDisableAutoReadWhenPaused() {
        List<Object> received = new ArrayList<>();
        MessageFactory messageFactory = new MessageFactory();
        EmbeddedChannel embeddedChannel = new EmbeddedChannel();
        ChannelPipeline pipeline = embeddedChannel.pipeline();
        pipeline.addLast(VertxHandler.create(handlerContext -> new TestConnection(handlerContext)));
        TestConnection testConnection = (TestConnection) pipeline.get(VertxHandler.class).getConnection();
        // Collect every message the connection hands to its handler.
        testConnection.handler = received::add;
        testConnection.pause();
        embeddedChannel.writeInbound(messageFactory.next(8));
        assertEquals(Collections.emptyList(), received);
        assertFalse(embeddedChannel.config().isAutoRead());
    }

    /**
     * Pauses the connection, writes eight inbound messages, then resumes. Once the pending
     * tasks run, each message is echoed to the channel by the handler and the read-complete
     * handler writes {@code "read-complete"}. Expects nine outbound messages, the last being
     * {@code "read-complete"}, with auto-read enabled again.
     */
    @Test
    public void testConsolidatesFlushesWhenResuming() {
        MessageFactory factory = new MessageFactory();
        EmbeddedChannel ch = new EmbeddedChannel() {
        };
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(VertxHandler.create(handlerContext -> new TestConnection(handlerContext)));
        TestConnection connection = (TestConnection) pipeline.get(VertxHandler.class).getConnection();
        connection.pause();
        ch.writeInbound(factory.next(8));
        assertFalse(ch.config().isAutoRead());
        connection.resume();
        // Resuming only schedules the delivery; nothing is dispatched until tasks are run.
        assertTrue(ch.hasPendingTasks());
        connection.handler = message -> connection.writeToChannel(message);
        connection.readCompleteHandler = ignored -> connection.writeToChannel("read-complete");
        ch.runPendingTasks();
        // Drain everything that reached the outbound side of the channel.
        List<Object> outbound = new ArrayList<>();
        for (Object written = ch.readOutbound(); written != null; written = ch.readOutbound()) {
            outbound.add(written);
        }
        assertEquals(9L, outbound.size());
        assertTrue(ch.config().isAutoRead());
        assertEquals("read-complete", outbound.get(8));
    }

    /**
     * Buffers four inbound messages while paused, resumes, and pauses again from inside the
     * handler upon the second delivery. Expects exactly two messages to have been delivered
     * after the pending tasks have run.
     */
    @Test
    public void testPauseWhenResuming() {
        MessageFactory factory = new MessageFactory();
        EmbeddedChannel ch = new EmbeddedChannel() {
        };
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(VertxHandler.create(handlerContext -> new TestConnection(handlerContext)));
        TestConnection connection = (TestConnection) pipeline.get(VertxHandler.class).getConnection();
        connection.pause();
        ch.writeInbound(factory.next(4));
        connection.resume();
        assertTrue(ch.hasPendingTasks());
        AtomicInteger delivered = new AtomicInteger();
        connection.handler = message -> {
            int seenSoFar = delivered.incrementAndGet();
            // Re-pause in the middle of the drain, right after the second message.
            if (seenSoFar == 2) {
                connection.pause();
            }
        };
        ch.runPendingTasks();
        assertEquals(2L, delivered.get());
    }

    /**
     * Fires a channel read while the connection is paused, writes an object, then resumes
     * before the read has completed. Expects that neither the buffered message nor the write
     * becomes visible after resuming and running pending tasks; both appear only after
     * {@code fireChannelReadComplete()}.
     */
    @Test
    public void testResumeWhenReadInProgress() {
        MessageFactory factory = new MessageFactory();
        EmbeddedChannel ch = new EmbeddedChannel();
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(VertxHandler.create(handlerContext -> new TestConnection(handlerContext)));
        TestConnection connection = (TestConnection) pipeline.get(VertxHandler.class).getConnection();
        AtomicInteger delivered = new AtomicInteger();
        connection.handler = message -> delivered.incrementAndGet();
        connection.pause();
        pipeline.fireChannelRead(factory.next());
        assertEquals(0L, delivered.get());
        Object payload = new Object();
        connection.write(payload, false);
        connection.resume();
        assertEquals(0L, delivered.get());
        assertTrue(ch.hasPendingTasks());
        ch.runPendingTasks();
        // Still nothing delivered or written: the read has not been completed yet.
        assertEquals(0L, delivered.get());
        assertNull(ch.readOutbound());
        pipeline.fireChannelReadComplete();
        assertEquals(1L, delivered.get());
        assertSame(payload, ch.readOutbound());
    }
}
