package io.vertx.mqtt.test.server;

import io.vertx.core.MultiMap;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Tests for {@link MqttServer} instances created with {@code setUseWebSocket(true)}, exercised
 * with Eclipse Paho clients that connect through a {@code ws://} URI.
 */
@RunWith(VertxUnitRunner.class)
public class MqttServerWebSocketTest {
    private static final Logger log = LoggerFactory.getLogger(MqttServerWebSocketTest.class);
    protected static final String MQTT_SERVER_HOST = "localhost";
    protected static final int MQTT_SERVER_PORT = 1883;

    /** Number of client connections opened per server instance in the round-robin test. */
    private static final int CONNECTIONS_PER_SERVER = 20;

    /** Upper bound, in seconds, for each latch wait in the round-robin test. */
    private static final long AWAIT_TIMEOUT_SECONDS = 10L;

    private Vertx vertx;

    /** Creates a fresh Vert.x instance for each test. */
    @Before
    public void before() {
        this.vertx = Vertx.vertx();
    }

    /**
     * Closes the Vert.x instance and asserts that closing succeeded.
     *
     * @param testContext the vertx-unit context used to report the outcome
     */
    @After
    public void after(TestContext testContext) {
        this.vertx.close().onComplete(testContext.asyncAssertSuccess());
    }

    /**
     * Starts several servers with identical host/port options, opens
     * {@link #CONNECTIONS_PER_SERVER} client connections per server, and asserts that every
     * server instance handled exactly that many endpoints.
     *
     * @param testContext the vertx-unit context used for assertions
     */
    @Test
    public void sharedServersRoundRobin(TestContext testContext) {
        int numServers = (VertxOptions.DEFAULT_EVENT_LOOP_POOL_SIZE / 2) - 1;
        int numConnections = numServers * CONNECTIONS_PER_SERVER;

        List<MqttServer> servers = new ArrayList<>();
        List<MqttClient> clients = new ArrayList<>();
        Set<MqttServer> connectedServers = ConcurrentHashMap.newKeySet();
        Map<MqttServer, Integer> connectCount = new ConcurrentHashMap<>();
        CountDownLatch listenLatch = new CountDownLatch(numServers);
        CountDownLatch connectLatch = new CountDownLatch(numConnections);

        for (int i = 0; i < numServers; i++) {
            MqttServer server = MqttServer.create(this.vertx, webSocketOptions());
            servers.add(server);
            server.endpointHandler(endpoint -> {
                connectedServers.add(server);
                // merge() keeps the increment atomic instead of a separate get + put.
                connectCount.merge(server, 1, Integer::sum);
                endpoint.accept(false);
                connectLatch.countDown();
            }).listen().onComplete(ar -> {
                if (ar.succeeded()) {
                    log.info("MQTT server listening on port " + ar.result().actualPort());
                    listenLatch.countDown();
                } else {
                    // The latch is not counted down, so the await below fails the test.
                    log.error("Error starting MQTT server", ar.cause());
                }
            });
        }

        try {
            testContext.assertTrue(listenLatch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));

            for (int n = 0; n < numConnections; n++) {
                String clientId = String.format("client-%d", n);
                try {
                    MqttClient client = new MqttClient(serverUri(), clientId, new MemoryPersistence());
                    // Track the client before connecting so it is released even if connect() fails.
                    clients.add(client);
                    client.connect();
                    log.info("Client connected " + clientId);
                } catch (MqttException e) {
                    log.error("Error on connecting client " + clientId, e);
                    testContext.fail(e);
                }
            }

            testContext.assertTrue(connectLatch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));

            // Every server instance must have seen at least one connection ...
            testContext.assertEquals(numServers, connectedServers.size());
            for (MqttServer server : servers) {
                testContext.assertTrue(connectedServers.contains(server));
            }

            // ... and the connections must be spread evenly across the instances.
            testContext.assertEquals(numServers, connectCount.size());
            for (Integer count : connectCount.values()) {
                testContext.assertEquals(CONNECTIONS_PER_SERVER, count);
            }

            CountDownLatch closeLatch = new CountDownLatch(numServers);
            for (MqttServer server : servers) {
                server.close().onComplete(testContext.asyncAssertSuccess(v -> closeLatch.countDown()));
            }
            testContext.assertTrue(closeLatch.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail rather than silently ending the test.
            Thread.currentThread().interrupt();
            testContext.fail(e);
        } finally {
            for (MqttClient client : clients) {
                closeQuietly(client);
            }
        }
    }

    /**
     * Connects one client and asserts, inside the endpoint handler, that the HTTP upgrade
     * headers and the request URI are exposed on the endpoint.
     *
     * @param testContext the vertx-unit context used for assertions
     */
    @Test
    public void testHttpHeaders(TestContext testContext) {
        MqttServer server = MqttServer.create(this.vertx, webSocketOptions());

        Async endpointChecked = testContext.async();
        server.endpointHandler(endpoint -> {
            MultiMap headers = endpoint.httpHeaders();
            testContext.assertNotNull(headers);
            testContext.assertEquals("Upgrade", headers.get("Connection"));
            testContext.assertEquals("/mqtt", endpoint.httpRequestURI());
            endpointChecked.complete();
            endpoint.accept(false);
        });

        // Block until the server is listening before the client tries to connect.
        Async listening = testContext.async();
        server.listen().onComplete(testContext.asyncAssertSuccess(started -> listening.complete()));
        listening.awaitSuccess(15000L);

        MqttClient client = null;
        try {
            client = new MqttClient(serverUri(), "12345", new MemoryPersistence());
            client.connect();
            client.disconnect();
        } catch (MqttException e) {
            testContext.fail(e);
        } finally {
            closeQuietly(client);
        }
    }

    /** Builds the server options shared by all tests: fixed host and port, WebSocket enabled. */
    private static MqttServerOptions webSocketOptions() {
        return new MqttServerOptions()
            .setHost(MQTT_SERVER_HOST)
            .setPort(MQTT_SERVER_PORT)
            .setUseWebSocket(true);
    }

    /** Returns the {@code ws://host:port} URI that clients connect to. */
    private static String serverUri() {
        return String.format("ws://%s:%d", MQTT_SERVER_HOST, MQTT_SERVER_PORT);
    }

    /**
     * Best-effort release of a client: disconnects it if it still reports being connected, then
     * closes it. Failures are logged and not propagated so that cleanup never masks the test
     * result.
     *
     * @param client the client to release; {@code null} is ignored
     */
    private static void closeQuietly(MqttClient client) {
        if (client == null) {
            return;
        }
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
            client.close();
        } catch (MqttException e) {
            log.warn("Error on releasing client " + client.getClientId(), e);
        }
    }
}
