package io.vertx.proton.streams;

import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.proton.FutureHandler;
import io.vertx.proton.MockServerTestBase;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonServer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/proton/streams/ProtonPublisherIntTest.class */
public class ProtonPublisherIntTest extends MockServerTestBase {
    /** Logger shared by the tests in this class and by their anonymous subscribers. */
    private static final Logger LOG = LoggerFactory.getLogger(ProtonPublisherIntTest.class);

    /**
     * Subscribes to a delivery consumer, cancels the subscription from {@code onNext} after
     * validating the first message, and then expects {@code onComplete} on the client as well
     * as the close handler firing on the server-side sender.
     *
     * @param testContext the Vert.x unit context used for assertions and async checkpoints
     * @throws Exception anything thrown while setting up the server or awaiting the checkpoints
     */
    @Test(timeout = 20000)
    public void testCreateCancelSubscription(TestContext testContext) throws Exception {
        // The default server is not used here; a purpose-built one is created below.
        this.server.close();

        final Async clientSubscribedAsync = testContext.async();
        final Async serverLinkOpenAsync = testContext.async();
        final Async serverLinkCloseAsync = testContext.async();
        final Async clientCompletedAsync = testContext.async();

        ProtonServer protonServer = null;
        try {
            protonServer = createServer(serverConnection -> {
                serverConnection.openHandler(opened -> serverConnection.open());
                serverConnection.sessionOpenHandler(serverSession -> serverSession.open());
                serverConnection.senderOpenHandler(serverSender -> {
                    serverSender.closeHandler(closed -> {
                        serverLinkCloseAsync.complete();
                        serverSender.close();
                    });
                    // Hand a single message to the sender before the link is opened.
                    serverSender.send(ProtonHelper.message(String.valueOf(1)));
                    LOG.trace("Server sender opened");
                    serverSender.open();
                    serverLinkOpenAsync.complete();
                });
            });

            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), connectResult -> {
                testContext.assertTrue(connectResult.succeeded());
                ProtonConnection clientConnection = connectResult.result();
                clientConnection.open();
                ProtonStreams.createDeliveryConsumer(clientConnection, "myAddress").subscribe(new Subscriber<Delivery>() {
                    private Subscription sub;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        clientSubscribedAsync.complete();
                        this.sub = subscription;
                        subscription.request(5L);
                    }

                    @Override
                    public void onNext(Delivery delivery) {
                        validateMessage(testContext, 1, String.valueOf(1), delivery.message());
                        // Cancel after the first delivery; completion is expected to follow.
                        this.sub.cancel();
                    }

                    @Override
                    public void onError(Throwable th) {
                        // Errors arriving after completion are tolerated.
                        if (!clientCompletedAsync.isCompleted()) {
                            testContext.fail("onError called");
                        }
                    }

                    @Override
                    public void onComplete() {
                        clientCompletedAsync.complete();
                    }
                });
            });

            serverLinkOpenAsync.awaitSuccess();
            clientSubscribedAsync.awaitSuccess();
            serverLinkCloseAsync.awaitSuccess();
            clientCompletedAsync.awaitSuccess();
        } finally {
            // Single cleanup path: runs exactly once on both success and failure.
            if (protonServer != null) {
                protonServer.close();
            }
        }
    }

    /**
     * Has the server open and then immediately close its sender link, and expects the client
     * subscriber to be notified through {@code onError}; {@code onNext} and {@code onComplete}
     * both fail the test.
     *
     * @param testContext the Vert.x unit context used for assertions and async checkpoints
     * @throws Exception anything thrown while setting up the server or awaiting the checkpoints
     */
    @Test(timeout = 20000)
    public void testSubscriberErrorOnLinkCLose(TestContext testContext) throws Exception {
        // The default server is not used here; a purpose-built one is created below.
        this.server.close();

        final Async clientSubscribedAsync = testContext.async();
        final Async serverLinkOpenAsync = testContext.async();
        final Async serverLinkCloseAsync = testContext.async();
        final Async clientErrorAsync = testContext.async();

        ProtonServer protonServer = null;
        try {
            protonServer = createServer(serverConnection -> {
                serverConnection.openHandler(opened -> serverConnection.open());
                serverConnection.sessionOpenHandler(serverSession -> serverSession.open());
                serverConnection.senderOpenHandler(serverSender -> {
                    serverSender.closeHandler(closed -> serverLinkCloseAsync.complete());
                    LOG.trace("Server sender opened");
                    serverSender.open();
                    serverLinkOpenAsync.complete();
                    // Close straight after opening, without sending anything.
                    serverSender.close();
                });
            });

            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), connectResult -> {
                testContext.assertTrue(connectResult.succeeded());
                ProtonConnection clientConnection = connectResult.result();
                clientConnection.open();
                ProtonStreams.createDeliveryConsumer(clientConnection, "myAddress").subscribe(new Subscriber<Delivery>() {
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        clientSubscribedAsync.complete();
                    }

                    @Override
                    public void onNext(Delivery delivery) {
                        testContext.fail("onNext called");
                    }

                    @Override
                    public void onError(Throwable th) {
                        clientErrorAsync.complete();
                    }

                    @Override
                    public void onComplete() {
                        testContext.fail("onComplete called");
                    }
                });
            });

            serverLinkOpenAsync.awaitSuccess();
            clientSubscribedAsync.awaitSuccess();
            clientErrorAsync.awaitSuccess();
            serverLinkCloseAsync.awaitSuccess();
        } finally {
            // Single cleanup path: runs exactly once on both success and failure.
            if (protonServer != null) {
                protonServer.close();
            }
        }
    }

    /**
     * Closes the server while a subscriber is attached and expects the subscriber to be
     * notified through {@code onError}; {@code onNext} and {@code onComplete} both fail the test.
     *
     * @param testContext the Vert.x unit context used for assertions and async checkpoints
     * @throws Exception anything thrown while setting up the server or awaiting the checkpoints
     */
    @Test(timeout = 20000)
    public void testSubscriberErrorOnConnectionEnd(TestContext testContext) throws Exception {
        // The default server is not used here; a purpose-built one is created below.
        this.server.close();

        final Async clientSubscribedAsync = testContext.async();
        final Async serverLinkOpenAsync = testContext.async();
        final Async clientErrorAsync = testContext.async();

        ProtonServer protonServer = null;
        try {
            protonServer = createServer(serverConnection -> {
                serverConnection.openHandler(opened -> serverConnection.open());
                serverConnection.sessionOpenHandler(serverSession -> serverSession.open());
                serverConnection.senderOpenHandler(serverSender -> {
                    LOG.trace("Server sender opened");
                    serverSender.open();
                    serverLinkOpenAsync.complete();
                });
            });

            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), connectResult -> {
                testContext.assertTrue(connectResult.succeeded());
                ProtonConnection clientConnection = connectResult.result();
                clientConnection.open();
                ProtonStreams.createDeliveryConsumer(clientConnection, "myAddress").subscribe(new Subscriber<Delivery>() {
                    @Override
                    public void onSubscribe(Subscription subscription) {
                        clientSubscribedAsync.complete();
                    }

                    @Override
                    public void onNext(Delivery delivery) {
                        testContext.fail("onNext called");
                    }

                    @Override
                    public void onError(Throwable th) {
                        LOG.trace("onError called");
                        clientErrorAsync.complete();
                    }

                    @Override
                    public void onComplete() {
                        LOG.trace("onComplete called");
                        testContext.fail("onComplete called");
                    }
                });
            });

            serverLinkOpenAsync.awaitSuccess();
            clientSubscribedAsync.awaitSuccess();

            // No error may have been signalled while the server is still up.
            testContext.assertFalse(clientErrorAsync.isCompleted());

            // Take the server down and clear the reference so the finally block does not
            // close it a second time; if anything above failed, finally still closes it.
            protonServer.close();
            protonServer = null;

            clientErrorAsync.awaitSuccess();
        } finally {
            if (protonServer != null) {
                protonServer.close();
            }
        }
    }

    /**
     * Creates a consumer with the {@code dynamic} option and no address, has the server answer
     * with a generated address on its local source, and checks that the publisher reports that
     * address (and a dynamic source) by the time {@code onSubscribe} is called.
     *
     * @param testContext the Vert.x unit context used for assertions and async checkpoints
     * @throws Exception anything thrown while setting up the server or awaiting the checkpoints
     */
    @Test(timeout = 20000)
    public void testConfigureSubscriptionDynamic(TestContext testContext) throws Exception {
        // The default server is not used here; a purpose-built one is created below.
        this.server.close();

        final Async clientSubscribedAsync = testContext.async();
        final Async serverLinkOpenAsync = testContext.async();
        final Async clientCompletedAsync = testContext.async();
        final String dynamicAddress = "testConfigureSubscriptionDynamic:" + UUID.randomUUID();

        ProtonServer protonServer = null;
        try {
            protonServer = createServer(serverConnection -> {
                serverConnection.openHandler(opened -> serverConnection.open());
                serverConnection.sessionOpenHandler(serverSession -> serverSession.open());
                serverConnection.senderOpenHandler(serverSender -> {
                    serverSender.closeHandler(closed -> serverSender.close());

                    // The client must have asked for a dynamic source without naming an address.
                    testContext.assertNotNull(serverSender.getRemoteSource(), "source should not be null");
                    Source requestedSource = serverSender.getRemoteSource();
                    testContext.assertTrue(requestedSource.getDynamic(), "expected dynamic source");
                    testContext.assertNull(requestedSource.getAddress(), "expected no source address");

                    // Reply with a copy of the requested source carrying the generated address.
                    Source localSource = requestedSource.copy();
                    localSource.setAddress(dynamicAddress);
                    serverSender.setSource(localSource);

                    LOG.trace("Server sender opened");
                    serverSender.open();
                    serverLinkOpenAsync.complete();
                });
            });

            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), connectResult -> {
                testContext.assertTrue(connectResult.succeeded());
                ProtonConnection clientConnection = connectResult.result();
                clientConnection.open();

                final ProtonPublisherOptions options = new ProtonPublisherOptions().setDynamic(true);
                final ProtonPublisher<Delivery> publisher =
                    ProtonStreams.createDeliveryConsumer(clientConnection, (String) null, options);

                publisher.subscribe(new Subscriber<Delivery>() {
                    private Subscription sub;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        testContext.assertEquals(dynamicAddress, publisher.getRemoteAddress(), "unexpected remote address");
                        Source remoteSource = publisher.getRemoteSource();
                        testContext.assertTrue(remoteSource.getDynamic(), "expected dynamic source");
                        testContext.assertEquals(dynamicAddress, remoteSource.getAddress(), "unexpected source address");

                        clientSubscribedAsync.complete();

                        // Request one item and cancel straight away; onComplete is awaited below.
                        this.sub = subscription;
                        subscription.request(1L);
                        this.sub.cancel();
                    }

                    @Override
                    public void onNext(Delivery delivery) {
                        // Deliveries are irrelevant to this test.
                    }

                    @Override
                    public void onError(Throwable th) {
                        // Errors arriving after completion are tolerated.
                        if (!clientCompletedAsync.isCompleted()) {
                            testContext.fail("onError called");
                        }
                    }

                    @Override
                    public void onComplete() {
                        clientCompletedAsync.complete();
                    }
                });
            });

            serverLinkOpenAsync.awaitSuccess();
            clientSubscribedAsync.awaitSuccess();
            clientCompletedAsync.awaitSuccess();
        } finally {
            // Single cleanup path: runs exactly once on both success and failure.
            if (protonServer != null) {
                protonServer.close();
            }
        }
    }

    /**
     * Consumes from the "two_messages" address while granting credit one message at a time,
     * each grant deferred by a timer, and asserts that no message arrives before the matching
     * credit was requested and the delay had elapsed.
     *
     * @param testContext the Vert.x unit context used for assertions and the completion checkpoint
     */
    @Test(timeout = 20000)
    public void testDelayedInitialRequest(TestContext testContext) {
        final Async done = testContext.async();

        connect(testContext, connection -> {
            connection.open();

            final AtomicInteger received = new AtomicInteger(0);
            final AtomicBoolean initialCreditGranted = new AtomicBoolean();
            final AtomicBoolean additionalCreditGranted = new AtomicBoolean();
            final int delay = 250;
            final long startTime = System.currentTimeMillis();

            ProtonStreams.createDeliveryConsumer(connection, "two_messages").subscribe(new Subscriber<Delivery>() {
                Subscription sub = null;

                @Override
                public void onSubscribe(Subscription subscription) {
                    // Hold back the first request until the timer fires.
                    vertx.setTimer(delay, timerId -> {
                        LOG.trace("Flowing initial credit");
                        initialCreditGranted.set(true);
                        this.sub = subscription;
                        this.sub.request(1L);
                    });
                }

                @Override
                public void onNext(Delivery delivery) {
                    final int count = received.incrementAndGet();

                    if (count == 1) {
                        validateMessage(testContext, count, String.valueOf(count), delivery.message());
                        testContext.assertTrue(initialCreditGranted.get(), "Initial credit not yet granted, so we should not have received message 1 yet!");
                        testContext.assertTrue(System.currentTimeMillis() > startTime + delay, "Message received before expected time delay elapsed!");
                        LOG.trace("Got msg 1");

                        // Defer the credit for the second message as well.
                        vertx.setTimer(delay, timerId -> {
                            LOG.trace("Granting additional credit");
                            additionalCreditGranted.set(true);
                            this.sub.request(1L);
                        });
                    } else if (count == 2) {
                        validateMessage(testContext, count, String.valueOf(count), delivery.message());
                        testContext.assertTrue(additionalCreditGranted.get(), "Additional credit not yet granted, so we should not have received message " + count + " yet!");
                        testContext.assertTrue(System.currentTimeMillis() > startTime + 2L * delay, "Message received before expected time delay elapsed!");
                        LOG.trace("Got msg 2, completing async");
                        done.complete();
                        connection.disconnect();
                    }
                    // Any further delivery is ignored.
                }

                @Override
                public void onError(Throwable th) {
                    // Errors after the test has completed are tolerated.
                    if (!done.isCompleted()) {
                        testContext.fail("onError called");
                    }
                }

                @Override
                public void onComplete() {
                    testContext.fail("onComplete called");
                }
            });
        });
    }

    /**
     * Consumes from the "five_messages" address, requesting four items immediately and the
     * fifth only after a one-second timer, and asserts that the fifth message is not delivered
     * before that extra credit has been requested.
     *
     * @param testContext the Vert.x unit context used for assertions and the completion checkpoint
     */
    @Test(timeout = 20000)
    public void testImmediateInitialRequest(TestContext testContext) {
        final Async done = testContext.async();

        connect(testContext, connection -> {
            connection.open();

            final AtomicInteger received = new AtomicInteger(0);
            final AtomicBoolean additionalCreditGranted = new AtomicBoolean();

            ProtonStreams.createDeliveryConsumer(connection, "five_messages").subscribe(new Subscriber<Delivery>() {
                Subscription sub = null;

                @Override
                public void onSubscribe(Subscription subscription) {
                    LOG.trace("Flowing initial credit");
                    this.sub = subscription;
                    this.sub.request(4L);
                }

                @Override
                public void onNext(Delivery delivery) {
                    final int count = received.incrementAndGet();
                    if (count < 1 || count > 5) {
                        // Nothing is checked for deliveries outside the expected five.
                        return;
                    }

                    validateMessage(testContext, count, String.valueOf(count), delivery.message());

                    if (count == 4) {
                        // Grant the fifth credit after one second...
                        vertx.setTimer(1000L, timerId -> {
                            LOG.trace("Flowing more credit");
                            additionalCreditGranted.set(true);
                            this.sub.request(1L);
                        });
                        // ...and half-way through, check that message 5 has not arrived yet.
                        vertx.setTimer(500L, timerId -> {
                            LOG.trace("Checking msg 5 not received yet");
                            testContext.assertEquals(4, received.get());
                        });
                    } else if (count == 5) {
                        testContext.assertTrue(additionalCreditGranted.get(), "Additional credit not yet granted, so we should not have received message 5 yet!");
                        LOG.trace("Got msg 5, completing async");
                        done.complete();
                        connection.disconnect();
                    }
                }

                @Override
                public void onError(Throwable th) {
                    // Errors after the test has completed are tolerated.
                    if (!done.isCompleted()) {
                        testContext.fail("onError called");
                    }
                }

                @Override
                public void onComplete() {
                    testContext.fail("onComplete called");
                }
            });
        });
    }

    /**
     * Requests 15 and then 1100 items from a consumer created with default options and checks
     * the credit values the server observes: 15, then 1000, and, once the server has
     * started sending, a top-up of 115. Credit above 1000 fails the test. Finally all 1115
     * messages must reach the subscriber in order.
     *
     * @param testContext the Vert.x unit context used for assertions and async checkpoints
     * @throws Exception anything thrown while setting up the server or awaiting the checkpoints
     */
    @Test(timeout = 20000)
    public void testMaxOutstandingCredit1(TestContext testContext) throws Exception {
        // The default server is not used here; a purpose-built one is created below.
        this.server.close();

        final int maxOutstanding = 1000;
        final int initialRequest = 15;
        final int additionalRequest = 1100;
        final int totalRequests = initialRequest + additionalRequest;

        final Async receivedMaxCreditAsync = testContext.async();
        final Async verifiedCreditAsync = testContext.async();
        final Async receivedTopUpCreditAsync = testContext.async();
        final Async receivedMessagesAsync = testContext.async();

        final List<Integer> credits = new ArrayList<>();
        final AtomicInteger receivedCount = new AtomicInteger();

        ProtonServer protonServer = null;
        try {
            protonServer = createServer(serverConnection -> {
                serverConnection.openHandler(opened -> serverConnection.open());
                serverConnection.sessionOpenHandler(serverSession -> serverSession.open());
                serverConnection.senderOpenHandler(serverSender -> {
                    LOG.trace("Server sender opened");
                    serverSender.open();

                    final AtomicInteger sentCount = new AtomicInteger();
                    serverSender.sendQueueDrainHandler(sender -> {
                        int credit = sender.getCredit();
                        LOG.trace("Server credit: " + credit);
                        credits.add(credit);

                        if (credit > maxOutstanding) {
                            testContext.fail("Received unexpected amount of credit: " + credit);
                            return;
                        }

                        if (credit == maxOutstanding) {
                            LOG.trace("Server reached max outstanding: " + credit);
                            receivedMaxCreditAsync.complete();
                            // Hold here until the test thread has checked the recorded credits.
                            verifiedCreditAsync.await();
                            while (!sender.sendQueueFull()) {
                                sender.send(ProtonHelper.message(String.valueOf(sentCount.incrementAndGet())));
                            }
                            return;
                        }

                        // Credit below the maximum only triggers sending once the maximum was seen.
                        if (receivedMaxCreditAsync.isCompleted()) {
                            LOG.trace("Server received topup credit, post-max-outstanding: " + credit);
                            receivedTopUpCreditAsync.complete();
                            while (!sender.sendQueueFull()) {
                                sender.send(ProtonHelper.message(String.valueOf(sentCount.incrementAndGet())));
                            }
                        }
                    });
                });
            });

            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), connectResult -> {
                testContext.assertTrue(connectResult.succeeded());
                ProtonConnection clientConnection = connectResult.result();
                clientConnection.open();
                ProtonStreams.createDeliveryConsumer(clientConnection, "myAddress").subscribe(new Subscriber<Delivery>() {
                    private Subscription sub;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        LOG.trace("Flowing initial credit");
                        this.sub = subscription;
                        this.sub.request(initialRequest);
                        vertx.setTimer(250L, timerId -> {
                            LOG.trace("Granting additional credit");
                            this.sub.request(additionalRequest);
                        });
                    }

                    @Override
                    public void onNext(Delivery delivery) {
                        int count = receivedCount.incrementAndGet();
                        validateMessage(testContext, count, String.valueOf(count), delivery.message());
                        if (count >= totalRequests) {
                            receivedMessagesAsync.complete();
                        }
                    }

                    @Override
                    public void onError(Throwable th) {
                        // Errors after all messages were received are tolerated.
                        if (!receivedMessagesAsync.isCompleted()) {
                            testContext.fail("onError called");
                        }
                    }

                    @Override
                    public void onComplete() {
                        testContext.fail("onComplete called");
                    }
                });
            });

            receivedMaxCreditAsync.awaitSuccess();
            testContext.assertEquals(2, credits.size(), "Unexpected credits count");
            testContext.assertEquals(initialRequest, credits.get(0), "Unexpected credit 1");
            testContext.assertEquals(maxOutstanding, credits.get(1), "Unexpected credit 2");

            // Release the server so it starts sending.
            verifiedCreditAsync.complete();

            receivedTopUpCreditAsync.awaitSuccess();
            testContext.assertEquals(3, credits.size(), "Unexpected credits count");
            testContext.assertEquals(totalRequests - maxOutstanding, credits.get(2), "Unexpected credit 3");

            receivedMessagesAsync.awaitSuccess();
        } finally {
            // Single cleanup path: runs exactly once on both success and failure.
            if (protonServer != null) {
                protonServer.close();
            }
        }
    }

    /**
     * Requests 107 items from a consumer configured with a max outstanding credit of 20 and
     * checks that the largest credit value the server ever observes is 20, that the credit
     * values seen across all drain callbacks add up to 107, and that all 107 messages reach
     * the subscriber in order.
     *
     * @param testContext the Vert.x unit context used for assertions and async checkpoints
     * @throws Exception anything thrown while setting up the server or awaiting the checkpoints
     */
    @Test(timeout = 20000)
    public void testMaxOutstandingCredit2(TestContext testContext) throws Exception {
        // The default server is not used here; a purpose-built one is created below.
        this.server.close();

        final int maxOutstanding = 20;
        final int totalRequests = 107;

        final Async receivedMessagesAsync = testContext.async();
        final AtomicInteger totalCredit = new AtomicInteger();
        final AtomicInteger maxCredit = new AtomicInteger();
        final AtomicInteger receivedCount = new AtomicInteger();

        ProtonServer protonServer = null;
        try {
            protonServer = createServer(serverConnection -> {
                serverConnection.openHandler(opened -> serverConnection.open());
                serverConnection.sessionOpenHandler(serverSession -> serverSession.open());
                serverConnection.senderOpenHandler(serverSender -> {
                    LOG.trace("Server sender opened");
                    serverSender.open();

                    final AtomicInteger sentCount = new AtomicInteger();
                    serverSender.sendQueueDrainHandler(sender -> {
                        int credit = sender.getCredit();
                        LOG.trace("Server credit: " + credit);

                        // Track the largest credit value seen so far.
                        maxCredit.accumulateAndGet(credit, Math::max);

                        while (!sender.sendQueueFull()) {
                            sender.send(ProtonHelper.message(String.valueOf(sentCount.incrementAndGet())));
                        }

                        totalCredit.addAndGet(credit);
                    });
                });
            });

            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), connectResult -> {
                testContext.assertTrue(connectResult.succeeded());
                ProtonConnection clientConnection = connectResult.result();
                clientConnection.open();

                final ProtonPublisherOptions options = new ProtonPublisherOptions().setMaxOutstandingCredit(maxOutstanding);
                ProtonStreams.createDeliveryConsumer(clientConnection, "myAddress", options).subscribe(new Subscriber<Delivery>() {
                    private Subscription sub;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        LOG.trace("Flowing initial credit");
                        this.sub = subscription;
                        this.sub.request(totalRequests);
                    }

                    @Override
                    public void onNext(Delivery delivery) {
                        int count = receivedCount.incrementAndGet();
                        validateMessage(testContext, count, String.valueOf(count), delivery.message());
                        if (count >= totalRequests) {
                            receivedMessagesAsync.complete();
                        }
                    }

                    @Override
                    public void onError(Throwable th) {
                        // Errors after all messages were received are tolerated.
                        if (!receivedMessagesAsync.isCompleted()) {
                            testContext.fail("onError called");
                        }
                    }

                    @Override
                    public void onComplete() {
                        testContext.fail("onComplete called");
                    }
                });
            });

            receivedMessagesAsync.awaitSuccess();
            testContext.assertEquals(maxOutstanding, maxCredit.get(), "Unexpected max credit");
            testContext.assertEquals(totalRequests, totalCredit.get(), "Unexpected total credits received");
        } finally {
            // Single cleanup path: runs exactly once on both success and failure.
            if (protonServer != null) {
                protonServer.close();
            }
        }
    }

    /**
     * Verifies that messages consumed through a {@code ProtonStreams.createConsumer} publisher are accepted
     * without the subscriber settling anything itself.
     *
     * <p>The server sends 5 numbered messages once credit arrives and fails the test if any disposition it gets
     * back is not {@link Accepted}. The subscriber only requests credit and validates each body. After both
     * sides finish, the recorded message numbers are checked to be 1..5 in order.
     *
     * @param testContext the Vert.x unit test context used for assertions and async coordination
     * @throws Exception if the test server cannot be started
     */
    @Test(timeout = 20000)
    public void testAutoAccept(TestContext testContext) throws Exception {
        this.server.close();

        final Async receivedMessagesAsync = testContext.async();
        final Async ackedMessagesAsync = testContext.async();
        final int messageCount = 5;
        final List<Integer> accepted = Collections.synchronizedList(new ArrayList<>());
        final AtomicInteger nextMsgNum = new AtomicInteger(1);

        ProtonServer protonServer = null;
        try {
            // ===== Server handling =====
            protonServer = createServer(serverConnection -> {
                serverConnection.openHandler(result -> serverConnection.open());
                serverConnection.sessionOpenHandler(session -> session.open());
                serverConnection.senderOpenHandler(serverSender -> {
                    LOG.trace("Server sender opened");
                    serverSender.open();
                    // The drain handler parameter gets its own name: a lambda parameter may not
                    // redeclare a variable of the enclosing lambda.
                    serverSender.sendQueueDrainHandler(drainedSender -> {
                        for (int n = nextMsgNum.get(); n <= messageCount; n = nextMsgNum.incrementAndGet()) {
                            final int msgNum = n;
                            drainedSender.send(ProtonHelper.message(String.valueOf(msgNum)), delivery -> {
                                LOG.trace("Server received disposition for msg: " + msgNum);
                                if (!(delivery.getRemoteState() instanceof Accepted)) {
                                    testContext.fail("Expected message to be accepted");
                                    return;
                                }
                                accepted.add(msgNum);
                                if (accepted.size() == messageCount) {
                                    ackedMessagesAsync.complete();
                                }
                            });
                        }
                    });
                });
            });

            // ===== Client handling =====
            final AtomicInteger receivedCount = new AtomicInteger(0);
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), connectResult -> {
                testContext.assertTrue(connectResult.succeeded());
                final ProtonConnection clientConnection = connectResult.result();
                ProtonStreams.createConsumer(clientConnection, "myAddress").subscribe(new Subscriber<Message>() {
                    private Subscription sub = null;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        this.sub = subscription;
                        LOG.trace("Flowing initial credit");
                        this.sub.request(messageCount);
                    }

                    @Override
                    public void onNext(Message message) {
                        final int count = receivedCount.incrementAndGet();
                        validateMessage(testContext, count, String.valueOf(count), message);
                        if (count == messageCount) {
                            LOG.trace("Got all messages, completing async");
                            receivedMessagesAsync.complete();
                        }
                    }

                    @Override
                    public void onError(Throwable failure) {
                        // Errors after every message has been received are ignored.
                        if (!receivedMessagesAsync.isCompleted()) {
                            testContext.fail("onError called");
                        }
                    }

                    @Override
                    public void onComplete() {
                        testContext.fail("onComplete called");
                    }
                });
                clientConnection.open();
            });

            receivedMessagesAsync.awaitSuccess();
            ackedMessagesAsync.awaitSuccess();
        } finally {
            // Single cleanup point: the server is closed exactly once on every path.
            if (protonServer != null) {
                protonServer.close();
            }
        }

        // Verify the messages were all accepted, in send order.
        testContext.assertEquals(accepted.size(), messageCount, "Unexpected accepted count");
        for (int i = 1; i <= messageCount; i++) {
            testContext.assertEquals(accepted.remove(0), i, "Unexpected msgNum");
        }
    }

    /**
     * Verifies that deliveries obtained through {@code ProtonStreams.createDeliveryConsumer} are not accepted
     * until the application calls {@code accept()} on them.
     *
     * <p>The subscriber collects 5 deliveries without settling them. After a short pause the test asserts the
     * server has seen no dispositions yet, then accepts each delivery and checks that the server recorded
     * message numbers 1..5 in order.
     *
     * @param testContext the Vert.x unit test context used for assertions and async coordination
     * @throws Exception if the test server cannot be started or the pause is interrupted
     */
    @Test(timeout = 20000)
    public void testManualAccept(TestContext testContext) throws Exception {
        this.server.close();

        final Async receivedMessagesAsync = testContext.async();
        final Async ackedMessagesAsync = testContext.async();
        final int messageCount = 5;
        final List<Integer> accepted = Collections.synchronizedList(new ArrayList<>());
        final List<Delivery> deliveries = Collections.synchronizedList(new ArrayList<>());
        final AtomicInteger nextMsgNum = new AtomicInteger(1);

        ProtonServer protonServer = null;
        try {
            // ===== Server handling =====
            protonServer = createServer(serverConnection -> {
                serverConnection.openHandler(result -> serverConnection.open());
                serverConnection.sessionOpenHandler(session -> session.open());
                serverConnection.senderOpenHandler(serverSender -> {
                    LOG.trace("Server sender opened");
                    serverSender.open();
                    // The drain handler parameter gets its own name: a lambda parameter may not
                    // redeclare a variable of the enclosing lambda.
                    serverSender.sendQueueDrainHandler(drainedSender -> {
                        for (int n = nextMsgNum.get(); n <= messageCount; n = nextMsgNum.incrementAndGet()) {
                            final int msgNum = n;
                            drainedSender.send(ProtonHelper.message(String.valueOf(msgNum)), delivery -> {
                                LOG.trace("Server received disposition for msg: " + msgNum);
                                if (!(delivery.getRemoteState() instanceof Accepted)) {
                                    testContext.fail("Expected message to be accepted");
                                    return;
                                }
                                accepted.add(msgNum);
                                if (accepted.size() == messageCount) {
                                    ackedMessagesAsync.complete();
                                }
                            });
                        }
                    });
                });
            });

            // ===== Client handling =====
            final AtomicInteger receivedCount = new AtomicInteger(0);
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), connectResult -> {
                testContext.assertTrue(connectResult.succeeded());
                final ProtonConnection clientConnection = connectResult.result();
                ProtonStreams.createDeliveryConsumer(clientConnection, "myAddress").subscribe(new Subscriber<Delivery>() {
                    private Subscription sub = null;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        this.sub = subscription;
                        LOG.trace("Flowing initial credit");
                        this.sub.request(messageCount);
                    }

                    @Override
                    public void onNext(Delivery delivery) {
                        final int count = receivedCount.incrementAndGet();
                        // Hold on to the delivery; it is accepted later from the test body.
                        deliveries.add(delivery);
                        if (count == messageCount) {
                            LOG.trace("Got all messages, completing async");
                            receivedMessagesAsync.complete();
                        }
                    }

                    @Override
                    public void onError(Throwable failure) {
                        // Errors after every message has been received are ignored.
                        if (!receivedMessagesAsync.isCompleted()) {
                            testContext.fail("onError called");
                        }
                    }

                    @Override
                    public void onComplete() {
                        testContext.fail("onComplete called");
                    }
                });
                clientConnection.open();
            });

            receivedMessagesAsync.awaitSuccess();

            // Small delay so any unexpected early disposition would have reached the server by now.
            Thread.sleep(50L);
            testContext.assertTrue(accepted.isEmpty(), "Unexpected accepted count");

            // Now accept everything explicitly.
            for (Delivery delivery : deliveries) {
                delivery.accept();
            }

            ackedMessagesAsync.awaitSuccess();
        } finally {
            // Single cleanup point: the server is closed exactly once on every path.
            if (protonServer != null) {
                protonServer.close();
            }
        }

        // Verify the messages were all accepted, in send order.
        testContext.assertEquals(accepted.size(), messageCount, "Unexpected accepted count");
        for (int i = 1; i <= messageCount; i++) {
            testContext.assertEquals(accepted.remove(0), i, "Unexpected msgNum");
        }
    }

    /**
     * Verifies that a delivery consumer can settle deliveries with states other than {@link Accepted}.
     *
     * <p>The subscriber collects 4 deliveries, then the test applies Accepted, Released, Rejected and Modified
     * to them in that order. The server records the remote state of each disposition it receives, and the test
     * checks the recorded states match by position.
     *
     * @param testContext the Vert.x unit test context used for assertions and async coordination
     * @throws Exception if the test server cannot be started or the pause is interrupted
     */
    @Test(timeout = 20000)
    public void testAlternativeDispositionStates(TestContext testContext) throws Exception {
        this.server.close();

        final Async receivedMessagesAsync = testContext.async();
        final Async ackedMessagesAsync = testContext.async();
        final int messageCount = 4;
        final List<DeliveryState> deliveryStates = Collections.synchronizedList(new ArrayList<>());
        final List<Delivery> deliveries = Collections.synchronizedList(new ArrayList<>());
        final AtomicInteger nextMsgNum = new AtomicInteger(1);

        ProtonServer protonServer = null;
        try {
            // ===== Server handling =====
            protonServer = createServer(serverConnection -> {
                serverConnection.openHandler(result -> serverConnection.open());
                serverConnection.sessionOpenHandler(session -> session.open());
                serverConnection.senderOpenHandler(serverSender -> {
                    LOG.trace("Server sender opened");
                    serverSender.open();
                    // The drain handler parameter gets its own name: a lambda parameter may not
                    // redeclare a variable of the enclosing lambda.
                    serverSender.sendQueueDrainHandler(drainedSender -> {
                        for (int n = nextMsgNum.get(); n <= messageCount; n = nextMsgNum.incrementAndGet()) {
                            final int msgNum = n;
                            drainedSender.send(ProtonHelper.message(String.valueOf(msgNum)), delivery -> {
                                LOG.trace("Server received disposition for msg: " + msgNum);
                                final DeliveryState state = delivery.getRemoteState();
                                testContext.assertNotNull(state, "Expected message to have a delivery state");
                                deliveryStates.add(state);
                                if (deliveryStates.size() == messageCount) {
                                    ackedMessagesAsync.complete();
                                }
                            });
                        }
                    });
                });
            });

            // ===== Client handling =====
            final AtomicInteger receivedCount = new AtomicInteger(0);
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), connectResult -> {
                testContext.assertTrue(connectResult.succeeded());
                final ProtonConnection clientConnection = connectResult.result();
                ProtonStreams.createDeliveryConsumer(clientConnection, "myAddress").subscribe(new Subscriber<Delivery>() {
                    private Subscription sub = null;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        this.sub = subscription;
                        LOG.trace("Flowing initial credit");
                        this.sub.request(messageCount);
                    }

                    @Override
                    public void onNext(Delivery delivery) {
                        final int count = receivedCount.incrementAndGet();
                        // Hold on to the delivery; its disposition is applied later from the test body.
                        deliveries.add(delivery);
                        if (count == messageCount) {
                            LOG.trace("Got all messages, completing async");
                            receivedMessagesAsync.complete();
                        }
                    }

                    @Override
                    public void onError(Throwable failure) {
                        // Errors after every message has been received are ignored.
                        if (!receivedMessagesAsync.isCompleted()) {
                            testContext.fail("onError called");
                        }
                    }

                    @Override
                    public void onComplete() {
                        testContext.fail("onComplete called");
                    }
                });
                clientConnection.open();
            });

            receivedMessagesAsync.awaitSuccess();

            // Small delay so any unexpected early disposition would have reached the server by now.
            Thread.sleep(50L);
            testContext.assertTrue(deliveryStates.isEmpty(), "Unexpected acks count");

            // Settle each delivery with a different outcome.
            deliveries.get(0).disposition(Accepted.getInstance(), true);
            deliveries.get(1).disposition(Released.getInstance(), true);
            deliveries.get(2).disposition(new Rejected(), true);
            deliveries.get(3).disposition(new Modified(), true);

            ackedMessagesAsync.awaitSuccess();
        } finally {
            // Single cleanup point: the server is closed exactly once on every path.
            if (protonServer != null) {
                protonServer.close();
            }
        }

        // Verify the server observed each outcome, in the order they were applied.
        testContext.assertEquals(deliveryStates.size(), messageCount, "Unexpected state count: " + deliveryStates);
        testContext.assertTrue(deliveryStates.get(0) instanceof Accepted, "Unexpected state: " + deliveryStates.get(0));
        testContext.assertTrue(deliveryStates.get(1) instanceof Released, "Unexpected state: " + deliveryStates.get(1));
        testContext.assertTrue(deliveryStates.get(2) instanceof Rejected, "Unexpected state: " + deliveryStates.get(2));
        testContext.assertTrue(deliveryStates.get(3) instanceof Modified, "Unexpected state: " + deliveryStates.get(3));
    }

    /**
     * Runs the subscription configuration scenario with only a custom link name set:
     * not durable, not shared, not global.
     */
    @Test(timeout = 20000)
    public void testConfigureSubscriptionLinkName(TestContext testContext) throws Exception {
        final boolean durable = false;
        final boolean shared = false;
        final boolean global = false;
        doSubscriptionConfigTestImpl(testContext, durable, "testConfigureSubscriptionLinkName", shared, global);
    }

    /**
     * Runs the subscription configuration scenario for a durable subscription that is
     * neither shared nor global.
     */
    @Test(timeout = 20000)
    public void testConfigureSubscriptionDurable(TestContext testContext) throws Exception {
        final boolean durable = true;
        final boolean shared = false;
        final boolean global = false;
        doSubscriptionConfigTestImpl(testContext, durable, "testConfigureSubscriptionDurable", shared, global);
    }

    /**
     * Runs the subscription configuration scenario for a durable, shared subscription
     * that is not global.
     */
    @Test(timeout = 20000)
    public void testConfigureSubscriptionDurableShared(TestContext testContext) throws Exception {
        final boolean durable = true;
        final boolean shared = true;
        final boolean global = false;
        doSubscriptionConfigTestImpl(testContext, durable, "testConfigureSubscriptionDurableShared", shared, global);
    }

    /**
     * Runs the subscription configuration scenario for a durable subscription that is
     * both shared and global.
     */
    @Test(timeout = 20000)
    public void testConfigureSubscriptionDurableSharedGlobal(TestContext testContext) throws Exception {
        final boolean durable = true;
        final boolean shared = true;
        final boolean global = true;
        doSubscriptionConfigTestImpl(testContext, durable, "testConfigureSubscriptionDurableSharedGlobal", shared, global);
    }

    /**
     * Shared body for the subscription configuration tests.
     *
     * <p>The client creates a delivery consumer on "myAddress" using {@link ProtonPublisherOptions} built from the
     * arguments, requests one credit and immediately cancels. The server side checks the link name, and for
     * durable subscriptions the source expiry policy and durability, and for shared subscriptions the source
     * capabilities. It also asserts the link ends with a detach when durable and with a close otherwise.
     *
     * @param testContext the Vert.x unit test context used for assertions and async coordination
     * @param z whether the subscription is configured as durable
     * @param str the link name to configure and to expect on the server side
     * @param z2 whether the subscription is configured as shared
     * @param z3 whether the subscription is configured as global
     * @throws InterruptedException if interrupted while waiting for the server to start
     * @throws ExecutionException if starting the server fails
     */
    private void doSubscriptionConfigTestImpl(TestContext testContext, boolean z, String str, boolean z2, boolean z3) throws InterruptedException, ExecutionException {
        // Readable aliases for the opaque parameter names.
        final boolean durable = z;
        final String linkName = str;
        final boolean shared = z2;
        final boolean global = z3;

        this.server.close();

        final Async clientSubscribedAsync = testContext.async();
        final Async serverLinkOpenAsync = testContext.async();
        final Async clientCompletedAsync = testContext.async();

        ProtonServer protonServer = null;
        try {
            // ===== Server handling =====
            protonServer = createServer(serverConnection -> {
                serverConnection.openHandler(result -> serverConnection.open());
                serverConnection.sessionOpenHandler(session -> session.open());
                serverConnection.senderOpenHandler(serverSender -> {
                    serverSender.closeHandler(closeResult -> {
                        testContext.assertFalse(durable, "unexpected link close for durable sub");
                        serverSender.close();
                    });
                    serverSender.detachHandler(detachResult -> {
                        testContext.assertTrue(durable, "unexpected link detach for non-durable sub");
                        serverSender.detach();
                    });
                    LOG.trace("Server sender opened");
                    serverSender.open();

                    // Verify the link and remote terminus details were as configured by the client.
                    testContext.assertEquals(linkName, serverSender.getName(), "unexpected link name");
                    testContext.assertNotNull(serverSender.getRemoteSource(), "source should not be null");
                    // NOTE(review): explicit downcast to the messaging Source; it is a no-op if
                    // getRemoteSource() is already declared with that type - confirm against the API.
                    final Source remoteSource = (Source) serverSender.getRemoteSource();
                    if (durable) {
                        testContext.assertEquals(TerminusExpiryPolicy.NEVER, remoteSource.getExpiryPolicy(), "unexpected expiry");
                        testContext.assertEquals(TerminusDurability.UNSETTLED_STATE, remoteSource.getDurable(), "unexpected durability");
                    }
                    final Symbol[] capabilities = remoteSource.getCapabilities();
                    if (shared && global) {
                        testContext.assertTrue(Arrays.equals(new Symbol[]{Symbol.valueOf("shared"), Symbol.valueOf("global")}, capabilities), "Unexpected capabilities: " + Arrays.toString(capabilities));
                    } else if (shared) {
                        testContext.assertTrue(Arrays.equals(new Symbol[]{Symbol.valueOf("shared")}, capabilities), "Unexpected capabilities: " + Arrays.toString(capabilities));
                    }
                    serverLinkOpenAsync.complete();
                });
            });

            // ===== Client handling =====
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), connectResult -> {
                testContext.assertTrue(connectResult.succeeded());
                final ProtonConnection clientConnection = connectResult.result();
                clientConnection.open();

                final ProtonPublisherOptions options = new ProtonPublisherOptions().setLinkName(linkName);
                if (durable) {
                    options.setDurable(true);
                }
                if (shared) {
                    options.setShared(true);
                }
                if (global) {
                    options.setGlobal(true);
                }

                ProtonStreams.createDeliveryConsumer(clientConnection, "myAddress", options).subscribe(new Subscriber<Delivery>() {
                    private Subscription sub = null;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        clientSubscribedAsync.complete();
                        this.sub = subscription;
                        subscription.request(1L);
                        // Cancel straight away; the test only cares about how the link was configured.
                        this.sub.cancel();
                    }

                    @Override
                    public void onNext(Delivery delivery) {
                        // No deliveries are asserted on in this scenario.
                    }

                    @Override
                    public void onError(Throwable failure) {
                        if (!clientCompletedAsync.isCompleted()) {
                            testContext.fail("onError called");
                        }
                    }

                    @Override
                    public void onComplete() {
                        clientCompletedAsync.complete();
                    }
                });
            });

            serverLinkOpenAsync.awaitSuccess();
            clientSubscribedAsync.awaitSuccess();
            clientCompletedAsync.awaitSuccess();
        } finally {
            // Single cleanup point: the server is closed exactly once on every path.
            if (protonServer != null) {
                protonServer.close();
            }
        }
    }

    /**
     * Verifies that a publisher's source terminus can be customised before subscribing.
     *
     * <p>The client creates a delivery consumer with a null address and marks its source as dynamic. The server
     * asserts the remote source is dynamic with no address, then answers with a copy of that source carrying a
     * generated address. The subscriber asserts the publisher's remote source is dynamic and has that address,
     * then requests one credit and cancels.
     *
     * @param testContext the Vert.x unit test context used for assertions and async coordination
     * @throws Exception if the test server cannot be started
     */
    @Test(timeout = 20000)
    public void testCreateUsingCustomSource(TestContext testContext) throws Exception {
        this.server.close();

        final Async clientSubscribedAsync = testContext.async();
        final Async serverLinkOpenAsync = testContext.async();
        final Async clientCompletedAsync = testContext.async();
        final String dynamicAddress = "testCreateUsingCustomSource:" + UUID.randomUUID();

        ProtonServer protonServer = null;
        try {
            // ===== Server handling =====
            protonServer = createServer(serverConnection -> {
                serverConnection.openHandler(result -> serverConnection.open());
                serverConnection.sessionOpenHandler(session -> session.open());
                serverConnection.senderOpenHandler(serverSender -> {
                    serverSender.closeHandler(closeResult -> serverSender.close());

                    // Verify the remote terminus details used were as expected.
                    testContext.assertNotNull(serverSender.getRemoteSource(), "source should not be null");
                    // NOTE(review): explicit downcasts to the messaging Source; they are no-ops if the
                    // API already declares that type - confirm against the API.
                    final Source remoteSource = (Source) serverSender.getRemoteSource();
                    testContext.assertTrue(remoteSource.getDynamic(), "expected dynamic source");
                    testContext.assertNull(remoteSource.getAddress(), "expected no source address");

                    // Set the local terminus details: same source, but with the generated address.
                    final Source localSource = (Source) remoteSource.copy();
                    localSource.setAddress(dynamicAddress);
                    serverSender.setSource(localSource);

                    LOG.trace("Server sender opened");
                    serverSender.open();
                    serverLinkOpenAsync.complete();
                });
            });

            // ===== Client handling =====
            ProtonClient.create(this.vertx).connect("localhost", protonServer.actualPort(), connectResult -> {
                testContext.assertTrue(connectResult.succeeded());
                final ProtonConnection clientConnection = connectResult.result();
                clientConnection.open();

                // No address is supplied; the source is flagged dynamic instead.
                final ProtonPublisher<Delivery> publisher = ProtonStreams.createDeliveryConsumer(clientConnection, (String) null);
                ((Source) publisher.getSource()).setDynamic(true);

                publisher.subscribe(new Subscriber<Delivery>() {
                    private Subscription sub = null;

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        // Verify the remote source reported by the publisher carries the server's address.
                        final Source remoteSource = (Source) publisher.getRemoteSource();
                        testContext.assertTrue(remoteSource.getDynamic(), "expected dynamic source");
                        testContext.assertEquals(dynamicAddress, remoteSource.getAddress(), "unexpected source address");
                        clientSubscribedAsync.complete();
                        this.sub = subscription;
                        subscription.request(1L);
                        this.sub.cancel();
                    }

                    @Override
                    public void onNext(Delivery delivery) {
                        // No deliveries are asserted on in this scenario.
                    }

                    @Override
                    public void onError(Throwable failure) {
                        if (!clientCompletedAsync.isCompleted()) {
                            testContext.fail("onError called");
                        }
                    }

                    @Override
                    public void onComplete() {
                        clientCompletedAsync.complete();
                    }
                });
            });

            serverLinkOpenAsync.awaitSuccess();
            clientSubscribedAsync.awaitSuccess();
            clientCompletedAsync.awaitSuccess();
        } finally {
            // Single cleanup point: the server is closed exactly once on every path.
            if (protonServer != null) {
                protonServer.close();
            }
        }
    }

    /**
     * Creates a {@link ProtonServer} on this test's Vert.x instance, installs the given connection handler and
     * starts listening on port 0. Callers read the bound port from {@code actualPort()}.
     *
     * @param handler invoked for each incoming connection
     * @return the server, returned after {@code get()} on the listen result has returned
     * @throws InterruptedException if interrupted while waiting on the listen result
     * @throws ExecutionException if waiting on the listen result fails
     */
    private ProtonServer createServer(Handler<ProtonConnection> handler) throws InterruptedException, ExecutionException {
        final ProtonServer protonServer = ProtonServer.create(this.vertx);
        protonServer.connectHandler(handler);

        final FutureHandler listenOutcome = FutureHandler.asyncResult();
        protonServer.listen(0, listenOutcome);
        // Wait here so the server is not handed back before the listen call has reported a result.
        listenOutcome.get();

        return protonServer;
    }

    /**
     * Asserts that a received message carries the expected body, tracing the message number and body first
     * when trace logging is enabled.
     *
     * @param testContext the Vert.x unit test context used for the assertion
     * @param i the message number, used only in the trace output
     * @param obj the expected body value
     * @param message the received message to inspect
     */
    public void validateMessage(TestContext testContext, int i, Object obj, Message message) {
        final Object actualBody = getMessageBody(testContext, message);
        final boolean traceOn = LOG.isTraceEnabled();
        if (traceOn) {
            LOG.trace("Got msg " + i + ", body: " + actualBody);
        }
        testContext.assertEquals(obj, actualBody, "Unexpected message body");
    }

    /**
     * Extracts the value wrapped in a message's {@link AmqpValue} body, asserting that the body is present and
     * is in fact an {@code AmqpValue}.
     *
     * @param testContext the Vert.x unit test context used for the assertions
     * @param message the message whose body is read
     * @return the value held by the message's {@code AmqpValue} body
     */
    private Object getMessageBody(TestContext testContext, Message message) {
        // Held as Object so the instanceof assertion below is a genuine check and the assignment
        // does not depend on getBody() being declared to return AmqpValue.
        final Object body = message.getBody();
        testContext.assertNotNull(body);
        testContext.assertTrue(body instanceof AmqpValue);
        return ((AmqpValue) body).getValue();
    }
}
