package io.vertx.stomp.tests.impl;

import com.jayway.awaitility.Awaitility;
import com.jayway.awaitility.core.ConditionFactory;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.stomp.Command;
import io.vertx.ext.stomp.Frame;
import io.vertx.ext.stomp.Frames;
import io.vertx.ext.stomp.ServerFrame;
import io.vertx.ext.stomp.StompClient;
import io.vertx.ext.stomp.StompClientConnection;
import io.vertx.ext.stomp.StompClientOptions;
import io.vertx.ext.stomp.StompServer;
import io.vertx.ext.stomp.StompServerHandler;
import io.vertx.ext.stomp.StompServerOptions;
import io.vertx.ext.stomp.utils.Headers;
import io.vertx.ext.unit.junit.Repeat;
import io.vertx.ext.unit.junit.RepeatRule;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/vertx/stomp/tests/impl/StompClientImplTest.class */
public class StompClientImplTest {
    private Vertx vertx;
    private StompServer server;
    private StompServerOptions options;

    @Rule
    public RepeatRule rule = new RepeatRule();

    @Before
    public void setUp() {
        this.vertx = Vertx.vertx();
        this.options = new StompServerOptions();
        this.server = StompServer.create(this.vertx, this.options).handler(StompServerHandler.create(this.vertx));
    }

    private void startServer() {
        AsyncLock asyncLock = new AsyncLock();
        this.server.listen().onComplete(asyncLock.handler());
        asyncLock.waitForSuccess();
    }

    @After
    public void tearDown() {
        AsyncLock asyncLock = new AsyncLock();
        this.server.close().onComplete(asyncLock.handler());
        asyncLock.waitForSuccess();
        AsyncLock asyncLock2 = new AsyncLock();
        this.vertx.close().onComplete(asyncLock2.handler());
        asyncLock2.waitForSuccess();
    }

    @Test
    @Repeat(10)
    public void testRejectedConnection() throws InterruptedException {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.vertx.createNetServer().connectHandler((v0) -> {
            v0.close();
        }).listen(61614).onComplete(asyncResult -> {
            atomicBoolean.set(true);
        });
        Awaitility.await().untilAtomic(atomicBoolean, Is.is(true));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        AtomicReference atomicReference = new AtomicReference();
        StompClientOptions port = new StompClientOptions().setPort(61614);
        port.setConnectTimeout(1000);
        StompClient.create(this.vertx, port).connect().onComplete(asyncResult2 -> {
            if (asyncResult2.failed()) {
                atomicBoolean2.set(true);
                atomicReference.set(null);
            } else {
                atomicReference.set((StompClientConnection) asyncResult2.result());
            }
            countDownLatch.countDown();
        });
        countDownLatch.await(10L, TimeUnit.SECONDS);
        Assert.assertNull(atomicReference.get());
        Assert.assertTrue(atomicBoolean2.get());
    }

    @Test
    @Repeat(10)
    public void testRejectedConnectionWithExceptionHandler() throws InterruptedException {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.vertx.createNetServer().connectHandler((v0) -> {
            v0.close();
        }).listen(61614).onComplete(asyncResult -> {
            atomicBoolean.set(true);
        });
        Awaitility.await().untilAtomic(atomicBoolean, Is.is(true));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        StompClientOptions port = new StompClientOptions().setPort(61614);
        port.setConnectTimeout(1000);
        StompClient create = StompClient.create(this.vertx, port);
        Objects.requireNonNull(atomicReference2);
        StompClient exceptionHandler = create.exceptionHandler((v1) -> {
            r1.set(v1);
        });
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        exceptionHandler.connect().onComplete(asyncResult2 -> {
            if (asyncResult2.failed()) {
                atomicBoolean2.set(true);
                atomicReference.set(null);
            } else {
                atomicReference.set((StompClientConnection) asyncResult2.result());
            }
            countDownLatch.countDown();
        });
        countDownLatch.await(1L, TimeUnit.MINUTES);
        Assert.assertNull(atomicReference.get());
        Assert.assertTrue(atomicBoolean2.get());
        Assertions.assertThat((Throwable) atomicReference2.get()).isNull();
    }

    @Test
    public void testConnection() throws InterruptedException {
        startServer();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        StompClient.create(this.vertx).connect().onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                atomicReference.set(null);
            } else {
                atomicReference.set((StompClientConnection) asyncResult.result());
            }
            countDownLatch.countDown();
        });
        countDownLatch.await(1L, TimeUnit.MINUTES);
        Assert.assertNotNull(atomicReference.get());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).session());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).server());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).version());
    }

    @Test
    public void testPingFrameGetAckedImmediately() throws Exception {
        startServer();
        CompletableFuture completableFuture = new CompletableFuture();
        Future onSuccess = StompClient.create(this.vertx).connect().onSuccess(stompClientConnection -> {
            Future onSuccess2 = stompClientConnection.send(Frames.PING).onSuccess(frame -> {
                completableFuture.complete(null);
            });
            Objects.requireNonNull(completableFuture);
            onSuccess2.onFailure(completableFuture::completeExceptionally);
        });
        Objects.requireNonNull(completableFuture);
        onSuccess.onFailure(completableFuture::completeExceptionally);
        completableFuture.get(1L, TimeUnit.MINUTES);
    }

    @Test
    public void testConnectionWithTrailingLine() throws InterruptedException {
        this.options.setTrailingLine(true);
        startServer();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        StompClient.create(this.vertx, new StompClientOptions().setTrailingLine(true)).connect().onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                atomicReference.set(null);
            } else {
                atomicReference.set((StompClientConnection) asyncResult.result());
            }
            countDownLatch.countDown();
        });
        countDownLatch.await(1L, TimeUnit.MINUTES);
        Assert.assertNotNull(atomicReference.get());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).session());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).server());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).version());
    }

    @Test
    public void testConnectionWithStompFrame() throws InterruptedException {
        startServer();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        StompClient.create(this.vertx, new StompClientOptions().setUseStompFrame(true)).connect().onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                atomicReference.set(null);
            } else {
                atomicReference.set((StompClientConnection) asyncResult.result());
            }
            countDownLatch.countDown();
        });
        countDownLatch.await(1L, TimeUnit.MINUTES);
        Assert.assertNotNull(atomicReference.get());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).session());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).server());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).version());
    }

    @Test
    public void testConnectionWithStompFrameWithTrailingLine() throws InterruptedException {
        this.options.setTrailingLine(true);
        startServer();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        StompClient.create(this.vertx, new StompClientOptions().setUseStompFrame(true).setTrailingLine(true)).connect().onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                atomicReference.set(null);
            } else {
                atomicReference.set((StompClientConnection) asyncResult.result());
            }
            countDownLatch.countDown();
        });
        countDownLatch.await(1L, TimeUnit.MINUTES);
        Assert.assertNotNull(atomicReference.get());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).session());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).server());
        Assert.assertNotNull(((StompClientConnection) atomicReference.get()).version());
    }

    @Test
    public void testSendingMessages() {
        startServer();
        AtomicReference atomicReference = new AtomicReference();
        StompClient.create(this.vertx).connect().onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                return;
            }
            Future send = ((StompClientConnection) asyncResult.result()).send("/hello", Buffer.buffer("this is my content"));
            Objects.requireNonNull(atomicReference);
            send.onComplete((v1) -> {
                r1.set(v1);
            });
        });
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAtomic(atomicReference, Matchers.notNullValue(AsyncResult.class));
        Assertions.assertThat(((AsyncResult) atomicReference.get()).succeeded()).isTrue();
        Assertions.assertThat(((Frame) ((AsyncResult) atomicReference.get()).result()).getDestination()).isEqualTo("/hello");
    }

    @Test
    public void testSendingMessagesWithTrailingLine() {
        this.options.setTrailingLine(true);
        startServer();
        AtomicReference atomicReference = new AtomicReference();
        StompClient.create(this.vertx, new StompClientOptions().setTrailingLine(true)).connect().onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                return;
            }
            Future send = ((StompClientConnection) asyncResult.result()).send("/hello", Buffer.buffer("this is my content"));
            Objects.requireNonNull(atomicReference);
            send.onComplete((v1) -> {
                r1.set(v1);
            });
        });
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).untilAtomic(atomicReference, Matchers.notNullValue(AsyncResult.class));
        Assertions.assertThat(((AsyncResult) atomicReference.get()).succeeded()).isTrue();
        Assertions.assertThat(((Frame) ((AsyncResult) atomicReference.get()).result()).getDestination()).isEqualTo("/hello");
    }

    @Test
    public void testConnectionAndDisconnect() throws InterruptedException {
        startServer();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        StompClient.create(this.vertx, new StompClientOptions().setUseStompFrame(true)).connect().onComplete(asyncResult -> {
            if (!asyncResult.failed()) {
                ((StompClientConnection) asyncResult.result()).disconnect().onComplete(asyncResult -> {
                    atomicReference.set(asyncResult);
                    countDownLatch.countDown();
                });
            } else {
                atomicReference.set(null);
                countDownLatch.countDown();
            }
        });
        countDownLatch.await(1L, TimeUnit.MINUTES);
        Assert.assertNotNull(atomicReference.get());
    }

    @Test
    public void testConnectionAndDisconnectWithCustomFrame() throws InterruptedException {
        startServer();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        StompClient.create(this.vertx, new StompClientOptions().setUseStompFrame(true)).connect().onComplete(asyncResult -> {
            if (!asyncResult.failed()) {
                ((StompClientConnection) asyncResult.result()).disconnect(new Frame(Command.DISCONNECT, Headers.create(new String[]{"message", "bye bye"}), (Buffer) null)).onComplete(asyncResult -> {
                    atomicReference.set(asyncResult);
                    countDownLatch.countDown();
                });
            } else {
                atomicReference.set(null);
                countDownLatch.countDown();
            }
        });
        countDownLatch.await(1L, TimeUnit.MINUTES);
        Assert.assertNotNull(atomicReference.get());
        Assert.assertNotNull(((AsyncResult) atomicReference.get()).result());
        Assertions.assertThat(((Frame) ((AsyncResult) atomicReference.get()).result()).getHeader("message")).contains(new CharSequence[]{"bye bye"});
    }

    @Test
    public void testClientHeartbeatWhenNoServerActivity() {
        AtomicReference atomicReference = new AtomicReference();
        this.server = StompServer.create(this.vertx, new StompServerOptions().setHeartbeat(new JsonObject().put("x", 100).put("y", 100))).handler(StompServerHandler.create(this.vertx).pingHandler(stompServerConnection -> {
        }));
        startServer();
        StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(new JsonObject().put("x", 100).put("y", 100))).connect().onComplete(asyncResult -> {
            atomicReference.set((StompClientConnection) asyncResult.result());
        });
        Awaitility.await().atMost(1000L, TimeUnit.MILLISECONDS).until(() -> {
            return Boolean.valueOf(((StompClientConnection) atomicReference.get()).session() == null);
        });
    }

    @Test
    public void testClientHeartbeatWithServerActivity() throws InterruptedException {
        AtomicReference atomicReference = new AtomicReference();
        this.server = StompServer.create(this.vertx, new StompServerOptions().setHeartbeat(new JsonObject().put("x", 100).put("y", 100))).handler(StompServerHandler.create(this.vertx));
        startServer();
        StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(new JsonObject().put("x", 100).put("y", 100))).connect().onComplete(asyncResult -> {
            atomicReference.set((StompClientConnection) asyncResult.result());
        });
        Thread.sleep(1000L);
        Assertions.assertThat(((StompClientConnection) atomicReference.get()).server()).isNotNull();
    }

    @Test
    public void testServerHeartbeatWhenNoClientActivity() {
        AtomicReference atomicReference = new AtomicReference();
        this.server = StompServer.create(this.vertx, new StompServerOptions().setHeartbeat(new JsonObject().put("x", 100).put("y", 100))).handler(StompServerHandler.create(this.vertx));
        startServer();
        StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(new JsonObject().put("x", 100).put("y", 100))).connect().onComplete(asyncResult -> {
            atomicReference.set((StompClientConnection) asyncResult.result());
            ((StompClientConnection) asyncResult.result()).pingHandler(stompClientConnection -> {
            });
        });
        Awaitility.await().atMost(1L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(((StompClientConnection) atomicReference.get()).session() == null);
        });
    }

    @Test
    public void testAsymmetricHeartbeatTime() throws InterruptedException {
        AtomicReference atomicReference = new AtomicReference();
        ArrayList arrayList = new ArrayList();
        this.server = StompServer.create(this.vertx, new StompServerOptions().setHeartbeat(new JsonObject().put("x", 300).put("y", 200))).handler(StompServerHandler.create(this.vertx).receivedFrameHandler(serverFrame -> {
            if (Command.PING.equals(serverFrame.frame().getCommand())) {
                arrayList.add(Long.valueOf(System.currentTimeMillis()));
            }
        }));
        startServer();
        ArrayList arrayList2 = new ArrayList();
        StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(new JsonObject().put("x", 100).put("y", 400))).receivedFrameHandler(frame -> {
            if (Command.PING.equals(frame.getCommand())) {
                arrayList2.add(Long.valueOf(System.currentTimeMillis()));
            }
        }).connect().onComplete(asyncResult -> {
            atomicReference.set((StompClientConnection) asyncResult.result());
        });
        Thread.sleep(2000L);
        arrayList.stream().reduce(0L, (l, l2) -> {
            Assert.assertTrue(l2.longValue() - l.longValue() > 150);
            return l2;
        });
        arrayList2.stream().reduce(0L, (l3, l4) -> {
            Assert.assertTrue(l4.longValue() - l3.longValue() > 350);
            return l4;
        });
        Assertions.assertThat(((StompClientConnection) atomicReference.get()).server()).isNotNull();
    }

    @Test
    public void testConnectionDroppedHandler() throws InterruptedException {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        this.server = StompServer.create(this.vertx, new StompServerOptions().setHeartbeat(new JsonObject().put("x", 100).put("y", 100))).handler(StompServerHandler.create(this.vertx).pingHandler(stompServerConnection -> {
            if (atomicBoolean.get()) {
                stompServerConnection.ping();
            }
        }));
        startServer();
        StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(new JsonObject().put("x", 100).put("y", 100))).connect().onComplete(asyncResult -> {
            ((StompClientConnection) asyncResult.result()).connectionDroppedHandler(stompClientConnection -> {
                atomicBoolean2.set(true);
            });
            atomicBoolean.set(false);
        });
        ConditionFactory atMost = Awaitility.await().atMost(10L, TimeUnit.SECONDS);
        Objects.requireNonNull(atomicBoolean2);
        atMost.until(atomicBoolean2::get);
    }

    @Test
    public void testReconnection() throws InterruptedException {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger();
        ArrayList arrayList = new ArrayList();
        StompServer create = StompServer.create(this.vertx, new StompServerOptions().setHeartbeat(new JsonObject().put("x", 1000).put("y", 1000)));
        StompServerHandler pingHandler = StompServerHandler.create(this.vertx).pingHandler(stompServerConnection -> {
            if (atomicBoolean.get()) {
                stompServerConnection.ping();
            }
        });
        Objects.requireNonNull(arrayList);
        this.server = create.handler(pingHandler.receivedFrameHandler((v1) -> {
            r3.add(v1);
        }));
        startServer();
        StompClient create2 = StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(new JsonObject().put("x", 1000).put("y", 1000)));
        create2.connect().onComplete(getConnectionHandler(create2, atomicBoolean, atomicInteger, atomicInteger2));
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(atomicInteger.get() == 1);
        });
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(atomicInteger2.get() == 2);
        });
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(containsClientFrame(arrayList, 1) && containsClientFrame(arrayList, 2));
        });
    }

    @Test
    public void testReconnectionWithDeadServer() throws InterruptedException {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        AtomicInteger atomicInteger2 = new AtomicInteger();
        ArrayList arrayList = new ArrayList();
        StompServer create = StompServer.create(this.vertx, new StompServerOptions().setHeartbeat(new JsonObject().put("x", 1000).put("y", 1000)));
        StompServerHandler pingHandler = StompServerHandler.create(this.vertx).pingHandler(stompServerConnection -> {
            if (atomicBoolean.get()) {
                stompServerConnection.ping();
            } else {
                this.server.close();
            }
        });
        Objects.requireNonNull(arrayList);
        this.server = create.handler(pingHandler.receivedFrameHandler((v1) -> {
            r3.add(v1);
        }));
        startServer();
        StompClient create2 = StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(new JsonObject().put("x", 1000).put("y", 1000)));
        create2.connect().onComplete(getConnectionHandler(create2, atomicBoolean, atomicInteger, atomicInteger2));
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(atomicInteger.get() == 1);
        });
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(atomicInteger2.get() == 1);
        });
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(containsClientFrame(arrayList, 1) && !containsClientFrame(arrayList, 2));
        });
    }

    private boolean containsClientFrame(List<ServerFrame> list, int i) {
        for (ServerFrame serverFrame : list) {
            if (serverFrame.frame().getBody() != null && serverFrame.frame().getBodyAsString().contains("some body " + i)) {
                return true;
            }
        }
        return false;
    }

    private Handler<AsyncResult<StompClientConnection>> getConnectionHandler(StompClient stompClient, AtomicBoolean atomicBoolean, AtomicInteger atomicInteger, AtomicInteger atomicInteger2) {
        return asyncResult -> {
            if (asyncResult.succeeded()) {
                ((StompClientConnection) asyncResult.result()).connectionDroppedHandler(stompClientConnection -> {
                    atomicInteger.incrementAndGet();
                    stompClient.connect().onComplete(getConnectionHandler(stompClient, atomicBoolean, atomicInteger, atomicInteger2));
                });
                int incrementAndGet = atomicInteger2.incrementAndGet();
                atomicBoolean.set(false);
                ((StompClientConnection) asyncResult.result()).send("some-address", Buffer.buffer("some body " + incrementAndGet));
            }
        };
    }

    @Test
    public void testThatDroppedHandlerIsNotCalledWhenTheClientIsClosing() {
        this.server = StompServer.create(this.vertx, new StompServerOptions().setHeartbeat(new JsonObject().put("x", 1000).put("y", 1000))).handler(StompServerHandler.create(this.vertx));
        startServer();
        StompClient create = StompClient.create(this.vertx, new StompClientOptions().setHeartbeat(new JsonObject().put("x", 1000).put("y", 1000)));
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        AtomicBoolean atomicBoolean3 = new AtomicBoolean();
        create.connect().onComplete(asyncResult -> {
            StompClientConnection stompClientConnection = (StompClientConnection) asyncResult.result();
            stompClientConnection.closeHandler(stompClientConnection2 -> {
                atomicBoolean3.set(true);
            });
            stompClientConnection.connectionDroppedHandler(stompClientConnection3 -> {
                atomicBoolean.set(true);
            });
            atomicBoolean2.set(asyncResult.succeeded());
        });
        ConditionFactory atMost = Awaitility.await().atMost(10L, TimeUnit.SECONDS);
        Objects.requireNonNull(atomicBoolean2);
        atMost.until(atomicBoolean2::get);
        create.close();
        ConditionFactory atMost2 = Awaitility.await().atMost(10L, TimeUnit.SECONDS);
        Objects.requireNonNull(atomicBoolean3);
        atMost2.until(atomicBoolean3::get);
        Assertions.assertThat(atomicBoolean.get()).isFalse();
    }
}
