package io.vertx.mqtt.test.client;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttTopicSubscription;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/mqtt/test/client/MqttClientKeepAliveTest.class */
public class MqttClientKeepAliveTest {
    private Vertx vertx;
    private MqttServer server;

    private void startServer(TestContext testContext) throws Exception {
        this.server.listen().toCompletionStage().toCompletableFuture().get(20L, TimeUnit.SECONDS);
    }

    @Before
    public void before() {
        this.vertx = Vertx.vertx();
        this.server = MqttServer.create(this.vertx);
    }

    @After
    public void after(TestContext testContext) {
        this.server.close().onComplete(testContext.asyncAssertSuccess(r5 -> {
            this.vertx.close().onComplete(testContext.asyncAssertSuccess());
        }));
    }

    @Test
    public void autoKeepAlive(TestContext testContext) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        this.server.endpointHandler(mqttEndpoint -> {
            mqttEndpoint.accept(false);
            mqttEndpoint.autoKeepAlive(true);
            mqttEndpoint.pingHandler(r3 -> {
                atomicInteger.incrementAndGet();
            });
        });
        startServer(testContext);
        MqttClientOptions mqttClientOptions = new MqttClientOptions();
        mqttClientOptions.setAutoKeepAlive(true);
        mqttClientOptions.setKeepAliveInterval(1);
        MqttClient create = MqttClient.create(this.vertx, mqttClientOptions);
        create.connect(1883, "localhost").onComplete(testContext.asyncAssertSuccess(mqttConnAckMessage -> {
            Async async = testContext.async();
            AtomicInteger atomicInteger2 = new AtomicInteger();
            create.pingResponseHandler(r5 -> {
                if (atomicInteger2.incrementAndGet() == 4) {
                    create.disconnect();
                }
            });
            create.closeHandler(r8 -> {
                Assert.assertEquals(4L, atomicInteger.get());
                Assert.assertEquals(4L, atomicInteger2.get());
                async.complete();
            });
        }));
    }

    @Test
    public void clientWillDisconnectOnMissingPingResponse(TestContext testContext) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        this.server.endpointHandler(mqttEndpoint -> {
            mqttEndpoint.autoKeepAlive(false);
            mqttEndpoint.accept(false);
            mqttEndpoint.pingHandler(r3 -> {
                atomicInteger.incrementAndGet();
            });
        });
        startServer(testContext);
        MqttClientOptions mqttClientOptions = new MqttClientOptions();
        mqttClientOptions.setKeepAliveInterval(1);
        MqttClient create = MqttClient.create(this.vertx, mqttClientOptions);
        create.connect(1883, "localhost").onComplete(testContext.asyncAssertSuccess(mqttConnAckMessage -> {
            Async async = testContext.async();
            create.closeHandler(r7 -> {
                Assert.assertEquals(2L, atomicInteger.get());
                async.complete();
            });
        }));
    }

    @Test
    public void clientWillDisconnectOnMissingManualPingResponse(TestContext testContext) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        this.server.endpointHandler(mqttEndpoint -> {
            mqttEndpoint.accept(false);
            mqttEndpoint.autoKeepAlive(false);
            mqttEndpoint.pingHandler(r3 -> {
                atomicInteger.incrementAndGet();
            });
        });
        startServer(testContext);
        MqttClientOptions mqttClientOptions = new MqttClientOptions();
        mqttClientOptions.setKeepAliveInterval(2);
        mqttClientOptions.setAutoKeepAlive(false);
        MqttClient create = MqttClient.create(this.vertx, mqttClientOptions);
        create.connect(1883, "localhost").onComplete(testContext.asyncAssertSuccess(mqttConnAckMessage -> {
            Async async = testContext.async();
            AtomicInteger atomicInteger2 = new AtomicInteger();
            create.pingResponseHandler(r3 -> {
                atomicInteger2.incrementAndGet();
            });
            create.ping();
            create.closeHandler(r8 -> {
                Assert.assertEquals(0L, atomicInteger2.get());
                Assert.assertEquals(1L, atomicInteger.get());
                async.complete();
            });
        }));
    }

    @Test
    public void clientSendingRegularMessageDoesNotPreventClientPings(TestContext testContext) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        this.server.endpointHandler(mqttEndpoint -> {
            mqttEndpoint.accept(false);
            mqttEndpoint.pingHandler(r3 -> {
                atomicInteger.incrementAndGet();
            });
            mqttEndpoint.publishHandler(mqttPublishMessage -> {
                if (atomicInteger2.incrementAndGet() == 4) {
                    mqttEndpoint.close();
                }
            });
        });
        startServer(testContext);
        MqttClientOptions mqttClientOptions = new MqttClientOptions();
        mqttClientOptions.setKeepAliveInterval(2);
        mqttClientOptions.setAutoKeepAlive(true);
        MqttClient create = MqttClient.create(this.vertx, mqttClientOptions);
        create.connect(1883, "localhost").onComplete(testContext.asyncAssertSuccess(mqttConnAckMessage -> {
            Async async = testContext.async();
            long periodic = this.vertx.setPeriodic(500L, l -> {
                create.publish("greetings", Buffer.buffer("hello"), MqttQoS.AT_MOST_ONCE, false, false);
            });
            AtomicInteger atomicInteger3 = new AtomicInteger();
            create.pingResponseHandler(r3 -> {
                atomicInteger3.incrementAndGet();
            });
            create.closeHandler(r11 -> {
                this.vertx.cancelTimer(periodic);
                testContext.assertEquals(0, Integer.valueOf(atomicInteger.get()));
                testContext.assertEquals(0, Integer.valueOf(atomicInteger3.get()));
                async.complete();
            });
        }));
    }

    @Test
    public void serverSendingRegularMessageDoesNotPreventClientPings(TestContext testContext) throws Exception {
        AtomicInteger atomicInteger = new AtomicInteger();
        this.server.endpointHandler(mqttEndpoint -> {
            mqttEndpoint.accept(false);
            mqttEndpoint.subscribeHandler(mqttSubscribeMessage -> {
                mqttEndpoint.subscribeAcknowledge(mqttSubscribeMessage.messageId(), (List) mqttSubscribeMessage.topicSubscriptions().stream().map((v0) -> {
                    return v0.qualityOfService();
                }).collect(Collectors.toList()));
                long periodic = this.vertx.setPeriodic(500L, l -> {
                    mqttEndpoint.publish(((MqttTopicSubscription) mqttSubscribeMessage.topicSubscriptions().get(0)).topicName(), Buffer.buffer("hello"), ((MqttTopicSubscription) mqttSubscribeMessage.topicSubscriptions().get(0)).qualityOfService(), false, false);
                });
                mqttEndpoint.unsubscribeHandler(mqttUnsubscribeMessage -> {
                    this.vertx.cancelTimer(periodic);
                    mqttEndpoint.close();
                });
            });
            mqttEndpoint.pingHandler(r3 -> {
                atomicInteger.incrementAndGet();
            });
        });
        startServer(testContext);
        MqttClientOptions mqttClientOptions = new MqttClientOptions();
        mqttClientOptions.setKeepAliveInterval(1);
        mqttClientOptions.setAutoKeepAlive(true);
        MqttClient create = MqttClient.create(this.vertx, mqttClientOptions);
        create.connect(1883, "localhost").onComplete(testContext.asyncAssertSuccess(mqttConnAckMessage -> {
            Async async = testContext.async();
            AtomicInteger atomicInteger2 = new AtomicInteger();
            create.subscribe("topic/topic", 0);
            AtomicInteger atomicInteger3 = new AtomicInteger();
            create.publishHandler(mqttPublishMessage -> {
                if (atomicInteger3.incrementAndGet() == 5) {
                    create.unsubscribe("topic/topic");
                }
            });
            create.pingResponseHandler(r3 -> {
                atomicInteger2.incrementAndGet();
            });
            create.closeHandler(r7 -> {
                testContext.assertTrue(atomicInteger.get() > 0);
                testContext.assertTrue(atomicInteger2.get() > 0);
                async.complete();
            });
        }));
    }
}
