package io.vertx.mqtt.it;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.mqtt.MqttClient;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/vertx/mqtt/it/MqttClientUnsubscribeIT.class */
/**
 * Integration tests for the MQTT client UNSUBSCRIBE flow.
 *
 * <p>Each test connects to the broker at {@code host}/{@code port} (inherited from
 * {@link MqttClientBaseIT}), subscribes to {@link #MQTT_TOPIC} with the requested QoS and,
 * once the SUBACK has been verified, issues two UNSUBSCRIBE requests (one for a single topic,
 * one for a topic list). The test completes only after both requests have been acknowledged
 * with the message ids that were assigned when they were sent.
 */
public class MqttClientUnsubscribeIT extends MqttClientBaseIT {
    private static final Logger log = LoggerFactory.getLogger(MqttClientUnsubscribeIT.class);

    private static final String MQTT_TOPIC = "/my_topic";
    private static final List<String> MQTT_TOPIC_LIST = Arrays.asList("my_topic1", "my_topic2");

    /** Number of UNSUBSCRIBE requests issued per test; one UNSUBACK is expected for each. */
    private static final int UNSUBSCRIBE_REQUESTS = 2;

    /** Message id of the SUBSCRIBE request in flight, checked against the SUBACK. */
    private int messageId = 0;

    @Test
    public void unsubscribeQoS0(TestContext testContext) throws InterruptedException {
        unsubscribe(testContext, MqttQoS.AT_MOST_ONCE);
    }

    @Test
    public void unsubscribeQoS1(TestContext testContext) throws InterruptedException {
        unsubscribe(testContext, MqttQoS.AT_LEAST_ONCE);
    }

    @Test
    public void unsubscribeQoS2(TestContext testContext) throws InterruptedException {
        unsubscribe(testContext, MqttQoS.EXACTLY_ONCE);
    }

    /**
     * Runs the subscribe / unsubscribe round trip and blocks until every UNSUBACK has arrived.
     *
     * @param testContext vertx-unit context used for all assertions, so that a failure raised
     *                    inside a client callback is reported to the test instead of being lost
     *                    on the callback thread
     * @param mqttQoS     QoS level requested for the initial subscription and expected to be
     *                    granted in the SUBACK
     */
    private void unsubscribe(TestContext testContext, MqttQoS mqttQoS) {
        this.messageId = 0;

        // Ids of UNSUBSCRIBE requests that were sent but not acknowledged yet. Two requests are
        // in flight at the same time, so a single "last id" field cannot identify both acks.
        Set<Integer> pendingUnsubscribeIds = ConcurrentHashMap.newKeySet();
        Async unsubscribed = testContext.async(UNSUBSCRIBE_REQUESTS);

        Vertx vertx = Vertx.vertx();
        MqttClient client = MqttClient.create(vertx);

        client.unsubscribeCompletionHandler(unsubAckId -> {
            // NOTE(review): assumes the completion callback of unsubscribe(...) has recorded the
            // id before the matching UNSUBACK is dispatched (the original test relied on the
            // same ordering) - confirm against the client implementation.
            testContext.assertTrue(pendingUnsubscribeIds.remove(unsubAckId),
                "received UNSUBACK for unexpected message id = " + unsubAckId);
            log.info("unsubscribing complete for message id = " + unsubAckId);
            // Disconnect before releasing the last count so the waiting test thread is not
            // woken up (and Vert.x closed) ahead of the disconnect request.
            if (unsubscribed.count() == 1) {
                client.disconnect();
            }
            unsubscribed.countDown();
        });

        client.subscribeCompletionHandler(subAck -> {
            testContext.assertEquals(this.messageId, subAck.messageId());
            testContext.assertTrue(subAck.grantedQoSLevels().contains(Integer.valueOf(mqttQoS.value())));
            log.info("subscribing complete for message id = " + subAck.messageId() + " with QoS " + subAck.grantedQoSLevels());

            client.unsubscribe(MQTT_TOPIC).onComplete(testContext.asyncAssertSuccess(unsubscribeId -> {
                pendingUnsubscribeIds.add(unsubscribeId);
                log.info("unsubscribing on [" + MQTT_TOPIC + "] message id = " + unsubscribeId);
            }));
            client.unsubscribe(MQTT_TOPIC_LIST).onComplete(testContext.asyncAssertSuccess(unsubscribeId -> {
                pendingUnsubscribeIds.add(unsubscribeId);
                log.info("unsubscribing on [" + MQTT_TOPIC_LIST + "] message id = " + unsubscribeId);
            }));
        });

        client.connect(this.port, this.host).onComplete(testContext.asyncAssertSuccess(connAck -> {
            client.subscribe(MQTT_TOPIC, mqttQoS.value()).onComplete(testContext.asyncAssertSuccess(subscribeId -> {
                this.messageId = subscribeId.intValue();
                log.info("subscribing on [" + MQTT_TOPIC + "] with QoS [" + mqttQoS.value() + "] message id = " + this.messageId);
            }));
        }));

        try {
            unsubscribed.await();
        } finally {
            // Each test creates its own Vert.x instance; release it so its threads do not leak.
            vertx.close();
        }
    }
}
