package org.dromara.mica.mqtt.core.client;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.dromara.mica.mqtt.codec.MqttConnAckMessage;
import org.dromara.mica.mqtt.codec.MqttConnectReasonCode;
import org.dromara.mica.mqtt.codec.MqttFixedHeader;
import org.dromara.mica.mqtt.codec.MqttMessage;
import org.dromara.mica.mqtt.codec.MqttMessageBuilders;
import org.dromara.mica.mqtt.codec.MqttMessageIdVariableHeader;
import org.dromara.mica.mqtt.codec.MqttMessageType;
import org.dromara.mica.mqtt.codec.MqttPubAckMessage;
import org.dromara.mica.mqtt.codec.MqttPublishMessage;
import org.dromara.mica.mqtt.codec.MqttPublishVariableHeader;
import org.dromara.mica.mqtt.codec.MqttQoS;
import org.dromara.mica.mqtt.codec.MqttSubAckMessage;
import org.dromara.mica.mqtt.codec.MqttSubscribeMessage;
import org.dromara.mica.mqtt.codec.MqttTopicSubscription;
import org.dromara.mica.mqtt.codec.MqttUnsubAckMessage;
import org.dromara.mica.mqtt.core.common.MqttPendingPublish;
import org.dromara.mica.mqtt.core.common.MqttPendingQos2Publish;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.Node;
import org.tio.core.Tio;
import org.tio.utils.hutool.CollUtil;
import org.tio.utils.timer.TimerTaskService;

/* loaded from: input_file:org/dromara/mica/mqtt/core/client/DefaultMqttClientProcessor.class */
public class DefaultMqttClientProcessor implements IMqttClientProcessor {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(DefaultMqttClientProcessor.class);
    /** Client configuration; the collaborators below are all read from it in the constructor. */
    private final MqttClientCreator mqttClientCreator;
    /** Session store for subscriptions and pending (un)subscribe / publish state. */
    private final IMqttClientSession clientSession;
    /** Optional connect listener; checked for null before use. */
    private final IMqttClientConnectListener connectListener;
    /** Optional listener invoked for every inbound publish; checked for null before use. */
    private final IMqttClientGlobalMessageListener globalMessageListener;
    /** Source of packet ids for the SUBSCRIBE packets built by this class. */
    private final IMqttClientMessageIdGenerator messageIdGenerator;
    /** Timer service handed to the pending-message objects when their retransmit timers are started. */
    private final TimerTaskService taskService;
    /** Executor on which listener callbacks are run. */
    private final ExecutorService executor;

    /* renamed from: org.dromara.mica.mqtt.core.client.DefaultMqttClientProcessor$1, reason: invalid class name */
    /* loaded from: input_file:org/dromara/mica/mqtt/core/client/DefaultMqttClientProcessor$1.class */
    /**
     * Lookup tables that translate enum ordinals into small case numbers.
     *
     * <p>Each array is sized to the number of constants of its enum and is indexed by
     * {@code ordinal()}; entries that are never assigned stay {@code 0}. The tables are
     * read by the {@code switch} statements in {@code processConAck} and
     * {@code processPublish}.
     *
     * <p>NOTE(review): this looks like the compiler-generated switch map as rendered by a
     * decompiler; the empty {@code NoSuchFieldError} handlers presumably tolerate enum
     * constants missing at runtime — confirm before hand-editing.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$dromara$mica$mqtt$codec$MqttConnectReasonCode;
        static final /* synthetic */ int[] $SwitchMap$org$dromara$mica$mqtt$codec$MqttQoS = new int[MqttQoS.values().length];

        static {
            // MqttQoS -> case number: QOS0=1, QOS1=2, QOS2=3, FAILURE=4.
            try {
                $SwitchMap$org$dromara$mica$mqtt$codec$MqttQoS[MqttQoS.QOS0.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$dromara$mica$mqtt$codec$MqttQoS[MqttQoS.QOS1.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$dromara$mica$mqtt$codec$MqttQoS[MqttQoS.QOS2.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$dromara$mica$mqtt$codec$MqttQoS[MqttQoS.FAILURE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            // MqttConnectReasonCode -> case number: only the six codes below are mapped (1..6).
            $SwitchMap$org$dromara$mica$mqtt$codec$MqttConnectReasonCode = new int[MqttConnectReasonCode.values().length];
            try {
                $SwitchMap$org$dromara$mica$mqtt$codec$MqttConnectReasonCode[MqttConnectReasonCode.CONNECTION_ACCEPTED.ordinal()] = 1;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$dromara$mica$mqtt$codec$MqttConnectReasonCode[MqttConnectReasonCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD.ordinal()] = 2;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$dromara$mica$mqtt$codec$MqttConnectReasonCode[MqttConnectReasonCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED.ordinal()] = 3;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$dromara$mica$mqtt$codec$MqttConnectReasonCode[MqttConnectReasonCode.CONNECTION_REFUSED_NOT_AUTHORIZED.ordinal()] = 4;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$dromara$mica$mqtt$codec$MqttConnectReasonCode[MqttConnectReasonCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE.ordinal()] = 5;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$dromara$mica$mqtt$codec$MqttConnectReasonCode[MqttConnectReasonCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION.ordinal()] = 6;
            } catch (NoSuchFieldError e10) {
            }
        }
    }

    /**
     * Creates a processor bound to the given client configuration.
     *
     * <p>The session, listeners, message-id generator, timer service and executor are
     * all read once from the creator and cached in final fields.
     *
     * @param mqttClientCreator client configuration supplying every collaborator
     */
    public DefaultMqttClientProcessor(MqttClientCreator mqttClientCreator) {
        this.mqttClientCreator = mqttClientCreator;
        // Collaborators are snapshotted here; later changes on the creator are not observed.
        clientSession = mqttClientCreator.getClientSession();
        connectListener = mqttClientCreator.getConnectListener();
        globalMessageListener = mqttClientCreator.getGlobalMessageListener();
        messageIdGenerator = mqttClientCreator.getMessageIdGenerator();
        taskService = mqttClientCreator.getTaskService();
        executor = mqttClientCreator.getMqttExecutor();
    }

    /**
     * Logs a decode failure at error level, using the throwable's message as the log text.
     *
     * @param channelContext channel on which decoding failed (not used here)
     * @param mqttMessage    the message being decoded (not used here)
     * @param th             the failure cause; logged together with its stack trace
     */
    @Override // org.dromara.mica.mqtt.core.client.IMqttClientProcessor
    public void processDecodeFailure(ChannelContext channelContext, MqttMessage mqttMessage, Throwable th) {
        final String reason = th.getMessage();
        logger.error(reason, th);
    }

    /**
     * Handles a CONNACK packet.
     *
     * <p>When the return code is {@code CONNECTION_ACCEPTED} the channel is marked as
     * accepted, the connect listener is notified and all known subscriptions are re-sent.
     * Any other return code (including {@code null}) closes the channel with a remark
     * containing the code.
     *
     * @param channelContext     channel the CONNACK arrived on
     * @param mqttConnAckMessage the decoded CONNACK message
     */
    @Override // org.dromara.mica.mqtt.core.client.IMqttClientProcessor
    public void processConAck(ChannelContext channelContext, MqttConnAckMessage mqttConnAckMessage) {
        MqttConnectReasonCode connectReturnCode = mqttConnAckMessage.variableHeader().connectReturnCode();
        // Compare the enum directly instead of going through an ordinal-indexed lookup table.
        if (connectReturnCode != MqttConnectReasonCode.CONNECTION_ACCEPTED) {
            Tio.close(channelContext, "MqttClient connect error error ReturnCode:" + connectReturnCode);
            return;
        }
        channelContext.setAccepted(true);
        if (logger.isInfoEnabled()) {
            Node serverNode = channelContext.getServerNode();
            logger.info("MqttClient contextId:{} connection:{}:{} succeeded!", channelContext.getId(), serverNode.getIp(), serverNode.getPort());
        }
        publishConnectEvent(channelContext);
        reSendSubscription(channelContext);
    }

    /**
     * Notifies the connect listener, if one is configured, on the executor.
     *
     * <p>Anything thrown by the listener is caught and logged so it cannot escape the task.
     *
     * @param channelContext the channel that has just been accepted
     */
    private void publishConnectEvent(ChannelContext channelContext) {
        if (this.connectListener != null) {
            final Runnable notifyTask = () -> {
                try {
                    // The reconnect flag is read when the task runs, not when it is queued.
                    this.connectListener.onConnected(channelContext, channelContext.isReconnect());
                } catch (Throwable failure) {
                    logger.error(failure.getMessage(), failure);
                }
            };
            this.executor.submit(notifyTask);
        }
    }

    /**
     * Re-sends every known subscription on the given channel.
     *
     * <p>Global subscriptions from the client configuration (if any) are sent first in a
     * single SUBSCRIBE. Session subscriptions follow: in one SUBSCRIBE when their count
     * does not exceed the configured re-subscribe batch size, otherwise split into
     * batches of that size.
     *
     * @param channelContext channel to send the SUBSCRIBE packets on
     */
    private void reSendSubscription(ChannelContext channelContext) {
        Set<MqttTopicSubscription> globalSubscribe = this.mqttClientCreator.getGlobalSubscribe();
        if (globalSubscribe != null && !globalSubscribe.isEmpty()) {
            globalReSendSubscription(channelContext, globalSubscribe);
        }
        List<MqttClientSubscription> subscriptions = this.clientSession.getSubscriptions();
        if (subscriptions.isEmpty()) {
            return;
        }
        int size = subscriptions.size();
        int reSubscribeBatchSize = this.mqttClientCreator.getReSubscribeBatchSize();
        if (size <= reSubscribeBatchSize) {
            reSendSubscription(channelContext, subscriptions);
            return;
        }
        // Typed iteration instead of a raw Iterator plus an unchecked (List) cast.
        for (List<MqttClientSubscription> batch : CollUtil.partition(subscriptions, reSubscribeBatchSize)) {
            reSendSubscription(channelContext, batch);
        }
    }

    /**
     * Sends one SUBSCRIBE packet carrying all global subscriptions and logs the send result.
     *
     * <p>No pending-subscription entry is registered for this packet in this method.
     *
     * @param channelContext channel to send on
     * @param set            the global topic subscriptions to (re)subscribe
     */
    private void globalReSendSubscription(ChannelContext channelContext, Set<MqttTopicSubscription> set) {
        final int packetId = this.messageIdGenerator.getId();
        final MqttSubscribeMessage subscribeMessage = MqttMessageBuilders.subscribe()
            .addSubscriptions(set)
            .messageId(packetId)
            .build();
        // The send happens unconditionally; only its result is reported through the logger.
        final boolean sent = Tio.send(channelContext, subscribeMessage);
        logger.info("MQTT globalReSubscriptionList:{} packetId:{} resubscribing result:{}", set, packetId, sent);
    }

    /**
     * Sends one SUBSCRIBE packet for the given subscriptions and tracks it as pending.
     *
     * <p>The pending entry's retransmit timer is started and the entry is stored in the
     * session under the packet id before the packet is sent; the send result is logged.
     *
     * @param channelContext channel to send on
     * @param list           the client subscriptions to include in this SUBSCRIBE
     */
    private void reSendSubscription(ChannelContext channelContext, List<MqttClientSubscription> list) {
        // Typed list + method reference instead of a raw List with a redundant cast.
        List<MqttTopicSubscription> topicSubscriptions = list.stream()
            .map(MqttClientSubscription::toTopicSubscription)
            .collect(Collectors.toList());
        int id = this.messageIdGenerator.getId();
        MqttSubscribeMessage build = MqttMessageBuilders.subscribe().addSubscriptions(topicSubscriptions).messageId(id).build();
        MqttPendingSubscription mqttPendingSubscription = new MqttPendingSubscription(list, build);
        mqttPendingSubscription.startRetransmitTimer(this.taskService, channelContext);
        this.clientSession.addPaddingSubscribe(id, mqttPendingSubscription);
        boolean sent = Tio.send(channelContext, build);
        logger.info("MQTT subscriptionList:{} packetId:{} resubscribing result:{}", list, id, sent);
    }

    /**
     * Handles a SUBACK packet.
     *
     * <p>Looks up the pending subscription by packet id; unknown ids are ignored. An empty
     * reason-code list is logged as an error and leaves the pending entry untouched.
     * Otherwise each requested subscription is paired with the reason code at the same
     * index: codes 0..2 count as success, anything else (including a {@code null} or
     * missing code) is logged as a failure. The pending entry is then completed and
     * removed, the successful subscriptions are stored in the session, and each one's
     * listener receives {@code onSubscribed} on the executor.
     *
     * @param channelContext    channel the SUBACK arrived on
     * @param mqttSubAckMessage the decoded SUBACK message
     */
    @Override // org.dromara.mica.mqtt.core.client.IMqttClientProcessor
    public void processSubAck(ChannelContext channelContext, MqttSubAckMessage mqttSubAckMessage) {
        int messageId = mqttSubAckMessage.variableHeader().messageId();
        logger.debug("MqttClient SubAck packetId:{}", messageId);
        MqttPendingSubscription paddingSubscribe = this.clientSession.getPaddingSubscribe(messageId);
        if (paddingSubscribe == null) {
            return;
        }
        List<MqttClientSubscription> subscriptionList = paddingSubscribe.getSubscriptionList();
        List<Integer> reasonCodes = mqttSubAckMessage.payload().reasonCodes();
        if (reasonCodes.isEmpty()) {
            logger.error("MqttClient subscriptionList:{} subscribe failed reasonCodes is empty packetId:{}", subscriptionList, messageId);
            return;
        }
        int reasonCodeCount = reasonCodes.size();
        List<MqttClientSubscription> subscribedList = new ArrayList<>(subscriptionList.size());
        for (int i = 0; i < subscriptionList.size(); i++) {
            MqttClientSubscription subscription = subscriptionList.get(i);
            String topicFilter = subscription.getTopicFilter();
            // A SUBACK with fewer reason codes than requested topics must not throw:
            // a missing code is treated like a failed subscription.
            Integer reasonCode = i < reasonCodeCount ? reasonCodes.get(i) : null;
            if (reasonCode == null || reasonCode < 0 || reasonCode > 2) {
                logger.error("MqttClient topicFilter:{} subscribe failed reasonCodes:{} packetId:{}", topicFilter, reasonCode, messageId);
            } else {
                subscribedList.add(subscription);
            }
        }
        logger.info("MQTT subscriptionList:{} subscribed successfully packetId:{}", subscribedList, messageId);
        paddingSubscribe.onSubAckReceived();
        this.clientSession.removePaddingSubscribe(messageId);
        this.clientSession.addSubscriptionList(subscribedList);
        for (MqttClientSubscription subscribed : subscribedList) {
            String topicFilter = subscribed.getTopicFilter();
            MqttQoS mqttQoS = subscribed.getMqttQoS();
            IMqttClientMessageListener listener = subscribed.getListener();
            this.executor.execute(() -> {
                try {
                    listener.onSubscribed(channelContext, topicFilter, mqttQoS, mqttSubAckMessage);
                } catch (Throwable th) {
                    // Report the topic filter whose callback failed, not the whole list.
                    logger.error("MQTT topicFilter:{} subscribed onSubscribed event error.", topicFilter, th);
                }
            });
        }
    }

    /**
     * Handles an inbound PUBLISH packet according to its QoS level.
     *
     * <ul>
     *   <li>QOS0: listeners are invoked immediately.</li>
     *   <li>QOS1: listeners are invoked, then a PUBACK is sent when the packet id is not -1.</li>
     *   <li>QOS2: when the packet id is not -1, the message is stored as a pending QoS 2
     *       publish, its PUBREC retransmit timer is started and a PUBREC is sent; listeners
     *       are not invoked here.</li>
     *   <li>Any other level is ignored.</li>
     * </ul>
     *
     * @param channelContext     channel the PUBLISH arrived on
     * @param mqttPublishMessage the decoded PUBLISH message
     */
    @Override // org.dromara.mica.mqtt.core.client.IMqttClientProcessor
    public void processPublish(ChannelContext channelContext, MqttPublishMessage mqttPublishMessage) {
        MqttFixedHeader fixedHeader = mqttPublishMessage.fixedHeader();
        MqttPublishVariableHeader variableHeader = mqttPublishMessage.variableHeader();
        String topicName = variableHeader.topicName();
        MqttQoS qosLevel = fixedHeader.qosLevel();
        int packetId = variableHeader.packetId();
        logger.debug("MqttClient received publish topic:{} qoS:{} packetId:{}", topicName, qosLevel, packetId);
        // Switch on the enum itself rather than on an ordinal-indexed lookup table.
        switch (qosLevel) {
            case QOS0:
                invokeListenerForPublish(channelContext, topicName, mqttPublishMessage);
                break;
            case QOS1:
                invokeListenerForPublish(channelContext, topicName, mqttPublishMessage);
                if (packetId != -1) {
                    boolean sent = Tio.send(channelContext, MqttMessageBuilders.pubAck().packetId(packetId).build());
                    logger.debug("Publish - PubAck send topicName:{} mqttQoS:{} packetId:{} result:{}", topicName, qosLevel, packetId, sent);
                }
                break;
            case QOS2:
                if (packetId != -1) {
                    MqttMessage pubRecMessage = new MqttMessage(new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.QOS0, false, 0), MqttMessageIdVariableHeader.from(packetId));
                    MqttPendingQos2Publish pendingQos2Publish = new MqttPendingQos2Publish(mqttPublishMessage, pubRecMessage);
                    // Register and arm the retransmit timer before the PUBREC goes out.
                    this.clientSession.addPendingQos2Publish(packetId, pendingQos2Publish);
                    pendingQos2Publish.startPubRecRetransmitTimer(this.taskService, channelContext);
                    boolean sent = Tio.send(channelContext, pubRecMessage);
                    logger.debug("Publish - PubRec send topicName:{} mqttQoS:{} packetId:{} result:{}", topicName, qosLevel, packetId, sent);
                }
                break;
            default:
                // FAILURE (or any other level) carries nothing to deliver.
                break;
        }
    }

    /**
     * Handles an UNSUBACK packet.
     *
     * <p>If a pending unsubscription exists for the packet id, it is completed and removed,
     * and its topics are removed from the session's subscriptions. Unknown ids are ignored.
     *
     * @param mqttUnsubAckMessage the decoded UNSUBACK message
     */
    @Override // org.dromara.mica.mqtt.core.client.IMqttClientProcessor
    public void processUnSubAck(MqttUnsubAckMessage mqttUnsubAckMessage) {
        final int packetId = mqttUnsubAckMessage.variableHeader().messageId();
        logger.debug("MqttClient UnSubAck packetId:{}", packetId);
        final MqttPendingUnSubscription pending = this.clientSession.getPaddingUnSubscribe(packetId);
        if (pending != null) {
            final List<String> unsubscribedTopics = pending.getTopics();
            logger.info("MQTT Topic:{} successfully unSubscribed packetId:{}", unsubscribedTopics, packetId);
            pending.onUnSubAckReceived();
            this.clientSession.removePaddingUnSubscribe(packetId);
            this.clientSession.removeSubscriptions(unsubscribedTopics);
        }
    }

    /**
     * Handles a PUBACK packet.
     *
     * <p>If a pending publish exists for the packet id, it is notified of the PUBACK and
     * removed from the session. Unknown ids are ignored.
     *
     * @param mqttPubAckMessage the decoded PUBACK message
     */
    @Override // org.dromara.mica.mqtt.core.client.IMqttClientProcessor
    public void processPubAck(MqttPubAckMessage mqttPubAckMessage) {
        final int packetId = mqttPubAckMessage.variableHeader().messageId();
        logger.debug("MqttClient PubAck packetId:{}", packetId);
        final MqttPendingPublish pending = this.clientSession.getPendingPublish(packetId);
        if (pending != null) {
            // The topic name is only resolved when info logging is actually enabled.
            if (logger.isInfoEnabled()) {
                final String topicName = pending.getMessage().variableHeader().topicName();
                logger.info("MQTT Topic:{} successfully PubAck packetId:{}", topicName, packetId);
            }
            pending.onPubAckReceived();
            this.clientSession.removePendingPublish(packetId);
        }
    }

    /**
     * Handles a PUBREC packet.
     *
     * <p>If a pending publish exists for the packet id, it is notified via
     * {@code onPubAckReceived()}, a PUBREL reusing the incoming variable header is attached
     * to it, its PUBREL retransmission timer is started and the PUBREL is sent. Unknown
     * ids are ignored.
     *
     * @param channelContext channel the PUBREC arrived on
     * @param mqttMessage    the decoded PUBREC message
     */
    @Override // org.dromara.mica.mqtt.core.client.IMqttClientProcessor
    public void processPubRec(ChannelContext channelContext, MqttMessage mqttMessage) {
        final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        final int packetId = idHeader.messageId();
        logger.debug("MqttClient PubRec packetId:{}", packetId);
        final MqttPendingPublish pending = this.clientSession.getPendingPublish(packetId);
        if (pending != null) {
            pending.onPubAckReceived();
            final MqttFixedHeader pubRelHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.QOS1, false, 0);
            final MqttMessage pubRel = new MqttMessage(pubRelHeader, idHeader);
            pending.setPubRelMessage(pubRel);
            pending.startPubRelRetransmissionTimer(this.taskService, channelContext);
            final boolean sent = Tio.send(channelContext, pubRel);
            logger.debug("Publish - PubRec send packetId:{} result:{}", packetId, sent);
        }
    }

    /**
     * Handles a PUBREL packet.
     *
     * <p>If a pending QoS 2 publish exists for the packet id, its stored PUBLISH is
     * delivered to the listeners, the pending entry is notified and removed. A PUBCOMP is
     * sent in every case, whether or not a pending entry was found.
     *
     * @param channelContext channel the PUBREL arrived on
     * @param mqttMessage    the decoded PUBREL message
     */
    @Override // org.dromara.mica.mqtt.core.client.IMqttClientProcessor
    public void processPubRel(ChannelContext channelContext, MqttMessage mqttMessage) {
        final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        final int packetId = idHeader.messageId();
        logger.debug("MqttClient PubRel packetId:{}", packetId);
        final MqttPendingQos2Publish pending = this.clientSession.getPendingQos2Publish(packetId);
        if (pending != null) {
            final MqttPublishMessage storedPublish = pending.getIncomingPublish();
            final String topicName = storedPublish.variableHeader().topicName();
            invokeListenerForPublish(channelContext, topicName, storedPublish);
            pending.onPubRelReceived();
            this.clientSession.removePendingQos2Publish(packetId);
        }
        final MqttFixedHeader pubCompHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.QOS0, false, 0);
        final MqttMessage pubComp = new MqttMessage(pubCompHeader, MqttMessageIdVariableHeader.from(packetId));
        final boolean sent = Tio.send(channelContext, pubComp);
        logger.debug("Publish - PubRel send packetId:{} result:{}", packetId, sent);
    }

    /**
     * Handles a PUBCOMP packet.
     *
     * <p>If a pending publish exists for the packet id, it is notified of the PUBCOMP and
     * removed from the session. Unknown ids are ignored.
     *
     * @param mqttMessage the decoded PUBCOMP message
     */
    @Override // org.dromara.mica.mqtt.core.client.IMqttClientProcessor
    public void processPubComp(MqttMessage mqttMessage) {
        final MqttMessageIdVariableHeader idHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        final int packetId = idHeader.messageId();
        final MqttPendingPublish pending = this.clientSession.getPendingPublish(packetId);
        if (pending != null) {
            // The topic name is only resolved when info logging is actually enabled.
            if (logger.isInfoEnabled()) {
                final String topicName = pending.getMessage().variableHeader().topicName();
                logger.info("MQTT Topic:{} successfully PubComp", topicName);
            }
            pending.onPubCompReceived();
            this.clientSession.removePendingPublish(packetId);
        }
    }

    /**
     * Dispatches an inbound PUBLISH to the global listener and to every matching subscription.
     *
     * <p>Each callback is submitted to the executor as its own task, and anything it throws
     * is caught and logged. When no subscription matches the topic, a message is logged:
     * at warn level if there is no global listener or debug mode is on, otherwise at debug
     * level.
     *
     * @param channelContext     channel the message arrived on
     * @param str                topic name of the message
     * @param mqttPublishMessage the PUBLISH message being delivered
     */
    private void invokeListenerForPublish(ChannelContext channelContext, String str, MqttPublishMessage mqttPublishMessage) {
        final byte[] body = mqttPublishMessage.payload();
        final boolean hasGlobalListener = this.globalMessageListener != null;
        if (hasGlobalListener) {
            final Runnable globalTask = () -> {
                try {
                    this.globalMessageListener.onMessage(channelContext, str, mqttPublishMessage, body);
                } catch (Throwable failure) {
                    logger.error(failure.getMessage(), failure);
                }
            };
            this.executor.submit(globalTask);
        }
        final List<MqttClientSubscription> matched = this.clientSession.getMatchedSubscription(str);
        if (matched.isEmpty()) {
            // Nobody subscribed: be loud unless a global listener is already handling the message.
            if (!hasGlobalListener || this.mqttClientCreator.isDebug()) {
                logger.warn("Mqtt message to accept topic:{} subscriptionList is empty.", str);
            } else {
                logger.debug("Mqtt message to accept topic:{} subscriptionList is empty.", str);
            }
            return;
        }
        for (MqttClientSubscription subscription : matched) {
            final IMqttClientMessageListener topicListener = subscription.getListener();
            final Runnable topicTask = () -> {
                try {
                    topicListener.onMessage(channelContext, str, mqttPublishMessage, body);
                } catch (Throwable failure) {
                    logger.error(failure.getMessage(), failure);
                }
            };
            this.executor.submit(topicTask);
        }
    }
}
