package io.vertx.amqp.tests;

import io.vertx.amqp.AmqpClient;
import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpConnection;
import io.vertx.amqp.AmqpMessage;
import io.vertx.amqp.AmqpReceiver;
import io.vertx.amqp.AmqpSender;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.proton.ProtonSender;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/amqp/tests/RequestReplyTest.class */
public class RequestReplyTest extends BareTestBase {
    private static final String REQUEST = "what's your name?";
    private static final String RESPONSE = "my name is Neo";

    private Future<Void> prepareRequestReceiverAndAnonymousSenderResponder(TestContext testContext, AmqpConnection amqpConnection, String str, String str2) {
        Promise promise = Promise.promise();
        amqpConnection.createReceiver(str).onComplete(asyncResult -> {
            ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                testContext.assertEquals(REQUEST, amqpMessage.bodyAsString());
                testContext.assertEquals(str2, amqpMessage.replyTo());
                amqpConnection.createAnonymousSender().onComplete(asyncResult -> {
                    ((AmqpSender) asyncResult.result()).send(AmqpMessage.create().address(amqpMessage.replyTo()).withBody(RESPONSE).build());
                });
            });
            promise.handle(asyncResult.mapEmpty());
        });
        return promise.future();
    }

    private Future<AmqpReceiver> prepareDynamicReplyReceiver(TestContext testContext, AmqpConnection amqpConnection, Async async) {
        Promise promise = Promise.promise();
        amqpConnection.createDynamicReceiver().onComplete(testContext.asyncAssertSuccess(amqpReceiver -> {
            testContext.assertNotNull(amqpReceiver.address());
            amqpReceiver.handler(amqpMessage -> {
                testContext.assertEquals(amqpMessage.bodyAsString(), RESPONSE);
                async.complete();
            });
            promise.complete(amqpReceiver);
        }));
        return promise.future();
    }

    private Future<Void> prepareSenderAndSendRequestMessage(TestContext testContext, AmqpConnection amqpConnection, String str, String str2) {
        Promise promise = Promise.promise();
        amqpConnection.createSender(str).onComplete(testContext.asyncAssertSuccess(amqpSender -> {
            amqpSender.sendWithAck(AmqpMessage.create().replyTo(str2).withBody(REQUEST).build()).onComplete(promise);
        }));
        return promise.future();
    }

    @Test(timeout = 10000)
    public void testRequesting(TestContext testContext) throws Exception {
        MockServer createServerForRequestorTestImpl = createServerForRequestorTestImpl(testContext, "requestQueue", String.valueOf(UUID.randomUUID()) + "dynamicAddress");
        try {
            Async async = testContext.async();
            this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(createServerForRequestorTestImpl.actualPort()));
            this.client.connect().onComplete(testContext.asyncAssertSuccess(amqpConnection -> {
                prepareDynamicReplyReceiver(testContext, amqpConnection, async).compose(amqpReceiver -> {
                    return prepareSenderAndSendRequestMessage(testContext, amqpConnection, "requestQueue", amqpReceiver.address());
                });
            }));
            async.awaitSuccess();
            createServerForRequestorTestImpl.close();
        } catch (Throwable th) {
            createServerForRequestorTestImpl.close();
            throw th;
        }
    }

    private MockServer createServerForRequestorTestImpl(TestContext testContext, String str, String str2) throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        return new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                protonSession.closeHandler(asyncResult2 -> {
                    protonSession.close();
                });
                protonSession.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                protonSender.closeHandler(asyncResult2 -> {
                    protonSender.close();
                });
                testContext.assertNotNull(protonSender.getRemoteSource(), "source should not be null");
                Source remoteSource = protonSender.getRemoteSource();
                testContext.assertTrue(remoteSource.getDynamic(), "expected dynamic source to be requested");
                testContext.assertNull(remoteSource.getAddress(), "expected no source address to be set");
                Source copy = remoteSource.copy();
                copy.setAddress(str2);
                protonSender.setSource(copy);
                protonSender.open();
                atomicReference.set(protonSender);
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                protonReceiver.closeHandler(asyncResult2 -> {
                    protonReceiver.close();
                });
                testContext.assertNotNull(protonReceiver.getRemoteTarget(), "target should not be null");
                Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertFalse(remoteTarget.getDynamic(), "should not be requested to be dynamic target");
                testContext.assertEquals(str, remoteTarget.getAddress(), "expected request queue address to be set");
                Target copy = remoteTarget.copy();
                copy.setAddress(str);
                protonReceiver.setTarget(copy);
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertNotNull(body);
                    testContext.assertTrue(body instanceof AmqpValue);
                    testContext.assertEquals(REQUEST, body.getValue());
                    Message create = Message.Factory.create();
                    create.setBody(new AmqpValue(RESPONSE));
                    ((ProtonSender) atomicReference.get()).send(create);
                });
                protonReceiver.open();
            });
        });
    }

    @Test(timeout = 10000)
    public void testResponder(TestContext testContext) throws Exception {
        String str = String.valueOf(UUID.randomUUID()) + "replyToAddress";
        Async async = testContext.async();
        MockServer createServerForResponderTestImpl = createServerForResponderTestImpl(testContext, "requestQueue", str, async);
        try {
            this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(createServerForResponderTestImpl.actualPort()));
            this.client.connect().onComplete(testContext.asyncAssertSuccess(amqpConnection -> {
                prepareRequestReceiverAndAnonymousSenderResponder(testContext, amqpConnection, "requestQueue", str);
            }));
            async.awaitSuccess();
            createServerForResponderTestImpl.close();
        } catch (Throwable th) {
            createServerForResponderTestImpl.close();
            throw th;
        }
    }

    private MockServer createServerForResponderTestImpl(TestContext testContext, String str, String str2, Async async) throws Exception {
        return new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler(protonSession -> {
                protonSession.closeHandler(asyncResult2 -> {
                    protonSession.close();
                });
                protonSession.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                protonReceiver.closeHandler(asyncResult2 -> {
                    protonReceiver.close();
                });
                testContext.assertNotNull(protonReceiver.getRemoteTarget(), "target should not be null");
                Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertFalse(remoteTarget.getDynamic(), "should not be requested to be dynamic target");
                testContext.assertNull(remoteTarget.getAddress(), "expected address to be null, to reflect using the anonymous terminus");
                protonReceiver.setTarget(remoteTarget.copy());
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertNotNull(body);
                    testContext.assertTrue(body instanceof AmqpValue);
                    testContext.assertEquals(RESPONSE, body.getValue());
                    testContext.assertEquals(str2, message.getAddress());
                    async.complete();
                });
                protonReceiver.open();
            });
            protonConnection.senderOpenHandler(protonSender -> {
                protonSender.closeHandler(asyncResult2 -> {
                    protonSender.close();
                });
                testContext.assertNotNull(protonSender.getRemoteSource(), "source should not be null");
                Source remoteSource = protonSender.getRemoteSource();
                testContext.assertFalse(remoteSource.getDynamic(), "should not be requesting dynamic source");
                testContext.assertEquals(str, remoteSource.getAddress(), "expected source address to be request queue");
                Source copy = remoteSource.copy();
                copy.setAddress(str);
                protonSender.setSource(copy);
                protonSender.open();
                Message create = Message.Factory.create();
                create.setBody(new AmqpValue(REQUEST));
                create.setReplyTo(str2);
                protonSender.send(create);
            });
        });
    }
}
