package io.qalipsis.plugins.netty.mqtt;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.concurrent.Future;
import io.qalipsis.api.logging.LoggerHelper;
import io.qalipsis.plugins.netty.NativeTransportUtils;
import io.qalipsis.plugins.netty.handlers.monitoring.ChannelMonitoringHandler;
import io.qalipsis.plugins.netty.mqtt.pendingmessage.MqttPendingMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.jvm.JvmStatic;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import mu.KLogger;
import mu.KotlinLogging;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/* compiled from: MqttClient.kt */
@Metadata(mv = {ChannelMonitoringHandler.DATA_SENT_PHASE, 8, ChannelMonitoringHandler.INIT_PHASE}, k = ChannelMonitoringHandler.DATA_SENT_PHASE, xi = 48, d1 = {"��j\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u000b\n��\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0010\u000e\n��\n\u0002\u0010\u0012\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\b��\u0018�� $2\u00020\u0001:\u0001$B\u0015\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005¢\u0006\u0002\u0010\u0006J\u0006\u0010\u0011\u001a\u00020\u0012J\u0010\u0010\u0013\u001a\u00020\u00122\u0006\u0010\u0007\u001a\u00020\bH\u0002J\b\u0010\u0014\u001a\u00020\u0015H\u0002J \u0010\u0016\u001a\u00020\u00122\u0006\u0010\u0017\u001a\u00020\u00182\u0006\u0010\u0019\u001a\u00020\u001a2\b\b\u0002\u0010\u001b\u001a\u00020\u001cJ*\u0010\u0016\u001a\u00020\u00122\u0006\u0010\u0017\u001a\u00020\u00182\u0006\u0010\u0019\u001a\u00020\u001a2\b\b\u0002\u0010\u001d\u001a\u00020\u00102\b\b\u0002\u0010\u001b\u001a\u00020\u001cJ\b\u0010\u000f\u001a\u00020\u0012H\u0002J\u0010\u0010\u001e\u001a\u00020\u00122\u0006\u0010\u001f\u001a\u00020 H\u0002J\u000e\u0010!\u001a\u00020\u00122\u0006\u0010\"\u001a\u00020#R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\t\u001a\u00020\nX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000b\u001a\u00020\fX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\r\u001a\u00020\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u000e¢\u0006\u0002\n��¨\u0006%"}, d2 = {"Lio/qalipsis/plugins/netty/mqtt/MqttClient;", "", "clientOptions", "Lio/qalipsis/plugins/netty/mqtt/MqttClientOptions;", "eventLoopGroup", "Lio/netty/channel/EventLoopGroup;", "(Lio/qalipsis/plugins/netty/mqtt/MqttClientOptions;Lio/netty/channel/EventLoopGroup;)V", "bootstrap", "Lio/netty/bootstrap/Bootstrap;", "messageHandler", "Lio/qalipsis/plugins/netty/mqtt/MqttMessageHandler;", "mqttHandler", "Lio/qalipsis/plugins/netty/mqtt/MqttChannelHandler;", "nextMessageId", "Ljava/util/concurrent/atomic/AtomicInteger;", "reconnect", "", "close", "", "connect", "getNewMessageId", "", "publish", "topicName", "", "payload", "", "qoS", "Lio/netty/handler/codec/mqtt/MqttQoS;", "retained", "scheduleReconnect", "channelFuture", "Lio/netty/channel/ChannelFuture;", "subscribe", "mqttSubscriber", "Lio/qalipsis/plugins/netty/mqtt/MqttSubscriber;", "Companion", "qalipsis-plugin-netty"})
@SourceDebugExtension({"SMAP\nMqttClient.kt\nKotlin\n*S Kotlin\n*F\n+ 1 MqttClient.kt\nio/qalipsis/plugins/netty/mqtt/MqttClient\n+ 2 LoggerHelper.kt\nio/qalipsis/api/logging/LoggerHelper\n*L\n1#1,190:1\n31#2:191\n*S KotlinDebug\n*F\n+ 1 MqttClient.kt\nio/qalipsis/plugins/netty/mqtt/MqttClient\n*L\n168#1:191\n*E\n"})
/* loaded from: input_file:io/qalipsis/plugins/netty/mqtt/MqttClient.class */
public final class MqttClient {

    @NotNull
    public static final Companion Companion = new Companion(null);

    @NotNull
    private final EventLoopGroup eventLoopGroup;

    @NotNull
    private final AtomicInteger nextMessageId;

    @NotNull
    private final Bootstrap bootstrap;
    private boolean reconnect;

    @NotNull
    private final MqttMessageHandler messageHandler;

    @NotNull
    private final MqttChannelHandler mqttHandler;

    @NotNull
    private static final KLogger log;
    private static final long DEFAULT_EVENT_LOOP_SHUTDOWN_TIMEOUT = 10000;

    /* compiled from: MqttClient.kt */
    @Metadata(mv = {ChannelMonitoringHandler.DATA_SENT_PHASE, 8, ChannelMonitoringHandler.INIT_PHASE}, k = ChannelMonitoringHandler.DATA_SENT_PHASE, xi = 48, d1 = {"��\u001a\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082T¢\u0006\u0002\n��R\u0016\u0010\u0005\u001a\u00020\u00068\u0002X\u0083\u0004¢\u0006\b\n��\u0012\u0004\b\u0007\u0010\u0002¨\u0006\b"}, d2 = {"Lio/qalipsis/plugins/netty/mqtt/MqttClient$Companion;", "", "()V", "DEFAULT_EVENT_LOOP_SHUTDOWN_TIMEOUT", "", "log", "Lmu/KLogger;", "getLog$annotations", "qalipsis-plugin-netty"})
    /* loaded from: input_file:io/qalipsis/plugins/netty/mqtt/MqttClient$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        @JvmStatic
        private static /* synthetic */ void getLog$annotations() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    public MqttClient(@NotNull MqttClientOptions mqttClientOptions, @NotNull EventLoopGroup eventLoopGroup) {
        Intrinsics.checkNotNullParameter(mqttClientOptions, "clientOptions");
        Intrinsics.checkNotNullParameter(eventLoopGroup, "eventLoopGroup");
        this.eventLoopGroup = eventLoopGroup;
        this.nextMessageId = new AtomicInteger(1);
        this.reconnect = true;
        this.messageHandler = new MqttMessageHandler();
        this.mqttHandler = new MqttChannelHandler(this.messageHandler, mqttClientOptions);
        log.debug(new Function0<Object>() { // from class: io.qalipsis.plugins.netty.mqtt.MqttClient.1
            @Nullable
            public final Object invoke() {
                return "Trying to connect to MQTT broker";
            }
        });
        this.reconnect = mqttClientOptions.getConnectionConfiguration().getReconnect();
        Bootstrap group = new Bootstrap().remoteAddress(mqttClientOptions.getConnectionConfiguration().getHost(), mqttClientOptions.getConnectionConfiguration().getPort()).channel(NativeTransportUtils.INSTANCE.getSocketChannelClass()).option(ChannelOption.SO_REUSEADDR, true).handler(new MqttChannelInitializer(this.mqttHandler)).group(this.eventLoopGroup);
        Intrinsics.checkNotNullExpressionValue(group, "Bootstrap()\n            …   .group(eventLoopGroup)");
        this.bootstrap = group;
        connect(this.bootstrap);
    }

    private final void connect(Bootstrap bootstrap) {
        ChannelFuture connect = bootstrap.connect();
        MqttMessageHandler mqttMessageHandler = this.messageHandler;
        Intrinsics.checkNotNullExpressionValue(connect, "connection");
        mqttMessageHandler.connectionListener(connect);
        scheduleReconnect(connect);
    }

    private final void scheduleReconnect(ChannelFuture channelFuture) {
        channelFuture.addListener((v2) -> {
            scheduleReconnect$lambda$1(r1, r2, v2);
        });
    }

    private final void reconnect() {
        if (this.reconnect) {
            this.eventLoopGroup.schedule(() -> {
                reconnect$lambda$2(r1);
            }, 500L, TimeUnit.MILLISECONDS);
        }
    }

    private final int getNewMessageId() {
        return this.nextMessageId.getAndIncrement();
    }

    public final void subscribe(@NotNull MqttSubscriber mqttSubscriber) {
        Intrinsics.checkNotNullParameter(mqttSubscriber, "mqttSubscriber");
        int newMessageId = getNewMessageId();
        MqttMessage build = MqttMessageBuilders.subscribe().messageId(newMessageId).addSubscription(mqttSubscriber.getQoS(), mqttSubscriber.getTopic()).build();
        MqttMessageHandler mqttMessageHandler = this.messageHandler;
        Intrinsics.checkNotNullExpressionValue(build, "subscribeMessage");
        mqttMessageHandler.subscribe(mqttSubscriber, new MqttPendingMessage(build, newMessageId));
    }

    public final void publish(@NotNull String str, @NotNull byte[] bArr, @NotNull MqttQoS mqttQoS) {
        Intrinsics.checkNotNullParameter(str, "topicName");
        Intrinsics.checkNotNullParameter(bArr, "payload");
        Intrinsics.checkNotNullParameter(mqttQoS, "qoS");
        publish(str, bArr, true, mqttQoS);
    }

    public static /* synthetic */ void publish$default(MqttClient mqttClient, String str, byte[] bArr, MqttQoS mqttQoS, int i, Object obj) {
        if ((i & 4) != 0) {
            mqttQoS = MqttQoS.AT_LEAST_ONCE;
        }
        mqttClient.publish(str, bArr, mqttQoS);
    }

    public final void publish(@NotNull String str, @NotNull byte[] bArr, boolean z, @NotNull MqttQoS mqttQoS) {
        Intrinsics.checkNotNullParameter(str, "topicName");
        Intrinsics.checkNotNullParameter(bArr, "payload");
        Intrinsics.checkNotNullParameter(mqttQoS, "qoS");
        ByteBuf writeBytes = Unpooled.buffer().writeBytes(bArr);
        int newMessageId = getNewMessageId();
        MqttMessage build = MqttMessageBuilders.publish().messageId(newMessageId).topicName(str).payload(writeBytes).retained(z).qos(mqttQoS).build();
        MqttMessageHandler mqttMessageHandler = this.messageHandler;
        Intrinsics.checkNotNullExpressionValue(build, "publishMessage");
        mqttMessageHandler.publish(new MqttPendingMessage(build, newMessageId));
    }

    public static /* synthetic */ void publish$default(MqttClient mqttClient, String str, byte[] bArr, boolean z, MqttQoS mqttQoS, int i, Object obj) {
        if ((i & 4) != 0) {
            z = true;
        }
        if ((i & 8) != 0) {
            mqttQoS = MqttQoS.AT_LEAST_ONCE;
        }
        mqttClient.publish(str, bArr, z, mqttQoS);
    }

    public final void close() {
        log.debug(new Function0<Object>() { // from class: io.qalipsis.plugins.netty.mqtt.MqttClient$close$1
            @Nullable
            public final Object invoke() {
                return "Closing MQTT client";
            }
        });
        this.messageHandler.close();
        this.reconnect = false;
    }

    private static final void scheduleReconnect$lambda$1$lambda$0(MqttClient mqttClient, Future future) {
        Intrinsics.checkNotNullParameter(mqttClient, "this$0");
        mqttClient.reconnect();
    }

    private static final void scheduleReconnect$lambda$1(ChannelFuture channelFuture, MqttClient mqttClient, Future future) {
        Intrinsics.checkNotNullParameter(channelFuture, "$channelFuture");
        Intrinsics.checkNotNullParameter(mqttClient, "this$0");
        if (!future.isSuccess()) {
            mqttClient.reconnect();
            return;
        }
        Channel channel = channelFuture.channel();
        if (channel != null) {
            ChannelFuture closeFuture = channel.closeFuture();
            if (closeFuture != null) {
                closeFuture.addListener((v1) -> {
                    scheduleReconnect$lambda$1$lambda$0(r1, v1);
                });
            }
        }
    }

    private static final void reconnect$lambda$2(MqttClient mqttClient) {
        Intrinsics.checkNotNullParameter(mqttClient, "this$0");
        mqttClient.connect(mqttClient.bootstrap);
    }

    static {
        LoggerHelper loggerHelper = LoggerHelper.INSTANCE;
        Companion companion = Companion;
        log = KotlinLogging.INSTANCE.logger(new Function0<Unit>() { // from class: io.qalipsis.plugins.netty.mqtt.MqttClient$special$$inlined$logger$1
            public final void invoke() {
            }

            /* renamed from: invoke, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Object m171invoke() {
                invoke();
                return Unit.INSTANCE;
            }
        });
    }
}
