package io.vertx.mqtt.test.server;

import io.netty.handler.codec.mqtt.MqttProperties;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import java.util.ArrayList;
import java.util.Objects;
import org.eclipse.paho.mqttv5.client.MqttActionListener;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/mqtt/test/server/Mqtt5ServerSubscribeTest.class */
public class Mqtt5ServerSubscribeTest extends MqttServerBaseTest {
    private Async async;
    private int requestedQos;
    private boolean requestedRetainAsPublished;
    private boolean requestedNoLocal;
    private int requestedRetainHandling;
    private static final String MQTT_TOPIC = "/my_topic";
    private static final String MQTT_TOPIC_FAILURE = "/my_topic/failure";
    private static final String MQTT_FAILURE_REASON = "test reason";
    private static final int SUBSCRIPTION_IDENTIFIER = 42;

    @Before
    public void before(TestContext testContext) {
        setUp(testContext);
    }

    @After
    public void after(TestContext testContext) {
        tearDown(testContext);
    }

    @Test
    public void subscribeQos0(TestContext testContext) {
        subscribe(testContext, MQTT_TOPIC, 0, true, false, 0);
    }

    @Test
    public void subscribeQos1(TestContext testContext) {
        subscribe(testContext, MQTT_TOPIC, 1, false, true, 1);
    }

    @Test
    public void subscribeQos2(TestContext testContext) {
        subscribe(testContext, MQTT_TOPIC, 2, true, true, 2);
    }

    @Test
    public void subscribeFailure(TestContext testContext) {
        subscribe(testContext, MQTT_TOPIC_FAILURE, 0, false, false, 0);
    }

    private void subscribe(TestContext testContext, String str, int i, boolean z, boolean z2, int i2) {
        this.async = testContext.async();
        try {
            MqttAsyncClient mqttAsyncClient = new MqttAsyncClient(String.format("tcp://%s:%d", "localhost", 1883), "12345", new MemoryPersistence());
            mqttAsyncClient.connect().waitForCompletion();
            MqttSubscription mqttSubscription = new MqttSubscription(str, i);
            this.requestedQos = i;
            mqttSubscription.setNoLocal(z2);
            mqttSubscription.setRetainAsPublished(z);
            mqttSubscription.setRetainHandling(i2);
            this.requestedNoLocal = z2;
            this.requestedRetainAsPublished = z;
            this.requestedRetainHandling = i2;
            MqttProperties mqttProperties = new MqttProperties();
            mqttProperties.setSubscriptionIdentifier(Integer.valueOf(SUBSCRIPTION_IDENTIFIER));
            mqttAsyncClient.subscribe(new MqttSubscription[]{mqttSubscription}, (Object) null, (MqttActionListener) null, mqttProperties).waitForCompletion();
            this.async.await();
        } catch (MqttException e) {
            e.printStackTrace();
            if (str.equals(MQTT_TOPIC_FAILURE)) {
                testContext.assertEquals(Integer.valueOf(e.getReasonCode()), 143);
                testContext.assertEquals(e.getMessage(), MQTT_FAILURE_REASON);
            }
        }
    }

    @Test
    public void subscribeUnsupportedMqttVersion(TestContext testContext) {
        Async async = testContext.async();
        this.vertx.createNetClient().connect(1883, "localhost").onComplete(testContext.asyncAssertSuccess(netSocket -> {
            netSocket.write(Buffer.buffer(new byte[]{16, 17, 0, 4, 77, 81, 84, 84, 6, 2, 0, 60, 0, 5, 49, 50, 51, 52, 53}));
            Buffer buffer = Buffer.buffer();
            Objects.requireNonNull(buffer);
            netSocket.handler(buffer::appendBuffer);
            Objects.requireNonNull(testContext);
            netSocket.exceptionHandler(testContext::fail);
            netSocket.closeHandler(r8 -> {
                testContext.assertEquals(Buffer.buffer(new byte[]{32, 2, 0, 1}), buffer);
                async.complete();
            });
        }));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.vertx.mqtt.test.server.MqttServerBaseTest
    public void endpointHandler(MqttEndpoint mqttEndpoint, TestContext testContext) {
        mqttEndpoint.subscribeHandler(mqttSubscribeMessage -> {
            MqttTopicSubscription mqttTopicSubscription = (MqttTopicSubscription) mqttSubscribeMessage.topicSubscriptions().get(0);
            testContext.assertEquals(Integer.valueOf(this.requestedQos), Integer.valueOf(mqttTopicSubscription.subscriptionOption().qos().value()));
            testContext.assertEquals(Boolean.valueOf(this.requestedNoLocal), Boolean.valueOf(mqttTopicSubscription.subscriptionOption().isNoLocal()));
            testContext.assertEquals(Boolean.valueOf(this.requestedRetainAsPublished), Boolean.valueOf(mqttTopicSubscription.subscriptionOption().isRetainAsPublished()));
            testContext.assertEquals(Integer.valueOf(this.requestedRetainHandling), Integer.valueOf(mqttTopicSubscription.subscriptionOption().retainHandling().value()));
            testContext.assertEquals(Integer.valueOf(SUBSCRIPTION_IDENTIFIER), mqttSubscribeMessage.properties().getProperty(MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER.value()).value());
            ArrayList arrayList = new ArrayList();
            io.netty.handler.codec.mqtt.MqttProperties mqttProperties = new io.netty.handler.codec.mqtt.MqttProperties();
            if (((MqttTopicSubscription) mqttSubscribeMessage.topicSubscriptions().get(0)).topicName().equals(MQTT_TOPIC_FAILURE)) {
                arrayList.add(MqttSubAckReasonCode.TOPIC_FILTER_INVALID);
                mqttProperties.add(new MqttProperties.StringProperty(MqttProperties.MqttPropertyType.REASON_STRING.value(), MQTT_FAILURE_REASON));
            } else {
                arrayList.add(MqttSubAckReasonCode.qosGranted(((MqttTopicSubscription) mqttSubscribeMessage.topicSubscriptions().get(0)).qualityOfService()));
            }
            mqttEndpoint.subscribeAcknowledge(mqttSubscribeMessage.messageId(), arrayList, mqttProperties);
            this.async.complete();
        });
        mqttEndpoint.accept(false);
    }
}
