package io.qalipsis.plugins.netty.mqtt;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.qalipsis.api.lang.CollectionsKt;
import io.qalipsis.api.logging.LoggerHelper;
import io.qalipsis.plugins.netty.handlers.monitoring.ChannelMonitoringHandler;
import io.qalipsis.plugins.netty.mqtt.pendingmessage.MqttPendingMessage;
import io.qalipsis.plugins.netty.mqtt.pendingmessage.MqttPublishReceivedPendingMessage;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.jvm.JvmStatic;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import mu.KLogger;
import mu.KotlinLogging;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;

/* compiled from: MqttMessageHandler.kt */
@Metadata(mv = {ChannelMonitoringHandler.DATA_SENT_PHASE, 8, ChannelMonitoringHandler.INIT_PHASE}, k = ChannelMonitoringHandler.DATA_SENT_PHASE, xi = 48, d1 = {"��p\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0010!\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\u0002\n\u0002\b\u0004\n\u0002\u0010\b\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\r\b��\u0018�� 62\u000e\u0012\u0004\u0012\u00020\u0002\u0012\u0004\u0012\u00020\u00030\u0001:\u00016B\u0005¢\u0006\u0002\u0010\u0004J\u0018\u0010\f\u001a\u00020\r2\u0006\u0010\u000e\u001a\u00020\u00022\u0006\u0010\u000f\u001a\u00020\u0003H\u0016J\u0018\u0010\u0010\u001a\u00020\r2\u0006\u0010\u000f\u001a\u00020\u00032\u0006\u0010\u0011\u001a\u00020\u0012H\u0002J\u0018\u0010\u0013\u001a\u00020\u00032\u0006\u0010\u0014\u001a\u00020\u00152\u0006\u0010\u0011\u001a\u00020\u0012H\u0002J\u0006\u0010\u0016\u001a\u00020\rJ\u000e\u0010\u0017\u001a\u00020\r2\u0006\u0010\u0018\u001a\u00020\u0019J(\u0010\u001a\u001a\u00020\r2\u0006\u0010\u0011\u001a\u00020\u00122\u0016\b\u0002\u0010\u001b\u001a\u0010\u0012\u0004\u0012\u00020\t\u0012\u0004\u0012\u00020\r\u0018\u00010\u001cH\u0002J\u0018\u0010\u001d\u001a\u00020\r2\u0006\u0010\u0005\u001a\u00020\u00062\u0006\u0010\u001e\u001a\u00020\u001fH\u0002J\u0018\u0010 \u001a\u00020\r2\u0006\u0010\u0005\u001a\u00020\u00062\u0006\u0010\u001e\u001a\u00020!H\u0002J\u0010\u0010\"\u001a\u00020\r2\u0006\u0010#\u001a\u00020$H\u0002J\u0010\u0010%\u001a\u00020\r2\u0006\u0010\u001e\u001a\u00020\u0003H\u0002J\u0010\u0010&\u001a\u00020\r2\u0006\u0010\u001e\u001a\u00020\u0003H\u0002J\u0010\u0010'\u001a\u00020\r2\u0006\u0010\u001e\u001a\u00020\u0003H\u0002J\u0010\u0010(\u001a\u00020\r2\u0006\u0010)\u001a\u00020*H\u0002J\u000e\u0010+\u001a\u00020\r2\u0006\u0010,\u001a\u00020\tJ\u0012\u0010-\u001a\u0004\u0018\u00010\u00192\u0006\u0010\u001e\u001a\u00020\u0003H\u0002J\u0010\u0010.\u001a\u00020\r2\u0006\u0010\u0005\u001a\u00020\u0006H\u0002J\u0010\u0010/\u001a\u00020\r2\u0006\u0010\u001e\u001a\u00020!H\u0002J\u0010\u00100\u001a\u00020\r2\u0006\u0010\u0011\u001a\u00020\u0012H\u0002J\u0018\u00101\u001a\u00020\r2\u0006\u0010\u001e\u001a\u00020!2\u0006\u0010\u0005\u001a\u00020\u0006H\u0002J\u0010\u00102\u001a\u00020\r2\u0006\u0010\u0011\u001a\u00020\u0012H\u0002J\u0016\u00103\u001a\u00020\r2\u0006\u00104\u001a\u00020\u000b2\u0006\u00105\u001a\u00020\tR\u0010\u0010\u0005\u001a\u0004\u0018\u00010\u0006X\u0082\u000e¢\u0006\u0002\n��R\u0014\u0010\u0007\u001a\b\u0012\u0004\u0012\u00020\t0\bX\u0082\u000e¢\u0006\u0002\n��R\u0010\u0010\n\u001a\u0004\u0018\u00010\u000bX\u0082\u000e¢\u0006\u0002\n��¨\u00067"}, d2 = {"Lio/qalipsis/plugins/netty/mqtt/MqttMessageHandler;", "Ljava/util/function/BiConsumer;", "Lio/netty/channel/ChannelHandlerContext;", "Lio/netty/handler/codec/mqtt/MqttMessage;", "()V", "channel", "Lio/netty/channel/Channel;", "pendingMessages", "", "Lio/qalipsis/plugins/netty/mqtt/pendingmessage/MqttPendingMessage;", "subscriber", "Lio/qalipsis/plugins/netty/mqtt/MqttSubscriber;", "accept", "", "channelHandlerContext", "mqttMessage", "addPendingMessage", "messageId", "", "buildMessageByTypeAndId", "type", "Lio/netty/handler/codec/mqtt/MqttMessageType;", "close", "connectionListener", "channelFuture", "Lio/netty/channel/ChannelFuture;", "findAndRemovePendingMessage", "handler", "Lkotlin/Function1;", "handleConnectionAck", "message", "Lio/netty/handler/codec/mqtt/MqttConnAckMessage;", "handlePublish", "Lio/netty/handler/codec/mqtt/MqttPublishMessage;", "handlePublishAck", "mqttPubAckMessage", "Lio/netty/handler/codec/mqtt/MqttPubAckMessage;", "handlePublishCompleted", "handlePublishReceived", "handlePublishRelease", "handleSubscribeAck", "mqttSubAckMessage", "Lio/netty/handler/codec/mqtt/MqttSubAckMessage;", "publish", "pendingPublishMessage", "sendAndFlushPacket", "sendPendingMessages", "sendPubAckMessage", "sendPublishComplete", "sendPublishReceivedMessage", "sendPublishReleaseMessage", "subscribe", "mqttSubscriber", "pendingMessageSubscription", "Companion", "qalipsis-plugin-netty"})
@SourceDebugExtension({"SMAP\nMqttMessageHandler.kt\nKotlin\n*S Kotlin\n*F\n+ 1 MqttMessageHandler.kt\nio/qalipsis/plugins/netty/mqtt/MqttMessageHandler\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n+ 3 Operators.kt\nio/qalipsis/api/lang/OperatorsKt\n+ 4 LoggerHelper.kt\nio/qalipsis/api/logging/LoggerHelper\n*L\n1#1,334:1\n1855#2,2:335\n350#2,7:337\n75#3,5:344\n31#4:349\n*S KotlinDebug\n*F\n+ 1 MqttMessageHandler.kt\nio/qalipsis/plugins/netty/mqtt/MqttMessageHandler\n*L\n174#1:335,2\n295#1:337,7\n320#1:344,5\n326#1:349\n*E\n"})
/* loaded from: input_file:io/qalipsis/plugins/netty/mqtt/MqttMessageHandler.class */
public final class MqttMessageHandler implements BiConsumer<ChannelHandlerContext, MqttMessage> {

    @NotNull
    public static final Companion Companion = new Companion(null);

    @Nullable
    private Channel channel;

    @Nullable
    private MqttSubscriber subscriber;

    @NotNull
    private List<MqttPendingMessage> pendingMessages = CollectionsKt.concurrentList();

    @NotNull
    private static final KLogger log;
    private static final long CLOSE_TIMEOUT = 5;

    /* compiled from: MqttMessageHandler.kt */
    @Metadata(mv = {ChannelMonitoringHandler.DATA_SENT_PHASE, 8, ChannelMonitoringHandler.INIT_PHASE}, k = ChannelMonitoringHandler.DATA_SENT_PHASE, xi = 48, d1 = {"��\u001a\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0010\t\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u000e\u0010\u0003\u001a\u00020\u0004X\u0082T¢\u0006\u0002\n��R\u0016\u0010\u0005\u001a\u00020\u00068\u0002X\u0083\u0004¢\u0006\b\n��\u0012\u0004\b\u0007\u0010\u0002¨\u0006\b"}, d2 = {"Lio/qalipsis/plugins/netty/mqtt/MqttMessageHandler$Companion;", "", "()V", "CLOSE_TIMEOUT", "", "log", "Lmu/KLogger;", "getLog$annotations", "qalipsis-plugin-netty"})
    /* loaded from: input_file:io/qalipsis/plugins/netty/mqtt/MqttMessageHandler$Companion.class */
    public static final class Companion {
        private Companion() {
        }

        @JvmStatic
        private static /* synthetic */ void getLog$annotations() {
        }

        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }
    }

    /* compiled from: MqttMessageHandler.kt */
    @Metadata(mv = {ChannelMonitoringHandler.DATA_SENT_PHASE, 8, ChannelMonitoringHandler.INIT_PHASE}, k = 3, xi = 48)
    /* loaded from: input_file:io/qalipsis/plugins/netty/mqtt/MqttMessageHandler$WhenMappings.class */
    public /* synthetic */ class WhenMappings {
        public static final /* synthetic */ int[] $EnumSwitchMapping$0;
        public static final /* synthetic */ int[] $EnumSwitchMapping$1;

        static {
            int[] iArr = new int[MqttMessageType.values().length];
            try {
                iArr[MqttMessageType.CONNACK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                iArr[MqttMessageType.SUBACK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                iArr[MqttMessageType.PUBACK.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                iArr[MqttMessageType.PUBLISH.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                iArr[MqttMessageType.PUBREC.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                iArr[MqttMessageType.PUBREL.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                iArr[MqttMessageType.PUBCOMP.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                iArr[MqttMessageType.DISCONNECT.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            $EnumSwitchMapping$0 = iArr;
            int[] iArr2 = new int[MqttQoS.values().length];
            try {
                iArr2[MqttQoS.AT_MOST_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e9) {
            }
            try {
                iArr2[MqttQoS.AT_LEAST_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e10) {
            }
            try {
                iArr2[MqttQoS.EXACTLY_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e11) {
            }
            $EnumSwitchMapping$1 = iArr2;
        }
    }

    @Override // java.util.function.BiConsumer
    public void accept(@NotNull ChannelHandlerContext channelHandlerContext, @NotNull final MqttMessage mqttMessage) {
        Intrinsics.checkNotNullParameter(channelHandlerContext, "channelHandlerContext");
        Intrinsics.checkNotNullParameter(mqttMessage, "mqttMessage");
        log.debug(new Function0<Object>() { // from class: io.qalipsis.plugins.netty.mqtt.MqttMessageHandler$accept$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(0);
            }

            @Nullable
            public final Object invoke() {
                return "Receiving message type: " + mqttMessage.fixedHeader().messageType();
            }
        });
        MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
        switch (messageType == null ? -1 : WhenMappings.$EnumSwitchMapping$0[messageType.ordinal()]) {
            case ChannelMonitoringHandler.DATA_SENT_PHASE /* 1 */:
                Channel channel = channelHandlerContext.channel();
                Intrinsics.checkNotNullExpressionValue(channel, "channelHandlerContext.channel()");
                handleConnectionAck(channel, (MqttConnAckMessage) mqttMessage);
                return;
            case ChannelMonitoringHandler.RECEIVING_PHASE /* 2 */:
                handleSubscribeAck((MqttSubAckMessage) mqttMessage);
                return;
            case 3:
                handlePublishAck((MqttPubAckMessage) mqttMessage);
                return;
            case 4:
                Channel channel2 = channelHandlerContext.channel();
                Intrinsics.checkNotNullExpressionValue(channel2, "channelHandlerContext.channel()");
                Object retain = ReferenceCountUtil.retain((MqttPublishMessage) mqttMessage);
                Intrinsics.checkNotNullExpressionValue(retain, "retain(mqttMessage as MqttPublishMessage)");
                handlePublish(channel2, (MqttPublishMessage) retain);
                return;
            case 5:
                handlePublishReceived(mqttMessage);
                return;
            case 6:
                handlePublishRelease(mqttMessage);
                return;
            case 7:
                handlePublishCompleted(mqttMessage);
                return;
            case 8:
                close();
                return;
            default:
                log.info(new Function0<Object>() { // from class: io.qalipsis.plugins.netty.mqtt.MqttMessageHandler$accept$2
                    /* JADX INFO: Access modifiers changed from: package-private */
                    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                    {
                        super(0);
                    }

                    @Nullable
                    public final Object invoke() {
                        return "MQTT message type " + mqttMessage.fixedHeader().messageType() + " not supported";
                    }
                });
                return;
        }
    }

    public final void connectionListener(@NotNull ChannelFuture channelFuture) {
        Intrinsics.checkNotNullParameter(channelFuture, "channelFuture");
        channelFuture.addListener((v2) -> {
            connectionListener$lambda$1(r1, r2, v2);
        });
    }

    public final void subscribe(@NotNull MqttSubscriber mqttSubscriber, @NotNull MqttPendingMessage mqttPendingMessage) {
        Intrinsics.checkNotNullParameter(mqttSubscriber, "mqttSubscriber");
        Intrinsics.checkNotNullParameter(mqttPendingMessage, "pendingMessageSubscription");
        this.subscriber = mqttSubscriber;
        this.pendingMessages.add(mqttPendingMessage);
        if (sendAndFlushPacket(mqttPendingMessage.getPendingMessage()) != null) {
            Channel channel = this.channel;
            Intrinsics.checkNotNull(channel);
            EventExecutor next = channel.eventLoop().next();
            Intrinsics.checkNotNullExpressionValue(next, "this.channel!!.eventLoop().next()");
            mqttPendingMessage.start((EventLoop) next, mqttMessage -> {
                this.sendAndFlushPacket(mqttMessage);
            });
        }
    }

    public final void publish(@NotNull MqttPendingMessage mqttPendingMessage) {
        Intrinsics.checkNotNullParameter(mqttPendingMessage, "pendingPublishMessage");
        this.pendingMessages.add(mqttPendingMessage);
        if (sendAndFlushPacket(mqttPendingMessage.getPendingMessage()) != null) {
            Channel channel = this.channel;
            Intrinsics.checkNotNull(channel);
            EventExecutor next = channel.eventLoop().next();
            Intrinsics.checkNotNullExpressionValue(next, "this.channel!!.eventLoop().next()");
            mqttPendingMessage.start((EventLoop) next, mqttMessage -> {
                this.sendAndFlushPacket(mqttMessage);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final ChannelFuture sendAndFlushPacket(final MqttMessage mqttMessage) {
        Channel channel = this.channel;
        if (!(channel != null ? channel.isActive() : false)) {
            Channel channel2 = this.channel;
            if (channel2 != null) {
                return channel2.newFailedFuture(new IllegalStateException("Channel is closed!"));
            }
            return null;
        }
        log.debug(new Function0<Object>() { // from class: io.qalipsis.plugins.netty.mqtt.MqttMessageHandler$sendAndFlushPacket$1
            /* JADX INFO: Access modifiers changed from: package-private */
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(0);
            }

            @Nullable
            public final Object invoke() {
                return "Sending and flush channel with message type: " + mqttMessage.fixedHeader().messageType();
            }
        });
        Channel channel3 = this.channel;
        if (channel3 != null) {
            return channel3.writeAndFlush(ReferenceCountUtil.retain(mqttMessage));
        }
        return null;
    }

    private final void handleSubscribeAck(MqttSubAckMessage mqttSubAckMessage) {
        findAndRemovePendingMessage$default(this, mqttSubAckMessage.idAndPropertiesVariableHeader().messageId(), null, 2, null);
    }

    private final void handlePublishAck(MqttPubAckMessage mqttPubAckMessage) {
        findAndRemovePendingMessage$default(this, mqttPubAckMessage.variableHeader().messageId(), null, 2, null);
    }

    private final void handleConnectionAck(Channel channel, MqttConnAckMessage mqttConnAckMessage) {
        if (mqttConnAckMessage.variableHeader().connectReturnCode() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
            this.channel = channel;
            sendPendingMessages(channel);
            return;
        }
        channel.close();
        Channel channel2 = this.channel;
        if (channel2 != null) {
            channel2.close();
        }
    }

    private final void sendPendingMessages(Channel channel) {
        for (MqttPendingMessage mqttPendingMessage : this.pendingMessages) {
            sendAndFlushPacket(mqttPendingMessage.getPendingMessage());
            EventExecutor next = channel.eventLoop().next();
            Intrinsics.checkNotNullExpressionValue(next, "channel.eventLoop().next()");
            mqttPendingMessage.start((EventLoop) next, mqttMessage -> {
                this.sendAndFlushPacket(mqttMessage);
            });
        }
    }

    private final void handlePublish(Channel channel, MqttPublishMessage mqttPublishMessage) {
        MqttQoS qosLevel = mqttPublishMessage.fixedHeader().qosLevel();
        switch (qosLevel == null ? -1 : WhenMappings.$EnumSwitchMapping$1[qosLevel.ordinal()]) {
            case ChannelMonitoringHandler.DATA_SENT_PHASE /* 1 */:
                MqttSubscriber mqttSubscriber = this.subscriber;
                if (mqttSubscriber != null) {
                    mqttSubscriber.handleMessage(mqttPublishMessage);
                    return;
                }
                return;
            case ChannelMonitoringHandler.RECEIVING_PHASE /* 2 */:
                sendPubAckMessage(mqttPublishMessage);
                return;
            case 3:
                if (mqttPublishMessage.variableHeader().packetId() != -1) {
                    sendPublishReceivedMessage(mqttPublishMessage, channel);
                    return;
                }
                return;
            default:
                log.warn(new Function0<Object>() { // from class: io.qalipsis.plugins.netty.mqtt.MqttMessageHandler$handlePublish$1
                    @Nullable
                    public final Object invoke() {
                        return "Not supported Qos Level";
                    }
                });
                return;
        }
    }

    private final void sendPubAckMessage(MqttPublishMessage mqttPublishMessage) {
        MqttSubscriber mqttSubscriber = this.subscriber;
        if (mqttSubscriber != null) {
            mqttSubscriber.handleMessage(mqttPublishMessage);
        }
        sendAndFlushPacket(buildMessageByTypeAndId(MqttMessageType.PUBACK, mqttPublishMessage.variableHeader().packetId()));
    }

    private final void sendPublishReceivedMessage(MqttPublishMessage mqttPublishMessage, Channel channel) {
        int packetId = mqttPublishMessage.variableHeader().packetId();
        MqttMessage buildMessageByTypeAndId = buildMessageByTypeAndId(MqttMessageType.PUBREC, packetId);
        sendAndFlushPacket(buildMessageByTypeAndId);
        MqttPublishReceivedPendingMessage mqttPublishReceivedPendingMessage = new MqttPublishReceivedPendingMessage(mqttPublishMessage, buildMessageByTypeAndId, packetId);
        this.pendingMessages.add(mqttPublishReceivedPendingMessage);
        EventExecutor next = channel.eventLoop().next();
        Intrinsics.checkNotNullExpressionValue(next, "channel.eventLoop().next()");
        mqttPublishReceivedPendingMessage.start((EventLoop) next, mqttMessage -> {
            this.sendAndFlushPacket(mqttMessage);
        });
    }

    private final void handlePublishRelease(MqttMessage mqttMessage) {
        Object variableHeader = mqttMessage.variableHeader();
        Intrinsics.checkNotNull(variableHeader, "null cannot be cast to non-null type io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader");
        int messageId = ((MqttMessageIdVariableHeader) variableHeader).messageId();
        findAndRemovePendingMessage(messageId, new Function1<MqttPendingMessage, Unit>() { // from class: io.qalipsis.plugins.netty.mqtt.MqttMessageHandler$handlePublishRelease$1
            /* JADX INFO: Access modifiers changed from: package-private */
            {
                super(1);
            }

            public final void invoke(@NotNull MqttPendingMessage mqttPendingMessage) {
                MqttSubscriber mqttSubscriber;
                Intrinsics.checkNotNullParameter(mqttPendingMessage, "it");
                mqttSubscriber = MqttMessageHandler.this.subscriber;
                if (mqttSubscriber != null) {
                    mqttSubscriber.handleMessage(((MqttPublishReceivedPendingMessage) mqttPendingMessage).getRetainedPublishMessage());
                }
            }

            public /* bridge */ /* synthetic */ Object invoke(Object obj) {
                invoke((MqttPendingMessage) obj);
                return Unit.INSTANCE;
            }
        });
        sendPublishComplete(messageId);
    }

    private final void handlePublishCompleted(MqttMessage mqttMessage) {
        Object variableHeader = mqttMessage.variableHeader();
        Intrinsics.checkNotNull(variableHeader, "null cannot be cast to non-null type io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader");
        findAndRemovePendingMessage$default(this, ((MqttMessageIdVariableHeader) variableHeader).messageId(), null, 2, null);
    }

    private final void sendPublishComplete(int i) {
        sendAndFlushPacket(buildMessageByTypeAndId(MqttMessageType.PUBCOMP, i));
    }

    private final void handlePublishReceived(MqttMessage mqttMessage) {
        Object variableHeader = mqttMessage.variableHeader();
        Intrinsics.checkNotNull(variableHeader, "null cannot be cast to non-null type io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader");
        int messageId = ((MqttMessageIdVariableHeader) variableHeader).messageId();
        findAndRemovePendingMessage$default(this, messageId, null, 2, null);
        sendPublishReleaseMessage(messageId);
    }

    private final void sendPublishReleaseMessage(int i) {
        MqttMessage buildMessageByTypeAndId = buildMessageByTypeAndId(MqttMessageType.PUBREL, i);
        sendAndFlushPacket(buildMessageByTypeAndId);
        addPendingMessage(buildMessageByTypeAndId, i);
    }

    private final MqttMessage buildMessageByTypeAndId(MqttMessageType mqttMessageType, int i) {
        return new MqttMessage(new MqttFixedHeader(mqttMessageType, false, MqttQoS.AT_MOST_ONCE, false, 0), MqttMessageIdVariableHeader.from(i));
    }

    private final void findAndRemovePendingMessage(int i, Function1<? super MqttPendingMessage, Unit> function1) {
        int i2;
        int i3 = 0;
        Iterator<MqttPendingMessage> it = this.pendingMessages.iterator();
        while (true) {
            if (!it.hasNext()) {
                i2 = -1;
                break;
            }
            if (it.next().getPacketId() == i) {
                i2 = i3;
                break;
            }
            i3++;
        }
        int i4 = i2;
        if (i4 >= 0) {
            MqttPendingMessage remove = this.pendingMessages.remove(i4);
            remove.onResponse();
            if (function1 != null) {
                function1.invoke(remove);
            }
        }
    }

    static /* synthetic */ void findAndRemovePendingMessage$default(MqttMessageHandler mqttMessageHandler, int i, Function1 function1, int i2, Object obj) {
        if ((i2 & 2) != 0) {
            function1 = null;
        }
        mqttMessageHandler.findAndRemovePendingMessage(i, function1);
    }

    private final void addPendingMessage(MqttMessage mqttMessage, int i) {
        MqttPendingMessage mqttPendingMessage = new MqttPendingMessage(mqttMessage, i);
        this.pendingMessages.add(mqttPendingMessage);
        Channel channel = this.channel;
        Intrinsics.checkNotNull(channel);
        EventExecutor next = channel.eventLoop().next();
        Intrinsics.checkNotNullExpressionValue(next, "channel!!.eventLoop().next()");
        mqttPendingMessage.start((EventLoop) next, mqttMessage2 -> {
            this.sendAndFlushPacket(mqttMessage2);
        });
    }

    public final void close() {
        new CountDownLatch(this.pendingMessages.size()).await(CLOSE_TIMEOUT, TimeUnit.SECONDS);
        Logger logger = log;
        try {
            Channel channel = this.channel;
            ChannelFuture close = channel != null ? channel.close() : null;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw e;
        }
    }

    private static final void connectionListener$lambda$1$lambda$0(MqttMessageHandler mqttMessageHandler, Future future) {
        Intrinsics.checkNotNullParameter(mqttMessageHandler, "this$0");
        mqttMessageHandler.close();
        log.info(new Function0<Object>() { // from class: io.qalipsis.plugins.netty.mqtt.MqttMessageHandler$connectionListener$1$1$1
            @Nullable
            public final Object invoke() {
                return "Closing connection to MQTT broker";
            }
        });
    }

    private static final void connectionListener$lambda$1(ChannelFuture channelFuture, MqttMessageHandler mqttMessageHandler, Future future) {
        Intrinsics.checkNotNullParameter(channelFuture, "$channelFuture");
        Intrinsics.checkNotNullParameter(mqttMessageHandler, "this$0");
        if (future.isSuccess()) {
            Channel channel = channelFuture.channel();
            if (channel != null) {
                ChannelFuture closeFuture = channel.closeFuture();
                if (closeFuture != null) {
                    closeFuture.addListener((v1) -> {
                        connectionListener$lambda$1$lambda$0(r1, v1);
                    });
                }
            }
        }
    }

    static {
        LoggerHelper loggerHelper = LoggerHelper.INSTANCE;
        Companion companion = Companion;
        log = KotlinLogging.INSTANCE.logger(new Function0<Unit>() { // from class: io.qalipsis.plugins.netty.mqtt.MqttMessageHandler$special$$inlined$logger$1
            public final void invoke() {
            }

            /* renamed from: invoke, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Object m176invoke() {
                invoke();
                return Unit.INSTANCE;
            }
        });
    }
}
