package io.vertx.amqp.tests;

import io.vertx.amqp.AmqpClient;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpConnection;
import io.vertx.amqp.AmqpReceiver;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;

/* loaded from: input_file:io/vertx/amqp/tests/CloseTest.class */
public class CloseTest extends BareTestBase {
    @Test(timeout = 20000)
    public void testConsumerCloseCompletionNotification(TestContext testContext) throws Exception {
        String methodName = this.name.getMethodName();
        String str = "myMessageContent-" + methodName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            handleReceiverOpenSendMessageThenClose(protonConnection, methodName, str, testContext);
        });
        AmqpClient create = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(mockServer.actualPort()));
        create.connect().onComplete(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createReceiver(methodName).onComplete(asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                AmqpReceiver amqpReceiver = (AmqpReceiver) asyncResult.result();
                amqpReceiver.exceptionHandler(th -> {
                    atomicBoolean.set(true);
                });
                amqpReceiver.handler(amqpMessage -> {
                    String bodyAsString = amqpMessage.bodyAsString();
                    testContext.assertNotNull(bodyAsString, "amqp message body content was null");
                    testContext.assertEquals(str, bodyAsString, "amqp message body not as expected");
                    amqpReceiver.close().onComplete(testContext.asyncAssertSuccess(r8 -> {
                        async.complete();
                        create.close().onComplete(testContext.asyncAssertSuccess(r3 -> {
                            async2.complete();
                        }));
                    }));
                });
            });
        });
        try {
            testContext.assertFalse(atomicBoolean.get(), "exception handler unexpectedly called");
            async.awaitSuccess();
            async2.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    private void handleReceiverOpenSendMessageThenClose(ProtonConnection protonConnection, String str, String str2, TestContext testContext) {
        protonConnection.openHandler(asyncResult -> {
            protonConnection.closeHandler(asyncResult -> {
                protonConnection.close();
            });
            protonConnection.open();
        });
        protonConnection.sessionOpenHandler((v0) -> {
            v0.open();
        });
        protonConnection.senderOpenHandler(protonSender -> {
            Source remoteSource = protonSender.getRemoteSource();
            testContext.assertNotNull(remoteSource, "source should not be null");
            testContext.assertEquals(str, remoteSource.getAddress(), "expected given address");
            protonSender.setSource(remoteSource.copy());
            Message message = Proton.message();
            message.setBody(new AmqpValue(str2));
            protonSender.send(message);
            protonSender.closeHandler(asyncResult2 -> {
                protonSender.close();
            });
            protonSender.open();
        });
    }

    @Test(timeout = 20000)
    public void testConsumerClosedRemotelyCallsExceptionHandler(TestContext testContext) throws Exception {
        doConsumerClosedRemotelyCallsExceptionHandlerTestImpl(testContext, false);
    }

    @Test(timeout = 20000)
    public void testConsumerClosedRemotelyWithErrorCallsExceptionHandler(TestContext testContext) throws Exception {
        doConsumerClosedRemotelyCallsExceptionHandlerTestImpl(testContext, true);
    }

    private void doConsumerClosedRemotelyCallsExceptionHandlerTestImpl(TestContext testContext, boolean z) throws Exception {
        String methodName = this.name.getMethodName();
        String str = "myMessageContent-" + methodName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                Source remoteSource = protonSender.getRemoteSource();
                testContext.assertNotNull(remoteSource, "source should not be null");
                testContext.assertEquals(methodName, remoteSource.getAddress(), "expected given address");
                protonSender.setSource(remoteSource.copy());
                protonSender.open();
                Message message = Proton.message();
                message.setBody(new AmqpValue(str));
                this.vertx.setTimer(500L, l -> {
                    protonSender.send(message);
                    if (z) {
                        protonSender.setCondition(ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "testing-error"));
                    }
                    protonSender.close();
                });
            });
        });
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(mockServer.actualPort()));
        this.client.connect().onComplete(testContext.asyncAssertSuccess(amqpConnection -> {
            amqpConnection.createReceiver(methodName).onComplete(testContext.asyncAssertSuccess(amqpReceiver -> {
                amqpReceiver.handler(amqpMessage -> {
                    String bodyAsString = amqpMessage.bodyAsString();
                    testContext.assertNotNull(bodyAsString, "amqp message body content was null");
                    testContext.assertEquals(str, bodyAsString, "amqp message body not as expected");
                    atomicBoolean.set(true);
                });
                amqpReceiver.exceptionHandler(th -> {
                    testContext.assertNotNull(th, "expected exception");
                    testContext.assertTrue(th instanceof Exception, "expected exception");
                    if (z) {
                        testContext.assertNotNull(th.getCause(), "expected cause");
                    } else {
                        testContext.assertNull(th.getCause(), "expected no cause");
                    }
                    testContext.assertTrue(atomicBoolean.get(), "expected msg to be received first");
                    async2.complete();
                    this.client.close().onComplete(testContext.asyncAssertSuccess(r3 -> {
                        async.complete();
                    }));
                });
            }));
        }));
        try {
            async2.awaitSuccess();
            async.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testConsumerClosedRemotelyCallsEndHandler(TestContext testContext) throws Exception {
        String methodName = this.name.getMethodName();
        String str = "myMessageContent-" + methodName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                Source remoteSource = protonSender.getRemoteSource();
                testContext.assertNotNull(remoteSource, "source should not be null");
                testContext.assertEquals(methodName, remoteSource.getAddress(), "expected given address");
                protonSender.setSource(remoteSource.copy());
                protonSender.open();
                this.vertx.setTimer(500L, l -> {
                    Message message = Proton.message();
                    message.setBody(new AmqpValue(str));
                    protonSender.send(message);
                    protonSender.setCondition(ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "testing-error"));
                    protonSender.close();
                });
            });
        });
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(mockServer.actualPort()));
        this.client.connect().onComplete(testContext.asyncAssertSuccess(amqpConnection -> {
            amqpConnection.createReceiver(methodName).onComplete(testContext.asyncAssertSuccess(amqpReceiver -> {
                amqpReceiver.handler(amqpMessage -> {
                    testContext.assertNotNull(amqpMessage.bodyAsString(), "message body was null");
                    String bodyAsString = amqpMessage.bodyAsString();
                    testContext.assertNotNull(bodyAsString, "amqp message body content was null");
                    testContext.assertEquals(str, bodyAsString, "amqp message body not as expected");
                    atomicBoolean.set(true);
                });
                amqpReceiver.endHandler(r9 -> {
                    testContext.assertTrue(atomicBoolean.get(), "expected msg to be received first");
                    async2.complete();
                    this.client.close().onComplete(testContext.asyncAssertSuccess(r3 -> {
                        async.complete();
                    }));
                });
            }));
        }));
        try {
            async2.awaitSuccess();
            async.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testConsumerClosedLocallyDoesNotCallEndHandler(TestContext testContext) throws Exception {
        String methodName = this.name.getMethodName();
        String str = "myMessageContent-" + methodName;
        Async async = testContext.async();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                Source remoteSource = protonSender.getRemoteSource();
                testContext.assertNotNull(remoteSource, "source should not be null");
                testContext.assertEquals(methodName, remoteSource.getAddress(), "expected given address");
                protonSender.setSource(remoteSource.copy());
                this.vertx.setTimer(500L, l -> {
                    Message message = Proton.message();
                    message.setBody(new AmqpValue(str));
                    protonSender.send(message);
                    protonSender.closeHandler(asyncResult2 -> {
                        protonSender.close();
                    });
                });
                protonSender.open();
            });
        });
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setPort(mockServer.actualPort()).setHost("localhost"));
        this.client.connect().onComplete(testContext.asyncAssertSuccess(amqpConnection -> {
            amqpConnection.createReceiver(methodName).onComplete(testContext.asyncAssertSuccess(amqpReceiver -> {
                amqpReceiver.endHandler(r4 -> {
                    testContext.fail("should not call end handler");
                });
                amqpReceiver.handler(amqpMessage -> {
                    testContext.assertNotNull(amqpMessage.bodyAsString(), "message body was null");
                    testContext.assertEquals(str, amqpMessage.bodyAsString(), "amqp message body not as expected");
                    amqpReceiver.close().onComplete(testContext.asyncAssertSuccess(r10 -> {
                        this.vertx.setTimer(50L, l -> {
                            this.client.close().onComplete(testContext.asyncAssertSuccess(r3 -> {
                                async.complete();
                            }));
                        });
                    }));
                });
            }));
        }));
        try {
            async.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testConnectionClosedLocallyDisconnectsTransport(TestContext testContext) throws Exception {
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.open();
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                    async2.complete();
                });
                protonConnection.disconnectHandler(protonConnection -> {
                    testContext.assertTrue(async2.isCompleted());
                    async3.complete();
                });
            });
        });
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(mockServer.actualPort()));
        this.client.connect().onComplete(testContext.asyncAssertSuccess(amqpConnection -> {
            amqpConnection.close().onComplete(testContext.asyncAssertSuccess(r3 -> {
                async.complete();
            }));
        }));
        try {
            async2.awaitSuccess();
            async3.awaitSuccess();
            async.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testConnectionClosedLocallyDoesNotCallExceptionHandler(TestContext testContext) throws Exception {
        Async async = testContext.async();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                    protonConnection.disconnect();
                });
                protonConnection.open();
            });
        });
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(mockServer.actualPort()));
        this.client.connect().onComplete(testContext.asyncAssertSuccess(amqpConnection -> {
            amqpConnection.exceptionHandler(th -> {
                countDownLatch.countDown();
            });
            amqpConnection.close().onComplete(testContext.asyncAssertSuccess(r3 -> {
                async.complete();
            }));
        }));
        try {
            async.awaitSuccess();
            testContext.assertFalse(countDownLatch.await(50L, TimeUnit.MILLISECONDS), "exception handler should not have fired");
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testConnectionClosedRemotelyCallsExceptionHandler(TestContext testContext) throws Exception {
        doConnectionEndHandlerCalledTestImpl(testContext, false);
    }

    @Test(timeout = 20000)
    public void testConnectionDisconnectedCallsExceptionHandler(TestContext testContext) throws Exception {
        doConnectionEndHandlerCalledTestImpl(testContext, true);
    }

    private void doConnectionEndHandlerCalledTestImpl(TestContext testContext, boolean z) throws Exception {
        Async async = testContext.async();
        Async async2 = testContext.async();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.open();
                this.vertx.setTimer(100L, l -> {
                    if (z) {
                        protonConnection.disconnect();
                    } else {
                        protonConnection.close();
                        protonConnection.disconnect();
                    }
                });
            });
        });
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(mockServer.actualPort()));
        this.client.connect().onComplete(testContext.asyncAssertSuccess(amqpConnection -> {
            amqpConnection.exceptionHandler(th -> {
                if (async2.isCompleted()) {
                    countDownLatch.countDown();
                }
                async2.complete();
                this.client.close().onComplete(testContext.asyncAssertSuccess(r3 -> {
                    async.complete();
                }));
            });
        }));
        try {
            async2.awaitSuccess();
            async.awaitSuccess();
            if (!z) {
                testContext.assertFalse(countDownLatch.await(50L, TimeUnit.MILLISECONDS), "exception handler should not have fired more than once");
            }
        } finally {
            mockServer.close();
        }
    }

    @Test(timeout = 20000)
    public void testConnectionClosedRemotelySendsCloseInResponseAndDisconnects(TestContext testContext) throws Exception {
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        Async async4 = testContext.async();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.open();
                this.vertx.setTimer(20L, l -> {
                    protonConnection.closeHandler(asyncResult -> {
                        async3.complete();
                        testContext.assertTrue(async.isCompleted());
                        testContext.assertFalse(async4.isCompleted());
                    });
                    protonConnection.disconnectHandler(protonConnection -> {
                        async4.complete();
                        testContext.assertTrue(async.isCompleted());
                        testContext.assertTrue(async3.isCompleted());
                    });
                    protonConnection.close();
                });
            });
        });
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(mockServer.actualPort()));
        this.client.connect().onComplete(testContext.asyncAssertSuccess(amqpConnection -> {
            amqpConnection.exceptionHandler(th -> {
                async.complete();
                testContext.assertFalse(async3.isCompleted());
                testContext.assertFalse(async4.isCompleted());
                this.client.close().onComplete(testContext.asyncAssertSuccess(r3 -> {
                    async2.complete();
                }));
            });
        }));
        try {
            async.awaitSuccess();
            async2.awaitSuccess();
            async3.awaitSuccess();
            async4.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testCloseClientThatWithoutConnection(TestContext testContext) {
        Async async = testContext.async();
        AmqpClient.create(new AmqpClientOptions().setHost("unused")).close().onComplete(testContext.asyncAssertSuccess(r3 -> {
            async.complete();
        }));
        async.awaitSuccess();
    }
}
