package io.vertx.ext.stomp.impl;

import com.jayway.awaitility.Awaitility;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.stomp.Command;
import io.vertx.ext.stomp.Destination;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompClient;
import io.vertx.ext.stomp.StompClientConnection;
import io.vertx.ext.stomp.StompServer;
import io.vertx.ext.stomp.StompServerHandler;
import io.vertx.ext.stomp.utils.Headers;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/vertx/ext/stomp/impl/SubscriptionsUsingQueueTest.class */
public class SubscriptionsUsingQueueTest {
    private Vertx vertx;
    private StompServer server;
    private List<StompClient> clients = new ArrayList();

    @Before
    public void setUp() {
        AsyncLock asyncLock = new AsyncLock();
        this.vertx = Vertx.vertx();
        this.server = StompServer.create(this.vertx).handler(StompServerHandler.create(this.vertx).destinationFactory(Destination::queue)).listen(asyncLock.handler());
        asyncLock.waitForSuccess();
    }

    @After
    public void tearDown() {
        AsyncLock asyncLock = new AsyncLock();
        this.clients.forEach((v0) -> {
            v0.close();
        });
        this.clients.clear();
        this.server.close(asyncLock.handler());
        asyncLock.waitForSuccess();
        AsyncLock asyncLock2 = new AsyncLock();
        this.vertx.close(asyncLock2.handler());
        asyncLock2.waitForSuccess();
    }

    @Test
    public void testSubscriptionAndReceptionUsingQueue() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            ((StompClientConnection) asyncResult2.result()).send("/queue", Buffer.buffer("Hello"));
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!copyOnWriteArrayList.isEmpty());
        });
        Assertions.assertThat(copyOnWriteArrayList).hasSize(1);
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getBodyAsString()).isEqualTo("Hello");
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getCommand()).isEqualTo(Command.MESSAGE);
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("message-id")).isNotNull().isNotEmpty();
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("subscription")).isEqualTo("/queue");
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("destination")).isNotNull().isNotEmpty();
    }

    @Test
    public void testThatCustomHeadersArePropagatedWhenUsingQueue() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            ((StompClientConnection) asyncResult2.result()).send("/queue", Headers.create(new String[]{"foo", "bar", "toto", "titi"}), Buffer.buffer("Hello"));
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!copyOnWriteArrayList.isEmpty());
        });
        Assertions.assertThat(copyOnWriteArrayList).hasSize(1);
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getBodyAsString()).isEqualTo("Hello");
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getCommand()).isEqualTo(Command.MESSAGE);
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("message-id")).isNotNull().isNotEmpty();
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("subscription")).isEqualTo("/queue");
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("destination")).isNotNull().isNotEmpty();
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("foo")).isEqualTo("bar");
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("toto")).isEqualTo("titi");
    }

    @Test
    public void testSubscriptionAndTwoReceptions() {
        CopyOnWriteArrayList copyOnWriteArrayList;
        CopyOnWriteArrayList copyOnWriteArrayList2;
        CopyOnWriteArrayList copyOnWriteArrayList3 = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList4 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Objects.requireNonNull(copyOnWriteArrayList3);
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            Objects.requireNonNull(copyOnWriteArrayList4);
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.server.stompHandler().getDestination("/queue").numberOfSubscriptions() == 2);
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult3 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult3.result();
            stompClientConnection.send("/queue", Buffer.buffer("Hello"));
            stompClientConnection.send("/queue", Buffer.buffer("vert.x"));
            stompClientConnection.send("/queue", Buffer.buffer("Hello"));
            stompClientConnection.send("/queue", Buffer.buffer("vert.x"));
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList3.size() == 2 && copyOnWriteArrayList4.size() == 2);
        });
        if (((Frame) copyOnWriteArrayList3.get(0)).getBodyAsString().equalsIgnoreCase("Hello")) {
            copyOnWriteArrayList = copyOnWriteArrayList3;
            copyOnWriteArrayList2 = copyOnWriteArrayList4;
        } else {
            copyOnWriteArrayList = copyOnWriteArrayList4;
            copyOnWriteArrayList2 = copyOnWriteArrayList3;
        }
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getBodyAsString()).isEqualTo("Hello");
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getCommand()).isEqualTo(Command.MESSAGE);
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("message-id")).isNotNull().isNotEmpty();
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("subscription")).isEqualTo("/queue");
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("destination")).isNotNull().isNotEmpty();
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(1)).getBodyAsString()).isEqualTo("Hello");
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(1)).getCommand()).isEqualTo(Command.MESSAGE);
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(1)).getHeader("message-id")).isNotNull().isNotEmpty();
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(1)).getHeader("subscription")).isEqualTo("/queue");
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(1)).getHeader("destination")).isNotNull().isNotEmpty();
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(0)).getBodyAsString()).isEqualTo("vert.x");
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(0)).getCommand()).isEqualTo(Command.MESSAGE);
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(0)).getHeader("message-id")).isNotNull().isNotEmpty();
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(0)).getHeader("subscription")).isEqualTo("/queue");
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(0)).getHeader("destination")).isNotNull().isNotEmpty();
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(1)).getBodyAsString()).isEqualTo("vert.x");
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(1)).getCommand()).isEqualTo(Command.MESSAGE);
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(1)).getHeader("message-id")).isNotNull().isNotEmpty();
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(1)).getHeader("subscription")).isEqualTo("/queue");
        Assertions.assertThat(((Frame) copyOnWriteArrayList2.get(1)).getHeader("destination")).isNotNull().isNotEmpty();
    }

    @Test
    public void testWhenNoSubscriptionsWhenUsingQueue() {
        this.server.options().setSendErrorOnNoSubscriptions(true);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.subscribe("/queue2", (v1) -> {
                r2.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue2"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            stompClientConnection.send("/queue", Buffer.buffer("Hello"));
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!copyOnWriteArrayList.isEmpty());
        });
        Assertions.assertThat(copyOnWriteArrayList).hasSize(1);
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getCommand()).isEqualTo(Command.ERROR);
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getHeader("destination")).isEqualTo("/queue");
        Assertions.assertThat(((Frame) copyOnWriteArrayList.get(0)).getBodyAsString()).contains(new CharSequence[]{"no subscriptions"});
    }

    @Test
    public void testMultipleSubscriptionsWithIdsOnQueues() {
        this.server.options().setSendErrorOnNoSubscriptions(true);
        HashMap hashMap = new HashMap();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            stompClientConnection.subscribe("/queue", Headers.create().add("id", "0"), frame -> {
                hashMap.put("/queue", frame);
            });
            stompClientConnection.subscribe("/queue2", Headers.create().add("id", "1"), frame2 -> {
                hashMap.put("/queue2", frame2);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue2"));
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            stompClientConnection.send("/queue", Buffer.buffer("Hello"));
            stompClientConnection.send("/queue2", Buffer.buffer("World"));
            stompClientConnection.errorHandler(frame -> {
                hashMap.put("error", frame);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(hashMap.size() >= 2);
        });
        Assertions.assertThat(hashMap).hasSize(2);
        Assertions.assertThat(hashMap).doesNotContainKeys(new String[]{"error"});
        Frame frame = (Frame) hashMap.get("/queue");
        Assertions.assertThat(frame.getCommand()).isEqualTo(Command.MESSAGE);
        Assertions.assertThat(frame.getHeader("destination")).isEqualTo("/queue");
        Assertions.assertThat(frame.getHeader("subscription")).isEqualTo("0");
        Assertions.assertThat(frame.getBodyAsString()).isEqualTo("Hello");
        Frame frame2 = (Frame) hashMap.get("/queue2");
        Assertions.assertThat(frame2.getCommand()).isEqualTo(Command.MESSAGE);
        Assertions.assertThat(frame2.getHeader("destination")).isEqualTo("/queue2");
        Assertions.assertThat(frame2.getHeader("subscription")).isEqualTo("1");
        Assertions.assertThat(frame2.getBodyAsString()).isEqualTo("World");
    }

    @Test
    public void testUnsubscriptionWithDefaultIdUsingQueue() {
        this.server.options().setSendErrorOnNoSubscriptions(true);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            stompClientConnection.subscribe("/queue", frame -> {
                copyOnWriteArrayList.add(frame);
                stompClientConnection.unsubscribe("/queue");
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.send("/queue", Buffer.buffer("Hello"));
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult3 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult3.result();
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
            stompClientConnection.send("/queue", Buffer.buffer("Hello"));
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 2 && ((Frame) copyOnWriteArrayList.get(1)).getCommand() == Command.ERROR);
        });
    }

    @Test
    public void testUnsubscriptionWithCustomId() {
        this.server.options().setSendErrorOnNoSubscriptions(true);
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            stompClientConnection.subscribe("/queue", Headers.create(new String[]{"id", "0"}), frame -> {
                copyOnWriteArrayList.add(frame);
                stompClientConnection.unsubscribe("/queue", Headers.create(new String[]{"id", "0"}));
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            stompClientConnection.send("/queue", Buffer.buffer("Hello"));
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!Helper.hasDestination(this.server.stompHandler().getDestinations(), "/queue"));
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult3 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult3.result();
            stompClientConnection.send("/queue", Buffer.buffer("Hello"));
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.errorHandler((v1) -> {
                r1.add(v1);
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 2 && ((Frame) copyOnWriteArrayList.get(1)).getCommand() == Command.ERROR);
        });
    }

    @Test
    public void testMultipleConnectionAndClosing() {
        for (int i = 0; i < 20; i++) {
            testClosingConnection();
        }
    }

    @Test
    public void testClosingConnection() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        StompClient connect = StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.subscribe("/queue2", (v1) -> {
                r2.add(v1);
            });
        });
        this.clients.add(connect);
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.server.stompHandler().getDestinations().size() == 2);
        });
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            ((StompClientConnection) asyncResult2.result()).send("/queue", Buffer.buffer("Hello"));
        }));
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult3 -> {
            ((StompClientConnection) asyncResult3.result()).send("/queue2", Buffer.buffer("World"));
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 2);
        });
        connect.close();
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.server.stompHandler().getDestinations().size() == 0);
        });
    }

    @Test
    public void testLeavingSubscriptions() {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        CopyOnWriteArrayList copyOnWriteArrayList2 = new CopyOnWriteArrayList();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            Objects.requireNonNull(copyOnWriteArrayList);
            stompClientConnection.subscribe("/queue", (v1) -> {
                r2.add(v1);
            });
        }));
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult2 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult2.result();
            stompClientConnection.subscribe("/queue", frame -> {
                copyOnWriteArrayList2.add(frame);
                if (copyOnWriteArrayList2.size() == 2) {
                    stompClientConnection.unsubscribe("/queue");
                }
            });
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.server.stompHandler().getDestination("/queue") != null && this.server.stompHandler().getDestination("/queue").numberOfSubscriptions() == 2);
        });
        AtomicReference atomicReference = new AtomicReference();
        this.clients.add(StompClient.create(this.vertx).connect(asyncResult3 -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult3.result();
            atomicReference.set(stompClientConnection);
            stompClientConnection.send("/queue", Buffer.buffer("1"));
            stompClientConnection.send("/queue", Buffer.buffer("2"));
            stompClientConnection.send("/queue", Buffer.buffer("3"));
            stompClientConnection.send("/queue", Buffer.buffer("4"));
        }));
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.server.stompHandler().getDestination("/queue") != null && this.server.stompHandler().getDestination("/queue").numberOfSubscriptions() == 1);
        });
        this.vertx.runOnContext(r5 -> {
            ((StompClientConnection) atomicReference.get()).send("/queue", Buffer.buffer("5"));
            ((StompClientConnection) atomicReference.get()).send("/queue", Buffer.buffer("6"));
        });
        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 4 && copyOnWriteArrayList2.size() == 2);
        });
    }
}
