package io.vertx.stomp.tests.impl;

import com.jayway.awaitility.Awaitility;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.stomp.Command;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompClient;
import io.vertx.ext.stomp.StompClientConnection;
import io.vertx.ext.stomp.StompServer;
import io.vertx.ext.stomp.StompServerHandler;
import io.vertx.ext.stomp.utils.Headers;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/stomp/tests/impl/ReceiptTest.class */
public class ReceiptTest {
    private Vertx vertx;
    private StompServer server;
    private List<StompClient> clients = new ArrayList();

    @Before
    public void setUp() {
        this.vertx = Vertx.vertx();
        AsyncLock asyncLock = new AsyncLock();
        this.vertx = Vertx.vertx();
        this.server = StompServer.create(this.vertx).handler(StompServerHandler.create(this.vertx));
        this.server.listen().onComplete(asyncLock.handler());
        asyncLock.waitForSuccess();
    }

    @After
    public void tearDown() {
        this.clients.forEach((v0) -> {
            v0.close();
        });
        this.clients.clear();
        AsyncLock asyncLock = new AsyncLock();
        this.server.close().onComplete(asyncLock.handler());
        asyncLock.waitForSuccess();
        AsyncLock asyncLock2 = new AsyncLock();
        this.vertx.close().onComplete(asyncLock2.handler());
        asyncLock2.waitForSuccess();
    }

    private void client(Handler<AsyncResult<StompClientConnection>> handler) {
        StompClient create = StompClient.create(this.vertx);
        this.clients.add(create);
        create.connect().onComplete(handler);
    }

    @Test
    public void testReceiptsOnSend() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        client(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            }).onComplete(asyncResult -> {
                copyOnWriteArrayList2.add(asyncResult);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        Assertions.assertThat(copyOnWriteArrayList2).hasSize(1);
        Assertions.assertThat(((AsyncResult) copyOnWriteArrayList2.get(0)).succeeded()).isTrue();
        Assertions.assertThat(((AsyncResult) copyOnWriteArrayList2.get(0)).result().toString()).isEqualTo("/queue");
        client(asyncResult2 -> {
            ((StompClientConnection) asyncResult2.result()).send("/queue", Buffer.buffer("Hello")).onComplete(asyncResult2 -> {
                copyOnWriteArrayList2.add(asyncResult2);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!copyOnWriteArrayList.isEmpty());
        });
        Assertions.assertThat(copyOnWriteArrayList2).hasSize(2);
    }

    @Test
    public void testReceiptsOnSubscribeAndUnsubscribe() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        AtomicReference atomicReference = new AtomicReference();
        client(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            }).onComplete(asyncResult -> {
                copyOnWriteArrayList2.add(asyncResult);
            });
        });
        client(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            atomicReference.set(stompClientConnection);
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            }).onComplete(asyncResult2 -> {
                copyOnWriteArrayList2.add(asyncResult2);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        client(asyncResult3 -> {
            ((StompClientConnection) asyncResult3.result()).send("/queue", Buffer.buffer("Hello")).onComplete(asyncResult3 -> {
                copyOnWriteArrayList2.add(asyncResult3);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() >= 2);
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList2.size() == 3);
        });
        ((StompClientConnection) atomicReference.get()).unsubscribe("/queue").onComplete(asyncResult4 -> {
            copyOnWriteArrayList2.add(asyncResult4);
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList2.size() == 4);
        });
    }

    @Test
    public void testReceiptsWithAck() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        client(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            stompClientConnection.subscribe("/queue", Headers.create(new String[]{"ack", "client"}), frame -> {
                stompClientConnection.ack(frame.getAck());
            }).onComplete(asyncResult -> {
                copyOnWriteArrayList.add(asyncResult);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        client(asyncResult2 -> {
            ((StompClientConnection) asyncResult2.result()).send("/queue", Buffer.buffer("Hello")).onComplete(asyncResult2 -> {
                copyOnWriteArrayList.add(asyncResult2);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 2);
        });
    }

    @Test
    public void testReceiptsWithNack() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        client(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            stompClientConnection.subscribe("/queue", Headers.create(new String[]{"ack", "client"}), frame -> {
                stompClientConnection.nack(frame.getAck());
            }).onComplete(asyncResult -> {
                copyOnWriteArrayList.add(asyncResult);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        client(asyncResult2 -> {
            ((StompClientConnection) asyncResult2.result()).send("/queue", Buffer.buffer("Hello")).onComplete(asyncResult2 -> {
                copyOnWriteArrayList.add(asyncResult2);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 2);
        });
    }

    @Test
    public void testReceiptsInCommittedTransactions() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList3 = new CopyOnWriteArrayList();
        client(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Objects.requireNonNull(copyOnWriteArrayList2);
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            }).onComplete(asyncResult -> {
                copyOnWriteArrayList.add(asyncResult);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 1);
        });
        client(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            Objects.requireNonNull(copyOnWriteArrayList3);
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx").onComplete(asyncResult2 -> {
                copyOnWriteArrayList.add(asyncResult2);
            });
            stompClientConnection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello"))).onComplete(asyncResult3 -> {
                copyOnWriteArrayList.add(asyncResult3);
            });
            stompClientConnection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World"))).onComplete(asyncResult4 -> {
                copyOnWriteArrayList.add(asyncResult4);
            });
            stompClientConnection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!"))).onComplete(asyncResult5 -> {
                copyOnWriteArrayList.add(asyncResult5);
            });
            stompClientConnection.commit("my-tx").onComplete(asyncResult6 -> {
                copyOnWriteArrayList.add(asyncResult6);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList2.size() == 3 && copyOnWriteArrayList3.isEmpty() && copyOnWriteArrayList.size() == 6);
        });
    }

    @Test
    public void testReceiptsInAbortedTransactions() throws InterruptedException {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList3 = new CopyOnWriteArrayList();
        client(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            }).onComplete(asyncResult -> {
                copyOnWriteArrayList3.add(asyncResult);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList3.size() == 1);
        });
        client(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            Objects.requireNonNull(copyOnWriteArrayList2);
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.beginTX("my-tx").onComplete(asyncResult2 -> {
                copyOnWriteArrayList3.add(asyncResult2);
            });
            stompClientConnection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("Hello"))).onComplete(asyncResult3 -> {
                copyOnWriteArrayList3.add(asyncResult3);
            });
            stompClientConnection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("World"))).onComplete(asyncResult4 -> {
                copyOnWriteArrayList3.add(asyncResult4);
            });
            stompClientConnection.send(new Frame().setCommand(Command.SEND).setDestination("/queue").setTransaction("my-tx").setBody(Buffer.buffer("!!!"))).onComplete(asyncResult5 -> {
                copyOnWriteArrayList3.add(asyncResult5);
            });
            stompClientConnection.abort("my-tx").onComplete(asyncResult6 -> {
                copyOnWriteArrayList3.add(asyncResult6);
            });
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 0 && copyOnWriteArrayList2.isEmpty() && copyOnWriteArrayList3.size() == 6);
        });
    }
}
