package tech.smartboot.mqtt.broker.processor;

import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.smartboot.mqtt.broker.BrokerContextImpl;
import tech.smartboot.mqtt.broker.MqttSessionImpl;
import tech.smartboot.mqtt.common.InflightQueue;
import tech.smartboot.mqtt.common.enums.MqttConnectReturnCode;
import tech.smartboot.mqtt.common.enums.MqttProtocolEnum;
import tech.smartboot.mqtt.common.enums.MqttQoS;
import tech.smartboot.mqtt.common.enums.MqttVersion;
import tech.smartboot.mqtt.common.message.MqttConnectMessage;
import tech.smartboot.mqtt.common.message.payload.MqttConnectPayload;
import tech.smartboot.mqtt.common.message.payload.WillMessage;
import tech.smartboot.mqtt.common.message.variable.MqttConnectVariableHeader;
import tech.smartboot.mqtt.common.message.variable.properties.ConnectAckProperties;
import tech.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import tech.smartboot.mqtt.common.util.MqttUtil;
import tech.smartboot.mqtt.common.util.ValidateUtils;
import tech.smartboot.mqtt.plugin.spec.BrokerContext;
import tech.smartboot.mqtt.plugin.spec.MqttProcessor;
import tech.smartboot.mqtt.plugin.spec.MqttSession;
import tech.smartboot.mqtt.plugin.spec.PublishBuilder;
import tech.smartboot.mqtt.plugin.spec.bus.EventObject;
import tech.smartboot.mqtt.plugin.spec.bus.EventType;
import tech.smartboot.mqtt.plugin.spec.provider.SessionState;

/* loaded from: input_file:tech/smartboot/mqtt/broker/processor/ConnectProcessor.class */
/**
 * Handles an incoming MQTT CONNECT message on the broker side.
 *
 * <p>Processing order, as implemented by {@link #process}:
 * <ol>
 *   <li>assign the client id (a generated one if the client sent none) and the protocol version;</li>
 *   <li>validate the CONNECT message ({@link #checkMessage});</li>
 *   <li>publish an {@code EventType.CONNECT} event so listeners can inspect the session;</li>
 *   <li>if the session is still connected, mark it authorized, register it with the broker,
 *       store the will message, create the in-flight queue and answer with a CONNACK.</li>
 * </ol>
 */
public class ConnectProcessor implements MqttProcessor<BrokerContextImpl, MqttConnectMessage, MqttSessionImpl> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectProcessor.class);

    /** Longest client identifier accepted from an MQTT 3.1 client; longer ids are rejected. */
    private static final int MAX_CLIENT_ID_LENGTH = 23;

    /**
     * Processes a CONNECT message for the given session.
     *
     * @param brokerContextImpl  broker context providing the event bus, options, timer and session registry
     * @param mqttSessionImpl    the session that received the CONNECT message
     * @param mqttConnectMessage the decoded CONNECT message
     */
    @Override // tech.smartboot.mqtt.plugin.spec.MqttProcessor
    public void process(BrokerContextImpl brokerContextImpl, MqttSessionImpl mqttSessionImpl, MqttConnectMessage mqttConnectMessage) {
        String clientId = mqttConnectMessage.getPayload().clientId();
        // Null-safe emptiness check, consistent with the StringUtils checks used in checkMessage.
        if (StringUtils.isEmpty(clientId)) {
            clientId = MqttUtil.createClientId();
        }
        mqttSessionImpl.setClientId(clientId);
        mqttSessionImpl.setMqttVersion(mqttConnectMessage.getVersion());

        checkMessage(mqttSessionImpl, mqttConnectMessage);

        // Listeners of the CONNECT event may disconnect the session; in that case stop here.
        brokerContextImpl.getEventBus().publish(EventType.CONNECT, EventObject.newEventObject(mqttSessionImpl, mqttConnectMessage));
        if (mqttSessionImpl.isDisconnect()) {
            LOGGER.warn("session is disconnected when consume CONNECT event");
            return;
        }

        mqttSessionImpl.setAuthorized(true);
        refreshSession(brokerContextImpl, mqttSessionImpl, mqttConnectMessage);
        storeWillMessage(brokerContextImpl, mqttSessionImpl, mqttConnectMessage);
        mqttSessionImpl.setProperties(mqttConnectMessage.getVariableHeader().getProperties());

        // For MQTT 5 the in-flight window is capped by the client's Receive Maximum;
        // for older versions only the broker-side limit applies.
        boolean mqtt5 = mqttSessionImpl.getMqttVersion() == MqttVersion.MQTT_5;
        int maxInflight;
        if (mqtt5) {
            maxInflight = Math.min(mqttConnectMessage.getVariableHeader().getProperties().getReceiveMaximum(), brokerContextImpl.Options().getMaxInflight());
        } else {
            maxInflight = brokerContextImpl.Options().getMaxInflight();
        }
        mqttSessionImpl.setInflightQueue(new InflightQueue(mqttSessionImpl, maxInflight, brokerContextImpl.getTimer()));

        // CONNACK properties are only sent to MQTT 5 clients.
        ConnectAckProperties connectAckProperties = null;
        if (mqtt5) {
            connectAckProperties = new ConnectAckProperties();
            connectAckProperties.setReceiveMaximum(maxInflight);
        }

        // NOTE(review): the second connAck argument (presumably "session present") is derived solely
        // from the clean-session flag, not from whether stored session state was found - confirm intended.
        boolean cleanSession = mqttConnectMessage.getVariableHeader().isCleanSession();
        mqttSessionImpl.write(MqttSession.connAck(MqttConnectReturnCode.CONNECTION_ACCEPTED, !cleanSession, connectAckProperties));
        LOGGER.debug("CONNECT message processed CId={}", mqttSessionImpl.getClientId());
    }

    /**
     * Validates the CONNECT message: protocol name, protocol version, reserved flag and client id.
     *
     * <p>Each failed check hands a callback to {@code ValidateUtils}; the callbacks send a failure
     * CONNACK and/or disconnect the session. What {@code ValidateUtils} does after running the
     * callback (presumably throwing) is not visible in this file.
     *
     * @param mqttSession        the session the message arrived on
     * @param mqttConnectMessage the CONNECT message to validate
     */
    private void checkMessage(MqttSession mqttSession, MqttConnectMessage mqttConnectMessage) {
        MqttConnectVariableHeader variableHeader = mqttConnectMessage.getVariableHeader();

        // The protocol name must map to a known protocol.
        MqttProtocolEnum protocol = MqttProtocolEnum.getByName(variableHeader.protocolName());
        ValidateUtils.notNull(protocol, "invalid protocol", () -> {
            LOGGER.error("invalid protocol:{}", variableHeader.protocolName());
            if (mqttSession.getMqttVersion() == MqttVersion.MQTT_5) {
                MqttSession.connFailAck(MqttConnectReturnCode.UNSUPPORTED_PROTOCOL_VERSION, mqttSession);
            }
            mqttSession.disconnect();
        });

        MqttConnectPayload payload = mqttConnectMessage.getPayload();
        // Validate the id exactly as the client sent it, not the generated replacement set in process().
        String clientId = payload.clientId();

        // The protocol/level pair must map to a supported MQTT version.
        MqttVersion version = MqttVersion.getByProtocolWithVersion(protocol, variableHeader.getProtocolLevel());
        ValidateUtils.notNull(version, "invalid version", () -> {
            MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION, mqttSession);
        });

        // The reserved flag of the CONNECT variable header must be zero.
        ValidateUtils.isTrue(variableHeader.getReserved() == 0, "", mqttSession::disconnect);

        // MQTT 3.1 clients may not use a client id longer than MAX_CLIENT_ID_LENGTH characters.
        boolean clientIdTooLong = StringUtils.isNotBlank(clientId)
                && version == MqttVersion.MQTT_3_1
                && clientId.length() > MAX_CLIENT_ID_LENGTH;
        ValidateUtils.isTrue(!clientIdTooLong, "", () -> {
            MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttSession);
            LOGGER.error("The MQTT client ID exceeds the maximum length of {} for MQTT 3.1. Username={}", MAX_CLIENT_ID_LENGTH, payload.userName());
        });

        // A blank client id is only acceptable together with a clean session.
        ValidateUtils.isTrue(variableHeader.isCleanSession() || !StringUtils.isBlank(clientId), "", () -> {
            MqttSession.connFailAck(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, mqttSession);
            LOGGER.error("The MQTT client ID cannot be empty. Username={}", payload.userName());
        });
    }

    /**
     * Registers the new session with the broker, replacing any session already registered
     * under the same client id.
     *
     * <p>If an existing session is found it is disconnected. With clean-session set, the old
     * session is flagged clean before being disconnected; otherwise the subscriptions stored in
     * the session-state provider (if any) are re-applied to the new session.
     *
     * @param brokerContextImpl  broker context holding the session registry and providers
     * @param mqttSessionImpl    the newly connected session
     * @param mqttConnectMessage the CONNECT message carrying the clean-session flag
     */
    private void refreshSession(BrokerContextImpl brokerContextImpl, MqttSessionImpl mqttSessionImpl, MqttConnectMessage mqttConnectMessage) {
        mqttSessionImpl.setCleanSession(mqttConnectMessage.getVariableHeader().isCleanSession());
        MqttSessionImpl existing = brokerContextImpl.getSession(mqttSessionImpl.getClientId());
        if (existing != null) {
            if (mqttSessionImpl.isCleanSession()) {
                // Flag the old session as clean before disconnecting it.
                existing.setCleanSession(true);
                LOGGER.info("disconnect session:{}", existing);
                existing.disconnect();
            } else {
                // Disconnect first, then read the stored state; the order is kept from the original
                // (presumably disconnect() is what persists the state - confirm).
                existing.disconnect();
                SessionState sessionState = brokerContextImpl.getProviders().getSessionStateProvider().get(mqttSessionImpl.getClientId());
                if (sessionState != null) {
                    Map<String, MqttQoS> subscribers = sessionState.getSubscribers();
                    subscribers.forEach(mqttSessionImpl::subscribe);
                }
            }
        }
        brokerContextImpl.addSession(mqttSessionImpl);
        LOGGER.debug("add session for client:{}", mqttSessionImpl);
    }

    /**
     * Builds the will message from the CONNECT payload and attaches it to the session.
     * Does nothing when the will flag is not set.
     *
     * @param brokerContext      broker context used to resolve (or create) the will topic
     * @param mqttSessionImpl    the session that will own the will message
     * @param mqttConnectMessage the CONNECT message carrying the will data
     */
    private void storeWillMessage(BrokerContext brokerContext, MqttSessionImpl mqttSessionImpl, MqttConnectMessage mqttConnectMessage) {
        if (!mqttConnectMessage.getVariableHeader().isWillFlag()) {
            return;
        }
        WillMessage willMessage = mqttConnectMessage.getPayload().getWillMessage();
        // NOTE(review): the retain flag is taken from the CONNECT fixed header rather than from a
        // will-retain flag in the variable header - confirm this is intended.
        PublishBuilder willBuilder = PublishBuilder.builder()
                .topic(brokerContext.getOrCreateTopic(willMessage.getTopic()))
                .qos(MqttQoS.valueOf(mqttConnectMessage.getVariableHeader().willQos()))
                .payload(willMessage.getPayload())
                .retained(mqttConnectMessage.getFixedHeader().isRetain());
        // MQTT 5 publishes carry a properties section; attach an empty one.
        if (mqttSessionImpl.getMqttVersion() == MqttVersion.MQTT_5) {
            willBuilder.publishProperties(new PublishProperties());
        }
        mqttSessionImpl.setWillMessage(willBuilder.build());
    }
}
