package io.vertx.ext.stomp.impl;

import com.jayway.awaitility.Awaitility;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.stomp.BridgeOptions;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.StompClient;
import io.vertx.ext.stomp.StompClientConnection;
import io.vertx.ext.stomp.StompServer;
import io.vertx.ext.stomp.StompServerHandler;
import io.vertx.ext.stomp.utils.Headers;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Integration tests for the bridge between a {@link StompServer} and the Vert.x event bus.
 *
 * <p>Every test runs against a freshly started server whose handler is configured with
 * {@link BridgeOptions}. By default the address {@code /bus} is permitted in both directions;
 * individual tests either replace the bridge configuration on the running server or restart the
 * server with different options.
 */
public class EventBusBridgeTest {
    private StompServer server;
    private Vertx vertx;

    // Some tests register an additional client from inside a client callback while the test
    // thread later iterates this list in tearDown(), hence the concurrent list implementation.
    private final List<StompClient> clients = new CopyOnWriteArrayList<>();
    private final List<MessageConsumer<?>> consumers = new ArrayList<>();

    /** Starts a server bridging {@code /bus} in both directions. */
    @Before
    public void setUp() {
        startServer(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/bus"))
            .addOutboundPermitted(new PermittedOptions().setAddress("/bus")));
    }

    /**
     * Closes every registered client, unregisters every event bus consumer, then closes the
     * server and the Vert.x instance, waiting for each close operation to report success.
     */
    @After
    public void tearDown() {
        clients.forEach(StompClient::close);
        clients.clear();
        consumers.forEach(MessageConsumer::unregister);
        consumers.clear();

        // NOTE(review): AsyncLock is declared outside this file; it is used raw here exactly as
        // before. If it is generic, parameterize these two locks (TODO confirm).
        AsyncLock serverClosed = new AsyncLock();
        server.close(serverClosed.handler());
        serverClosed.waitForSuccess();

        AsyncLock vertxClosed = new AsyncLock();
        vertx.close(vertxClosed.handler());
        vertxClosed.waitForSuccess();
    }

    /**
     * Creates a new Vert.x instance and a STOMP server using the given bridge configuration,
     * and blocks until the server reports that it is listening.
     *
     * @param bridgeOptions the bridge configuration installed on the server handler
     */
    private void startServer(BridgeOptions bridgeOptions) {
        // NOTE(review): AsyncLock kept raw because its declaration is not visible here.
        AsyncLock listening = new AsyncLock();
        vertx = Vertx.vertx();
        server = StompServer.create(vertx)
            .handler(StompServerHandler.create(vertx).bridge(bridgeOptions))
            .listen(listening.handler());
        listening.waitForSuccess();
    }

    /**
     * Shuts down the server started by {@link #setUp()} and starts a new one with the given
     * bridge configuration.
     *
     * @param bridgeOptions the bridge configuration for the replacement server
     */
    private void restartServer(BridgeOptions bridgeOptions) {
        tearDown();
        startServer(bridgeOptions);
    }

    /** A frame sent by a STOMP client to {@code /bus} reaches an event bus consumer. */
    @Test
    public void testThatStompMessagesAreTransferredToTheEventBus() {
        AtomicReference<Message<Object>> received = new AtomicReference<>();
        consumers.add(vertx.eventBus().consumer("/bus", received::set));
        clients.add(StompClient.create(vertx).connect(connected -> {
            connected.result().send("/bus", Headers.create("foo", "bar"), Buffer.buffer("Hello from STOMP"));
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> received.get() != null);

        Message<Object> message = received.get();
        Assertions.assertThat(message.headers().get("foo")).isEqualTo("bar");
        Assertions.assertThat(message.headers().get("destination")).isEqualTo("/bus");
        Assertions.assertThat(message.headers().get("content-length")).isEqualTo("16");
        Assertions.assertThat(message.address()).isEqualTo("/bus");
        Assertions.assertThat(message.replyAddress()).isNullOrEmpty();
        Assertions.assertThat(message.body().toString()).isEqualTo("Hello from STOMP");
    }

    /** A string published on the event bus reaches a STOMP subscriber of {@code /bus}. */
    @Test
    public void testThatEventBusMessagesAreTransferredToStomp() {
        AtomicReference<Frame> received = new AtomicReference<>();
        clients.add(StompClient.create(vertx).connect(connected -> {
            StompClientConnection connection = connected.result();
            // Publish only once the subscription has been acknowledged.
            connection.subscribe("/bus", received::set, subscribed -> {
                vertx.eventBus().publish("/bus", "Hello from Vert.x", new DeliveryOptions().addHeader("foo", "bar"));
            });
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> received.get() != null);

        Frame frame = received.get();
        Assertions.assertThat(frame.getHeaders().get("foo")).isEqualTo("bar");
        Assertions.assertThat(frame.getHeaders().get("destination")).isEqualTo("/bus");
        Assertions.assertThat(frame.getHeaders().get("content-length")).isEqualTo("17");
        Assertions.assertThat(frame.getBodyAsString()).isEqualTo("Hello from Vert.x");
    }

    /**
     * A STOMP client and an event bus consumer exchange "ping"/"pong" messages through
     * regex-permitted addresses until each side has received four messages.
     */
    @Test
    public void testBidirectionalPingPong() {
        server.stompHandler().bridge(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddressRegex("/toBu."))
            .addOutboundPermitted(new PermittedOptions().setAddressRegex("/to.tomp")));

        // Filled from callbacks and polled by the test thread below.
        List<Frame> pongs = new CopyOnWriteArrayList<>();
        List<Message<Object>> pings = new CopyOnWriteArrayList<>();

        consumers.add(vertx.eventBus().consumer("/toBus", message -> {
            pings.add(message);
            if (pings.size() < 5) {
                vertx.eventBus().send("/toStomp", "pong");
            }
        }));
        clients.add(StompClient.create(vertx).connect(connected -> {
            StompClientConnection connection = connected.result();
            connection.subscribe("/toStomp", frame -> {
                pongs.add(frame);
                if (pongs.size() < 4) {
                    connection.send("/toBus", Buffer.buffer("ping"));
                }
            }, subscribed -> {
                // The first ping starts the exchange.
                connection.send("/toBus", Buffer.buffer("ping"));
            });
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> pings.size() == 4 && pongs.size() == 4);

        for (Frame pong : pongs) {
            Assertions.assertThat(pong.getBodyAsString()).isEqualTo("pong");
        }
        for (Message<Object> ping : pings) {
            Assertions.assertThat(ping.body().toString()).isEqualTo("ping");
        }
    }

    /** A {@link JsonObject} published on the event bus arrives as a JSON frame body. */
    @Test
    public void testThatEventBusMessagesContainingJsonObjectAreTransferredToStomp() {
        AtomicReference<Frame> received = new AtomicReference<>();
        clients.add(StompClient.create(vertx).connect(connected -> {
            StompClientConnection connection = connected.result();
            connection.subscribe("/bus", received::set, subscribed -> {
                vertx.eventBus().publish("/bus", new JsonObject().put("name", "vert.x").put("count", 1).put("bool", true), new DeliveryOptions().addHeader("foo", "bar"));
            });
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> received.get() != null);

        Frame frame = received.get();
        Assertions.assertThat(frame.getHeaders().get("foo")).isEqualTo("bar");
        Assertions.assertThat(frame.getHeaders().get("destination")).isEqualTo("/bus");
        JsonObject body = new JsonObject(frame.getBodyAsString());
        Assertions.assertThat(body.getString("name")).isEqualTo("vert.x");
        Assertions.assertThat(body.getInteger("count")).isEqualTo(1);
        Assertions.assertThat(body.getBoolean("bool")).isTrue();
    }

    /** A {@link Buffer} published on the event bus arrives byte-for-byte in the frame body. */
    @Test
    public void testThatEventBusMessagesContainingBufferAreTransferredToStomp() {
        AtomicReference<Frame> received = new AtomicReference<>();
        byte[] payload = {0, 1, 2, 3, 4, 5, 6};
        clients.add(StompClient.create(vertx).connect(connected -> {
            StompClientConnection connection = connected.result();
            connection.subscribe("/bus", received::set, subscribed -> {
                vertx.eventBus().publish("/bus", Buffer.buffer(payload), new DeliveryOptions().addHeader("foo", "bar"));
            });
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> received.get() != null);

        Frame frame = received.get();
        Assertions.assertThat(frame.getHeaders().get("foo")).isEqualTo("bar");
        Assertions.assertThat(frame.getHeaders().get("destination")).isEqualTo("/bus");
        Assertions.assertThat(frame.getBody().getBytes()).containsExactly(payload);
    }

    /** A {@code null} body published on the event bus arrives as a frame with an empty body. */
    @Test
    public void testThatEventBusMessagesContainingNoBodyAreTransferredToStomp() {
        AtomicReference<Frame> received = new AtomicReference<>();
        clients.add(StompClient.create(vertx).connect(connected -> {
            StompClientConnection connection = connected.result();
            connection.subscribe("/bus", received::set, subscribed -> {
                vertx.eventBus().publish("/bus", (Object) null, new DeliveryOptions().addHeader("foo", "bar"));
            });
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> received.get() != null);

        Frame frame = received.get();
        Assertions.assertThat(frame.getHeaders().get("foo")).isEqualTo("bar");
        Assertions.assertThat(frame.getHeaders().get("destination")).isEqualTo("/bus");
        Assertions.assertThat(frame.getBody().getBytes()).hasSize(0);
    }

    /** With the default bridge, both consumers registered on {@code /bus} get the frame. */
    @Test
    public void testThatTwoEventBusConsumersReceiveAStompMessage() {
        List<Message<Object>> messages = new CopyOnWriteArrayList<>();
        consumers.add(vertx.eventBus().consumer("/bus", messages::add));
        consumers.add(vertx.eventBus().consumer("/bus", messages::add));
        clients.add(StompClient.create(vertx).connect(connected -> {
            connected.result().send("/bus", Headers.create("foo", "bar"), Buffer.buffer("Hello from STOMP"));
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> messages.size() == 2);
    }

    /** With point-to-point enabled, only one of two consumers gets the frame. */
    @Test
    public void testThatOnlyOnEventBusConsumersReceiveAStompMessageInP2P() throws InterruptedException {
        server.stompHandler().bridge(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/toBus"))
            .setPointToPoint(true));

        List<Message<Object>> messages = new CopyOnWriteArrayList<>();
        consumers.add(vertx.eventBus().consumer("/toBus", messages::add));
        consumers.add(vertx.eventBus().consumer("/toBus", messages::add));
        clients.add(StompClient.create(vertx).connect(connected -> {
            connected.result().send("/toBus", Headers.create("foo", "bar"), Buffer.buffer("Hello from STOMP"));
        }));

        // Fixed wait: the assertion is that no second delivery shows up.
        Thread.sleep(500L);
        Assertions.assertThat(messages).hasSize(1);
    }

    /** With the default bridge, a published message reaches both subscribed STOMP clients. */
    @Test
    public void testThatEventBusMessagesAreTransferredToSeveralStompClients() {
        List<Frame> frames = new CopyOnWriteArrayList<>();
        clients.add(StompClient.create(vertx).connect(firstConnected -> {
            StompClientConnection first = firstConnected.result();
            first.subscribe("/bus", frames::add, firstSubscribed -> {
                // Connect the second client only after the first subscription is in place.
                clients.add(StompClient.create(vertx).connect(secondConnected -> {
                    StompClientConnection second = secondConnected.result();
                    second.subscribe("/bus", frames::add, secondSubscribed -> {
                        vertx.eventBus().publish("/bus", "Hello from Vert.x", new DeliveryOptions().addHeader("foo", "bar"));
                    });
                }));
            });
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> frames.size() == 2);
    }

    /** With point-to-point enabled, a published message reaches only one of two STOMP clients. */
    @Test
    public void testThatEventBusMessagesAreOnlyTransferredToOneStompClientsInP2P() throws InterruptedException {
        List<Frame> frames = new CopyOnWriteArrayList<>();
        server.stompHandler().bridge(new BridgeOptions()
            .addOutboundPermitted(new PermittedOptions().setAddress("/toStomp"))
            .setPointToPoint(true));

        clients.add(StompClient.create(vertx).connect(firstConnected -> {
            StompClientConnection first = firstConnected.result();
            first.subscribe("/toStomp", frames::add, firstSubscribed -> {
                clients.add(StompClient.create(vertx).connect(secondConnected -> {
                    StompClientConnection second = secondConnected.result();
                    second.subscribe("/toStomp", frames::add, secondSubscribed -> {
                        vertx.eventBus().publish("/toStomp", "Hello from Vert.x", new DeliveryOptions().addHeader("foo", "bar"));
                    });
                }));
            });
        }));

        // Fixed wait: the assertion is that no second delivery shows up.
        Thread.sleep(500L);
        Assertions.assertThat(frames).hasSize(1);
    }

    /**
     * A frame carrying a {@code reply-address} header lets the event bus consumer's reply come
     * back to the STOMP client on that address.
     */
    @Test
    public void testThatEventBusConsumerCanReplyToStompMessages() {
        server.stompHandler().bridge(new BridgeOptions()
            .addOutboundPermitted(new PermittedOptions().setAddress("/replyTo"))
            .addInboundPermitted(new PermittedOptions().setAddress("/request"))
            .setPointToPoint(true));

        AtomicReference<Frame> reply = new AtomicReference<>();
        consumers.add(vertx.eventBus().consumer("/request", request -> {
            request.reply("pong");
        }));
        clients.add(StompClient.create(vertx).connect(connected -> {
            StompClientConnection connection = connected.result();
            connection.subscribe("/replyTo", reply::set, subscribed -> {
                connection.send("/request", Headers.create("reply-address", "/replyTo"), Buffer.buffer("ping"));
            });
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> reply.get() != null);
    }

    /** After unsubscribing, the client no longer receives messages published to {@code /bus}. */
    @Test
    public void testThatStompClientCanUnsubscribe() throws InterruptedException {
        List<Frame> frames = new CopyOnWriteArrayList<>();
        clients.add(StompClient.create(vertx).connect(connected -> {
            StompClientConnection connection = connected.result();
            connection.subscribe("/bus", frame -> {
                frames.add(frame);
                connection.unsubscribe("/bus");
            }, subscribed -> {
                vertx.eventBus().publish("/bus", "Hello from Vert.x", new DeliveryOptions().addHeader("foo", "bar"));
            });
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> frames.size() == 1);

        // A second publication must not reach the client any more.
        vertx.eventBus().publish("/bus", "Hello from Vert.x", new DeliveryOptions().addHeader("foo", "bar"));
        Thread.sleep(500L);
        Assertions.assertThat(frames).hasSize(1);
    }

    /** After closing its connection, the client no longer receives published messages. */
    @Test
    public void testThatStompClientCanCloseTheConnection() throws InterruptedException {
        List<Frame> frames = new CopyOnWriteArrayList<>();
        clients.add(StompClient.create(vertx).connect(connected -> {
            StompClientConnection connection = connected.result();
            connection.subscribe("/bus", frame -> {
                frames.add(frame);
                connection.close();
            }, subscribed -> {
                vertx.eventBus().publish("/bus", "Hello from Vert.x", new DeliveryOptions().addHeader("foo", "bar"));
            });
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> frames.size() == 1);

        // A second publication must not reach the client any more.
        vertx.eventBus().publish("/bus", "Hello from Vert.x", new DeliveryOptions().addHeader("foo", "bar"));
        Thread.sleep(500L);
        Assertions.assertThat(frames).hasSize(1);
    }

    /** After disconnecting, the client no longer receives published messages. */
    @Test
    public void testThatStompClientCanDisconnect() throws InterruptedException {
        List<Frame> frames = new CopyOnWriteArrayList<>();
        clients.add(StompClient.create(vertx).connect(connected -> {
            StompClientConnection connection = connected.result();
            connection.subscribe("/bus", frame -> {
                frames.add(frame);
                connection.disconnect();
            }, subscribed -> {
                vertx.eventBus().publish("/bus", "Hello from Vert.x", new DeliveryOptions().addHeader("foo", "bar"));
            });
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> frames.size() == 1);

        // A second publication must not reach the client any more.
        vertx.eventBus().publish("/bus", "Hello from Vert.x", new DeliveryOptions().addHeader("foo", "bar"));
        Thread.sleep(500L);
        Assertions.assertThat(frames).hasSize(1);
    }

    /** An inbound frame whose JSON body matches the permitted structure is forwarded. */
    @Test
    public void testThatStompFrameMatchingTheStructureAreTransferred() {
        restartServer(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/bus").setMatch(new JsonObject().put("id", 1)))
            .addOutboundPermitted(new PermittedOptions().setAddress("/bus")));

        AtomicReference<Message<Object>> received = new AtomicReference<>();
        consumers.add(vertx.eventBus().consumer("/bus", received::set));
        clients.add(StompClient.create(vertx).connect(connected -> {
            connected.result().send("/bus", Headers.create("foo", "bar"), Buffer.buffer(new JsonObject().put("id", 1).put("msg", "Hello from STOMP").toString()));
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> received.get() != null);

        Message<Object> message = received.get();
        Assertions.assertThat(message.headers().get("foo")).isEqualTo("bar");
        Assertions.assertThat(message.headers().get("destination")).isEqualTo("/bus");
        Assertions.assertThat(message.address()).isEqualTo("/bus");
        Assertions.assertThat(message.replyAddress()).isNullOrEmpty();
        Assertions.assertThat(new JsonObject(message.body().toString()).getString("msg")).isEqualTo("Hello from STOMP");
    }

    /** An inbound frame whose JSON body lacks the required {@code id} is not forwarded. */
    @Test
    public void testThatStompFrameNotMatchingTheStructureAreRejected() throws InterruptedException {
        restartServer(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/bus").setMatch(new JsonObject().put("id", 2)))
            .addOutboundPermitted(new PermittedOptions().setAddress("/bus")));

        AtomicReference<Message<Object>> received = new AtomicReference<>();
        consumers.add(vertx.eventBus().consumer("/bus", received::set));
        clients.add(StompClient.create(vertx).connect(connected -> {
            connected.result().send("/bus", Headers.create("foo", "bar"), Buffer.buffer(new JsonObject().put("msg", "Hello from STOMP").toString()));
        }));

        // Fixed wait: the assertion is that nothing is delivered.
        Thread.sleep(2000L);
        Assertions.assertThat(received.get()).isNull();
    }

    /** An outbound event bus message whose JSON body matches the structure is forwarded. */
    @Test
    public void testThatEventBusMessagesMatchingTheStructureAreTransferredToStomp() {
        restartServer(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/bus"))
            .addOutboundPermitted(new PermittedOptions().setAddress("/bus").setMatch(new JsonObject().put("id", 2))));

        AtomicReference<Frame> received = new AtomicReference<>();
        clients.add(StompClient.create(vertx).connect(connected -> {
            StompClientConnection connection = connected.result();
            connection.subscribe("/bus", received::set, subscribed -> {
                vertx.eventBus().publish("/bus", new JsonObject().put("id", 2).put("message", "Hello from Vert.x"), new DeliveryOptions().addHeader("foo", "bar"));
            });
        }));

        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> received.get() != null);

        Frame frame = received.get();
        Assertions.assertThat(frame.getHeaders().get("foo")).isEqualTo("bar");
        Assertions.assertThat(frame.getHeaders().get("destination")).isEqualTo("/bus");
        Assertions.assertThat(new JsonObject(frame.getBodyAsString()).getString("message")).isEqualTo("Hello from Vert.x");
    }

    /** An outbound event bus message whose {@code id} differs from the structure is dropped. */
    @Test
    public void testThatEventBusMessagesNotMatchingTheStructureAreRejected() throws InterruptedException {
        restartServer(new BridgeOptions()
            .addInboundPermitted(new PermittedOptions().setAddress("/bus"))
            .addOutboundPermitted(new PermittedOptions().setAddress("/bus").setMatch(new JsonObject().put("id", 2))));

        AtomicReference<Frame> received = new AtomicReference<>();
        clients.add(StompClient.create(vertx).connect(connected -> {
            StompClientConnection connection = connected.result();
            connection.subscribe("/bus", received::set, subscribed -> {
                vertx.eventBus().publish("/bus", new JsonObject().put("id", 1).put("message", "Hello from Vert.x"), new DeliveryOptions().addHeader("foo", "bar"));
            });
        }));

        // Fixed wait: the assertion is that nothing is delivered.
        Thread.sleep(2000L);
        Assertions.assertThat(received.get()).isNull();
    }
}
