package tech.smartboot.mqtt.broker;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.socket.timer.TimerTask;
import org.smartboot.socket.transport.AioSession;
import tech.smartboot.mqtt.broker.topic.BrokerTopicImpl;
import tech.smartboot.mqtt.broker.topic.DeliverGroup;
import tech.smartboot.mqtt.broker.topic.deliver.AbstractMessageDeliver;
import tech.smartboot.mqtt.broker.topic.deliver.Qos0MessageDeliver;
import tech.smartboot.mqtt.broker.topic.deliver.Qos12MessageDeliver;
import tech.smartboot.mqtt.common.AbstractSession;
import tech.smartboot.mqtt.common.AsyncTask;
import tech.smartboot.mqtt.common.MqttWriter;
import tech.smartboot.mqtt.common.TopicToken;
import tech.smartboot.mqtt.common.enums.MqttQoS;
import tech.smartboot.mqtt.common.message.MqttMessage;
import tech.smartboot.mqtt.common.message.MqttPublishMessage;
import tech.smartboot.mqtt.common.message.variable.properties.ConnectProperties;
import tech.smartboot.mqtt.common.util.ValidateUtils;
import tech.smartboot.mqtt.plugin.spec.MessageDeliver;
import tech.smartboot.mqtt.plugin.spec.MqttSession;
import tech.smartboot.mqtt.plugin.spec.bus.EventObject;
import tech.smartboot.mqtt.plugin.spec.bus.EventType;
import tech.smartboot.mqtt.plugin.spec.provider.SessionState;

/* loaded from: input_file:tech/smartboot/mqtt/broker/MqttSessionImpl.class */
public class MqttSessionImpl extends AbstractSession implements MqttSession {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) MqttSessionImpl.class);
    private final Map<String, TopicSubscription> subscribers = new ConcurrentHashMap();
    private final BrokerContextImpl mqttContext;
    private boolean authorized;
    private MqttPublishMessage willMessage;
    private boolean cleanSession;
    private ConnectProperties properties;
    TimerTask idleConnectTimer;
    private long latestReceiveMessageTime;

    public MqttSessionImpl(BrokerContextImpl brokerContextImpl, AioSession aioSession, MqttWriter mqttWriter) {
        this.mqttContext = brokerContextImpl;
        this.session = aioSession;
        this.mqttWriter = mqttWriter;
        this.idleConnectTimer = brokerContextImpl.getTimer().schedule(new AsyncTask() { // from class: tech.smartboot.mqtt.broker.MqttSessionImpl.1
            @Override // tech.smartboot.mqtt.common.AsyncTask
            public void execute() {
                if (MqttSessionImpl.this.isAuthorized()) {
                    return;
                }
                MqttSessionImpl.LOGGER.info("长时间未收到客户端：{} 的Connect消息，连接断开！", MqttSessionImpl.this.getClientId());
                MqttSessionImpl.this.disconnect();
            }
        }, brokerContextImpl.Options().getNoConnectIdleTimeout(), TimeUnit.MILLISECONDS);
        brokerContextImpl.getEventBus().publish(EventType.SESSION_CREATE, this);
    }

    public ConnectProperties getProperties() {
        return this.properties;
    }

    public void setProperties(ConnectProperties connectProperties) {
        this.properties = connectProperties;
    }

    public boolean isCleanSession() {
        return this.cleanSession;
    }

    public void setCleanSession(boolean z) {
        this.cleanSession = z;
    }

    @Override // tech.smartboot.mqtt.common.AbstractSession
    public void accepted(MqttPublishMessage mqttPublishMessage) {
        this.mqttContext.getMessageBus().publish(this, mqttPublishMessage);
    }

    @Override // tech.smartboot.mqtt.common.AbstractSession, tech.smartboot.mqtt.plugin.spec.MqttSession
    public void write(MqttMessage mqttMessage, boolean z) {
        super.write(mqttMessage, z);
        if (EventBusImpl.WRITE_MESSAGE_SUBSCRIBER_LIST.isEmpty()) {
            return;
        }
        this.mqttContext.getEventBus().publish(EventType.WRITE_MESSAGE, EventObject.newEventObject(this, mqttMessage), EventBusImpl.WRITE_MESSAGE_SUBSCRIBER_LIST);
    }

    @Override // tech.smartboot.mqtt.common.AbstractSession, tech.smartboot.mqtt.plugin.spec.MqttSession
    public synchronized void disconnect() {
        if (isDisconnect()) {
            return;
        }
        if (isAuthorized()) {
            if (this.cleanSession) {
                this.mqttContext.getProviders().getSessionStateProvider().remove(this.clientId);
            } else {
                SessionState sessionState = new SessionState();
                this.subscribers.values().forEach(topicSubscription -> {
                    sessionState.getSubscribers().put(topicSubscription.getTopicFilterToken().getTopicFilter(), topicSubscription.getMqttQoS());
                });
                this.mqttContext.getProviders().getSessionStateProvider().store(this.clientId, sessionState);
            }
        }
        if (this.willMessage != null) {
            this.mqttContext.getMessageBus().publish(this, this.willMessage);
        }
        this.subscribers.keySet().forEach(this::unsubscribe);
        MqttSession removeSession = this.mqttContext.removeSession(getClientId());
        if (removeSession != null && removeSession != this) {
            LOGGER.error("remove old session success:{}", removeSession);
            removeSession.disconnect();
        }
        LOGGER.debug("remove mqttSession success:{}", removeSession);
        this.disconnect = true;
        try {
            this.session.close(false);
        } finally {
            this.mqttContext.getEventBus().publish(EventType.DISCONNECT, this);
        }
    }

    public void setClientId(String str) {
        this.clientId = str;
    }

    public MqttQoS subscribe(String str, MqttQoS mqttQoS) {
        if (!this.mqttContext.getProviders().getSubscribeProvider().subscribeTopic(str, this)) {
            return MqttQoS.FAILURE;
        }
        subscribe0(str, mqttQoS);
        return mqttQoS;
    }

    private void subscribe0(String str, MqttQoS mqttQoS) {
        TopicSubscription topicSubscription = this.subscribers.get(str);
        if (topicSubscription != null) {
            topicSubscription.setMqttQoS(mqttQoS);
            return;
        }
        TopicToken topicToken = new TopicToken(str);
        if (!topicToken.isWildcards()) {
            if (topicToken.isShared()) {
                String[] split = str.split("/", 3);
                if (split.length >= 3) {
                    this.mqttContext.getOrCreateTopic(split[2]);
                }
            } else {
                this.mqttContext.getOrCreateTopic(str);
            }
        }
        TopicSubscription topicSubscription2 = new TopicSubscription(topicToken, mqttQoS);
        ValidateUtils.isTrue(this.subscribers.put(str, topicSubscription2) == null, "duplicate topic filter");
        this.mqttContext.getTopicSubscribeTree().subscribeTopic(this, topicSubscription2);
        this.mqttContext.getPublishTopicTree().matchSubscriptionToTopics(topicSubscription2, brokerTopicImpl -> {
            subscribeSuccess(topicSubscription2, brokerTopicImpl);
        });
    }

    public void subscribeSuccess(TopicSubscription topicSubscription, BrokerTopicImpl brokerTopicImpl) {
        TopicToken topicFilterToken = topicSubscription.getTopicFilterToken();
        if (this.mqttContext.getProviders().getSubscribeProvider().matchTopic(brokerTopicImpl, this)) {
            DeliverGroup subscriberGroup = brokerTopicImpl.getSubscriberGroup(topicFilterToken);
            MessageDeliver subscriber = subscriberGroup.getSubscriber(this);
            if (subscriber == null) {
                Qos0MessageDeliver newConsumerRecord = newConsumerRecord(brokerTopicImpl, topicSubscription, brokerTopicImpl.getMessageQueue().getLatestOffset() + 1);
                this.mqttContext.getEventBus().publish(EventType.SUBSCRIBE_TOPIC, EventObject.newEventObject(this, newConsumerRecord));
                subscriberGroup.addSubscriber(newConsumerRecord);
                this.subscribers.get(topicFilterToken.getTopicFilter()).getTopicSubscribers().put(brokerTopicImpl, newConsumerRecord);
                return;
            }
            TopicToken topicFilterToken2 = subscriber.getTopicFilterToken();
            if (topicFilterToken.isShared()) {
                ValidateUtils.isTrue(topicFilterToken2.getTopicFilter().equals(topicSubscription.getTopicFilterToken().getTopicFilter()), "invalid subscriber");
                Qos0MessageDeliver qos0MessageDeliver = new Qos0MessageDeliver(brokerTopicImpl, this, topicSubscription, brokerTopicImpl.getMessageQueue().getLatestOffset() + 1) { // from class: tech.smartboot.mqtt.broker.MqttSessionImpl.2
                    @Override // tech.smartboot.mqtt.broker.topic.deliver.Qos0MessageDeliver, tech.smartboot.mqtt.broker.topic.deliver.AbstractMessageDeliver
                    public void pushToClient() {
                        throw new IllegalStateException();
                    }
                };
                subscriberGroup.addSubscriber(qos0MessageDeliver);
                this.subscribers.get(topicFilterToken.getTopicFilter()).getTopicSubscribers().put(brokerTopicImpl, qos0MessageDeliver);
                return;
            }
            if (topicFilterToken2.isWildcards()) {
                if (!topicFilterToken.isWildcards() || topicFilterToken.getTopicFilter().length() > topicFilterToken2.getTopicFilter().length()) {
                    AbstractMessageDeliver remove = this.subscribers.get(topicFilterToken2.getTopicFilter()).getTopicSubscribers().remove(brokerTopicImpl);
                    ValidateUtils.isTrue(remove == subscriber, "invalid consumerRecord");
                    remove.disable();
                    Qos0MessageDeliver newConsumerRecord2 = newConsumerRecord(brokerTopicImpl, topicSubscription, remove.getNextConsumerOffset());
                    this.subscribers.get(topicFilterToken.getTopicFilter()).getTopicSubscribers().put(brokerTopicImpl, newConsumerRecord2);
                    this.mqttContext.getEventBus().publish(EventType.SUBSCRIBE_REFRESH_TOPIC, newConsumerRecord2);
                }
            }
        }
    }

    private Qos0MessageDeliver newConsumerRecord(BrokerTopicImpl brokerTopicImpl, TopicSubscription topicSubscription, long j) {
        return topicSubscription.getMqttQoS() == MqttQoS.AT_MOST_ONCE ? new Qos0MessageDeliver(brokerTopicImpl, this, topicSubscription, j) : new Qos12MessageDeliver(brokerTopicImpl, this, topicSubscription, j);
    }

    public void resubscribe() {
        this.subscribers.values().stream().filter(topicSubscription -> {
            return topicSubscription.getTopicFilterToken().isWildcards();
        }).forEach(topicSubscription2 -> {
            this.mqttContext.getPublishTopicTree().matchSubscriptionToTopics(topicSubscription2, brokerTopicImpl -> {
                subscribeSuccess(topicSubscription2, brokerTopicImpl);
            });
        });
    }

    public void unsubscribe(String str) {
        TopicSubscription remove = this.subscribers.remove(str);
        if (remove == null) {
            LOGGER.warn("unsubscribe waring! topic:{} is not exists", str);
        } else {
            remove.getTopicSubscribers().forEach((brokerTopicImpl, abstractMessageDeliver) -> {
                DeliverGroup subscriberGroup = brokerTopicImpl.getSubscriberGroup(remove.getTopicFilterToken());
                AbstractMessageDeliver removeSubscriber = subscriberGroup.removeSubscriber(this);
                if (brokerTopicImpl.subscribeCount() == 0) {
                    LOGGER.info("clear topic: {} message queue", brokerTopicImpl.getTopicFilter());
                    brokerTopicImpl.getMessageQueue().clear();
                }
                if (abstractMessageDeliver != removeSubscriber) {
                    LOGGER.error("remove subscriber:{} error!", subscriberGroup);
                    return;
                }
                removeSubscriber.disable();
                this.mqttContext.getEventBus().publish(EventType.UNSUBSCRIBE_TOPIC, removeSubscriber);
                LOGGER.debug("remove subscriber:{} success!", brokerTopicImpl.getTopicFilter());
            });
            this.mqttContext.getTopicSubscribeTree().unsubscribe(this, remove);
        }
    }

    @Override // tech.smartboot.mqtt.plugin.spec.MqttSession
    public boolean isAuthorized() {
        return this.authorized;
    }

    @Override // tech.smartboot.mqtt.plugin.spec.MqttSession
    public void setAuthorized(boolean z) {
        this.authorized = z;
    }

    public void setWillMessage(MqttPublishMessage mqttPublishMessage) {
        this.willMessage = mqttPublishMessage;
    }

    @Override // tech.smartboot.mqtt.plugin.spec.MqttSession
    public long getLatestReceiveMessageTime() {
        return this.latestReceiveMessageTime;
    }

    public void setLatestReceiveMessageTime(long j) {
        this.latestReceiveMessageTime = j;
    }
}
