package io.vertx.tests.streams;

import io.reactivex.Flowable;
import io.vertx.core.Handler;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.streams.ProtonStreams;
import io.vertx.proton.streams.ProtonSubscriber;
import io.vertx.proton.streams.ProtonSubscriberOptions;
import io.vertx.proton.streams.Tracker;
import io.vertx.tests.FutureHandler;
import io.vertx.tests.MockServerTestBase;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/tests/streams/ProtonSubscriberIntTest.class */
public class ProtonSubscriberIntTest extends MockServerTestBase {
    private static Logger LOG = LoggerFactory.getLogger(ProtonSubscriberIntTest.class);

    @Test(timeout = 20000)
    public void testCreateUseAndCancelSubscriber(TestContext testContext) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                protonConnection.receiverOpenHandler(protonReceiver -> {
                    LOG.trace("Server receiver opened");
                    protonReceiver.handler((protonDelivery, message) -> {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Server got msg: " + getMessageBody(testContext, message));
                        }
                        validateMessage(testContext, 1, "1", message);
                        async2.complete();
                    });
                    protonReceiver.closeHandler(asyncResult2 -> {
                        protonReceiver.close();
                        async3.complete();
                    });
                    protonReceiver.setTarget(protonReceiver.getRemoteTarget().copy());
                    protonReceiver.open();
                    async.complete();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.open();
                Flowable.just(Tracker.create(ProtonHelper.message("1"), tracker -> {
                    testContext.assertTrue(tracker.isAccepted(), "msg should be accepted");
                    testContext.assertTrue(tracker.isRemotelySettled(), "msg should be remotely settled");
                })).subscribe(ProtonStreams.createTrackerProducer(protonConnection2, "myAddress"));
            });
            async.awaitSuccess();
            async2.awaitSuccess();
            async3.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testSubCancelledOnLinkClose(TestContext testContext) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                protonConnection.receiverOpenHandler(protonReceiver -> {
                    LOG.trace("Server receiver opened");
                    protonReceiver.setTarget(protonReceiver.getRemoteTarget().copy());
                    protonReceiver.open();
                    async.complete();
                    protonReceiver.close();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.open();
                Flowable.never().doOnCancel(() -> {
                    LOG.trace("Cancelled!");
                    async2.complete();
                }).subscribe(ProtonStreams.createTrackerProducer(protonConnection2, "myAddress"));
            });
            async.awaitSuccess();
            async2.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testSubCancelledOnConnectionEnd(TestContext testContext) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        ProtonServer protonServer = null;
        try {
            ProtonServer createServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                protonConnection.receiverOpenHandler(protonReceiver -> {
                    LOG.trace("Server receiver opened");
                    protonReceiver.setTarget(protonReceiver.getRemoteTarget().copy());
                    protonReceiver.open();
                    async.complete();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", createServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.open();
                Flowable.never().doOnCancel(() -> {
                    LOG.trace("Cancelled!");
                    async2.complete();
                }).subscribe(ProtonStreams.createTrackerProducer(protonConnection2, "myAddress"));
            });
            async.awaitSuccess();
            testContext.assertFalse(async2.isCompleted());
            createServer.close();
            protonServer = null;
            async2.awaitSuccess();
            if (0 != 0) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testConfigureProducerLinkName(TestContext testContext) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                protonConnection.receiverOpenHandler(protonReceiver -> {
                    LOG.trace("Server receiver opened");
                    testContext.assertEquals("testConfigureProducerLinkName", protonReceiver.getName(), "unexpected link name");
                    protonReceiver.handler((protonDelivery, message) -> {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Server got msg: " + getMessageBody(testContext, message));
                        }
                        validateMessage(testContext, 1, "1", message);
                        async2.complete();
                    });
                    protonReceiver.closeHandler(asyncResult2 -> {
                        protonReceiver.close();
                        async3.complete();
                    });
                    protonReceiver.setTarget(protonReceiver.getRemoteTarget().copy());
                    protonReceiver.open();
                    async.complete();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.open();
                Flowable.just(Tracker.create(ProtonHelper.message("1"))).subscribe(ProtonStreams.createTrackerProducer(protonConnection2, "myAddress", new ProtonSubscriberOptions().setLinkName("testConfigureProducerLinkName")));
            });
            async.awaitSuccess();
            async2.awaitSuccess();
            async3.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testCreateAnonymousRelaySubscriber(TestContext testContext) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger(0);
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                protonConnection.receiverOpenHandler(protonReceiver -> {
                    LOG.trace("Server receiver opened");
                    testContext.assertFalse(async.isCompleted(), "should only be one link opened");
                    testContext.assertNull(protonReceiver.getRemoteTarget().getAddress(), "link target should have null address for anonymous relay");
                    protonReceiver.handler((protonDelivery, message) -> {
                        int incrementAndGet = atomicInteger.incrementAndGet();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Server got msg: " + getMessageBody(testContext, message));
                        }
                        switch (incrementAndGet) {
                            case 1:
                                validateMessage(testContext, 1, "1", message);
                                testContext.assertEquals("testCreateAnonymousRelaySubscriber1", message.getAddress(), "Unexpected message1 'to' address");
                                return;
                            case 2:
                                validateMessage(testContext, 2, "2", message);
                                testContext.assertEquals("testCreateAnonymousRelaySubscriber2", message.getAddress(), "Unexpected message2 'to' address");
                                async2.complete();
                                return;
                            default:
                                return;
                        }
                    });
                    protonReceiver.closeHandler(asyncResult2 -> {
                        protonReceiver.close();
                        async3.complete();
                    });
                    protonReceiver.setTarget(protonReceiver.getRemoteTarget().copy());
                    protonReceiver.open();
                    async.complete();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.open();
                Flowable.just(Tracker.create(ProtonHelper.message("testCreateAnonymousRelaySubscriber1", "1"), tracker -> {
                    testContext.assertTrue(tracker.isAccepted(), "msg1 should be accepted");
                    testContext.assertTrue(tracker.isRemotelySettled(), "msg1 should be remotely settled");
                    testContext.assertTrue(atomicInteger2.compareAndSet(0, 1), "unexpected acceptedMsgCounter:" + atomicInteger2);
                }), Tracker.create(ProtonHelper.message("testCreateAnonymousRelaySubscriber2", "2"), tracker2 -> {
                    testContext.assertTrue(tracker2.isAccepted(), "msg1 should be accepted");
                    testContext.assertTrue(tracker2.isRemotelySettled(), "msg1 should be remotely settled");
                    testContext.assertTrue(atomicInteger2.compareAndSet(1, 2), "unexpected acceptedMsgCounter:" + atomicInteger2);
                })).subscribe(ProtonStreams.createTrackerProducer(protonConnection2, (String) null));
            });
            async.awaitSuccess();
            async2.awaitSuccess();
            async3.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testAlternativeDispositionStates(TestContext testContext) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        int i = 4;
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                AtomicInteger atomicInteger = new AtomicInteger(0);
                protonConnection.receiverOpenHandler(protonReceiver -> {
                    LOG.trace("Server receiver opened");
                    protonReceiver.setAutoAccept(false);
                    protonReceiver.handler((protonDelivery, message) -> {
                        int incrementAndGet = atomicInteger.incrementAndGet();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Server got msg: " + getMessageBody(testContext, message));
                        }
                        validateMessage(testContext, incrementAndGet, String.valueOf(incrementAndGet), message);
                        switch (incrementAndGet) {
                            case 1:
                                protonDelivery.disposition(Accepted.getInstance(), true);
                                break;
                            case 2:
                                protonDelivery.disposition(Released.getInstance(), true);
                                break;
                            case 3:
                                protonDelivery.disposition(new Rejected(), true);
                                break;
                            case 4:
                                protonDelivery.disposition(new Modified(), true);
                                break;
                        }
                        if (incrementAndGet == i) {
                            async2.complete();
                        }
                    });
                    protonReceiver.closeHandler(asyncResult2 -> {
                        protonReceiver.close();
                        async3.complete();
                    });
                    protonReceiver.setTarget(protonReceiver.getRemoteTarget().copy());
                    protonReceiver.open();
                    async.complete();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.open();
                Flowable.just(Tracker.create(ProtonHelper.message("1"), tracker -> {
                    testContext.assertTrue(tracker.isAccepted(), "msg should be accepted");
                    testContext.assertTrue(tracker.isRemotelySettled(), "msg should be remotely settled");
                }), Tracker.create(ProtonHelper.message("2"), tracker2 -> {
                    testContext.assertFalse(tracker2.isAccepted(), "msg should not be accepted");
                    testContext.assertTrue(tracker2.getRemoteState() instanceof Released, "unexpected remote state: " + tracker2.getRemoteState());
                    testContext.assertTrue(tracker2.isRemotelySettled(), "msg should be remotely settled");
                }), Tracker.create(ProtonHelper.message("3"), tracker3 -> {
                    testContext.assertFalse(tracker3.isAccepted(), "msg should not be accepted");
                    testContext.assertTrue(tracker3.getRemoteState() instanceof Rejected, "unexpected remote state: " + tracker3.getRemoteState());
                    testContext.assertTrue(tracker3.isRemotelySettled(), "msg should be remotely settled");
                }), Tracker.create(ProtonHelper.message("4"), tracker4 -> {
                    testContext.assertFalse(tracker4.isAccepted(), "msg should not be accepted");
                    testContext.assertTrue(tracker4.getRemoteState() instanceof Modified, "unexpected remote state: " + tracker4.getRemoteState());
                    testContext.assertTrue(tracker4.isRemotelySettled(), "msg should be remotely settled");
                })).subscribe(ProtonStreams.createTrackerProducer(protonConnection2, "myAddress"));
            });
            async.awaitSuccess();
            async2.awaitSuccess();
            async3.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testCreateUsingCustomTarget(TestContext testContext) throws Exception {
        this.server.close();
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        ProtonServer protonServer = null;
        try {
            protonServer = createServer(protonConnection -> {
                protonConnection.openHandler(asyncResult -> {
                    protonConnection.open();
                });
                protonConnection.sessionOpenHandler(protonSession -> {
                    protonSession.open();
                });
                protonConnection.receiverOpenHandler(protonReceiver -> {
                    protonReceiver.handler((protonDelivery, message) -> {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("Server got msg: " + getMessageBody(testContext, message));
                        }
                        validateMessage(testContext, 1, "1", message);
                        async2.complete();
                    });
                    testContext.assertNotNull(protonReceiver.getRemoteTarget(), "target should not be null");
                    Target remoteTarget = protonReceiver.getRemoteTarget();
                    testContext.assertEquals("testCreateUsingCustomTarget", remoteTarget.getAddress(), "unexpected target address");
                    Symbol[] capabilities = remoteTarget.getCapabilities();
                    testContext.assertTrue(Arrays.equals(new Symbol[]{Symbol.valueOf("custom")}, capabilities), "Unexpected capabilities: " + Arrays.toString(capabilities));
                    protonReceiver.setTarget(remoteTarget.copy());
                    protonReceiver.closeHandler(asyncResult2 -> {
                        protonReceiver.close();
                        async3.complete();
                    });
                    LOG.trace("Server receiver opened");
                    protonReceiver.open();
                    async.complete();
                });
            });
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                ProtonConnection protonConnection2 = (ProtonConnection) asyncResult.result();
                protonConnection2.open();
                ProtonSubscriber createTrackerProducer = ProtonStreams.createTrackerProducer(protonConnection2, "testCreateUsingCustomTarget");
                createTrackerProducer.getTarget().setCapabilities(new Symbol[]{Symbol.valueOf("custom")});
                Flowable.just(Tracker.create(ProtonHelper.message("1"))).subscribe(createTrackerProducer);
            });
            async.awaitSuccess();
            async2.awaitSuccess();
            async3.awaitSuccess();
            if (protonServer != null) {
                protonServer.close();
            }
        } catch (Throwable th) {
            if (protonServer != null) {
                protonServer.close();
            }
            throw th;
        }
    }

    private ProtonServer createServer(Handler<ProtonConnection> handler) throws InterruptedException, ExecutionException {
        ProtonServer create = ProtonServer.create(this.vertx);
        create.connectHandler(handler);
        FutureHandler asyncResult = FutureHandler.asyncResult();
        create.listen(0, asyncResult);
        asyncResult.get();
        return create;
    }

    private void validateMessage(TestContext testContext, int i, Object obj, Message message) {
        Object messageBody = getMessageBody(testContext, message);
        if (LOG.isTraceEnabled()) {
            LOG.trace("Got msg " + i + ", body: " + messageBody);
        }
        testContext.assertEquals(obj, messageBody, "Unexpected message body");
    }

    private Object getMessageBody(TestContext testContext, Message message) {
        AmqpValue body = message.getBody();
        testContext.assertNotNull(body);
        testContext.assertTrue(body instanceof AmqpValue);
        return body.getValue();
    }
}
