package tech.smartboot.mqtt.broker;

import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONPath;
import com.alibaba.fastjson2.JSONReader;
import com.alibaba.fastjson2.JSONWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.socket.buffer.BufferPagePool;
import org.smartboot.socket.enhance.EnhanceAsynchronousChannelProvider;
import org.smartboot.socket.timer.HashedWheelTimer;
import org.smartboot.socket.timer.Timer;
import org.smartboot.socket.transport.AioQuickServer;
import org.yaml.snakeyaml.Yaml;
import tech.smartboot.mqtt.broker.bus.event.KeepAliveMonitorSubscriber;
import tech.smartboot.mqtt.broker.bus.message.RetainPersistenceConsumer;
import tech.smartboot.mqtt.broker.processor.ConnectProcessor;
import tech.smartboot.mqtt.broker.processor.DisConnectProcessor;
import tech.smartboot.mqtt.broker.processor.MqttAckProcessor;
import tech.smartboot.mqtt.broker.processor.PingReqProcessor;
import tech.smartboot.mqtt.broker.processor.PubRelProcessor;
import tech.smartboot.mqtt.broker.processor.PublishProcessor;
import tech.smartboot.mqtt.broker.processor.SubscribeProcessor;
import tech.smartboot.mqtt.broker.processor.UnSubscribeProcessor;
import tech.smartboot.mqtt.broker.provider.impl.session.MemorySessionStateProvider;
import tech.smartboot.mqtt.broker.topic.BrokerTopicImpl;
import tech.smartboot.mqtt.broker.topic.BrokerTopicRegistry;
import tech.smartboot.mqtt.broker.topic.TopicSubscriptionRegistry;
import tech.smartboot.mqtt.broker.topic.deliver.AbstractMessageDeliver;
import tech.smartboot.mqtt.common.AsyncTask;
import tech.smartboot.mqtt.common.MqttProtocol;
import tech.smartboot.mqtt.common.enums.MqttQoS;
import tech.smartboot.mqtt.common.enums.MqttVersion;
import tech.smartboot.mqtt.common.message.MqttConnectMessage;
import tech.smartboot.mqtt.common.message.MqttDisconnectMessage;
import tech.smartboot.mqtt.common.message.MqttMessage;
import tech.smartboot.mqtt.common.message.MqttPingReqMessage;
import tech.smartboot.mqtt.common.message.MqttPubAckMessage;
import tech.smartboot.mqtt.common.message.MqttPubCompMessage;
import tech.smartboot.mqtt.common.message.MqttPubRecMessage;
import tech.smartboot.mqtt.common.message.MqttPubRelMessage;
import tech.smartboot.mqtt.common.message.MqttPublishMessage;
import tech.smartboot.mqtt.common.message.MqttSubscribeMessage;
import tech.smartboot.mqtt.common.message.MqttUnsubscribeMessage;
import tech.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import tech.smartboot.mqtt.common.util.MqttUtil;
import tech.smartboot.mqtt.common.util.ValidateUtils;
import tech.smartboot.mqtt.plugin.spec.BrokerContext;
import tech.smartboot.mqtt.plugin.spec.Message;
import tech.smartboot.mqtt.plugin.spec.MqttProcessor;
import tech.smartboot.mqtt.plugin.spec.MqttSession;
import tech.smartboot.mqtt.plugin.spec.Options;
import tech.smartboot.mqtt.plugin.spec.Plugin;
import tech.smartboot.mqtt.plugin.spec.PublishBuilder;
import tech.smartboot.mqtt.plugin.spec.bus.EventType;
import tech.smartboot.mqtt.plugin.spec.provider.Providers;

/* loaded from: input_file:tech/smartboot/mqtt/broker/BrokerContextImpl.class */
public class BrokerContextImpl implements BrokerContext {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) BrokerContextImpl.class);
    private Options options;
    private ExecutorService pushThreadPool;
    private ExecutorService retainPushThreadPool;
    private AioQuickServer server;
    private String configJson;
    private final Map<Class<? extends MqttMessage>, MqttProcessor<?, ?, ?>> processors;
    private final ConcurrentMap<String, MqttSessionImpl> grantSessions = new ConcurrentHashMap();
    private final ConcurrentMap<String, BrokerTopicImpl> topicMap = new ConcurrentHashMap();
    private final BrokerTopicRegistry topicRegistry = new BrokerTopicRegistry();
    private final TopicSubscriptionRegistry subscribeTopicTree = new TopicSubscriptionRegistry(this);
    private final Timer timer = new HashedWheelTimer(runnable -> {
        return new Thread(runnable, "broker-timer");
    }, 50, 1024);
    private final MessageBusImpl messageBus = new MessageBusImpl(this);
    private final EventBusImpl eventBus = new EventBusImpl();
    private final List<Plugin> plugins = new ArrayList();
    private final Providers providers = new Providers();
    private final MqttBrokerMessageProcessor processor = new MqttBrokerMessageProcessor(this);
    private final BufferPagePool bufferPagePool = new BufferPagePool(1, true);

    public BrokerContextImpl() {
        HashMap hashMap = new HashMap();
        hashMap.put(MqttPingReqMessage.class, new PingReqProcessor());
        hashMap.put(MqttConnectMessage.class, new ConnectProcessor());
        hashMap.put(MqttPublishMessage.class, new PublishProcessor());
        hashMap.put(MqttSubscribeMessage.class, new SubscribeProcessor());
        hashMap.put(MqttUnsubscribeMessage.class, new UnSubscribeProcessor());
        hashMap.put(MqttPubAckMessage.class, new MqttAckProcessor());
        hashMap.put(MqttPubRelMessage.class, new PubRelProcessor());
        hashMap.put(MqttPubRecMessage.class, new MqttAckProcessor());
        hashMap.put(MqttPubCompMessage.class, new MqttAckProcessor());
        hashMap.put(MqttDisconnectMessage.class, new DisConnectProcessor());
        this.processors = Collections.unmodifiableMap(hashMap);
    }

    public void init() throws Throwable {
        this.providers.setSessionStateProvider(new MemorySessionStateProvider());
        updateBrokerConfigure();
        subscribeEventBus();
        subscribeMessageBus();
        initPushThread();
        loadAndInstallPlugins();
        try {
            List<org.smartboot.socket.extension.plugins.Plugin<MqttMessage>> plugins = this.options.getPlugins();
            MqttBrokerMessageProcessor mqttBrokerMessageProcessor = this.processor;
            mqttBrokerMessageProcessor.getClass();
            plugins.forEach(mqttBrokerMessageProcessor::addPlugin);
            this.server = new AioQuickServer(this.options.getHost(), this.options.getPort(), new MqttProtocol(this.options.getMaxPacketSize()), this.processor);
            this.server.setBannerEnabled(false).setReadBufferSize(this.options.getBufferSize()).setWriteBuffer(this.options.getBufferSize(), Math.min(this.options.getMaxInflight(), 16)).setBufferPagePool(this.bufferPagePool).setThreadNum(Math.max(2, this.options.getThreadNum()));
            if (!this.options.isLowMemory()) {
                this.server.disableLowMemory();
            }
            this.server.start(this.options.getChannelGroup());
            this.eventBus.publish(EventType.BROKER_STARTED, this);
            this.configJson = null;
            System.out.println("\n                               _                         _    _       _                  _                  \n                              ( )_                      ( )_ ( )_    ( )                ( )                 \n  ___   ___ ___     _ _  _ __ | ,_)     ___ ___     _ _ | ,_)| ,_)   | |_    _ __   _   | |/')    __   _ __ \n/',__)/' _ ` _ `\\ /'_` )( '__)| |     /' _ ` _ `\\ /'_` )| |  | |     | '_`\\ ( '__)/'_`\\ | , <   /'__`\\( '__)\n\\__, \\| ( ) ( ) |( (_| || |   | |_    | ( ) ( ) |( (_) || |_ | |_    | |_) )| |  ( (_) )| |\\`\\ (  ___/| |   \n(____/(_) (_) (_)`\\__,_)(_)   `\\__)   (_) (_) (_)`\\__, |`\\__)`\\__)   (_,__/'(_)  `\\___/'(_) (_)`\\____)(_)   \n                                                     | |                                                    \n                                                     (_)                                                    \r\n ::smart-mqtt broker::\t(v1.0.0)");
            System.out.println("Gitee: https://gitee.com/smartboot/smart-mqtt");
            System.out.println("Github: https://github.com/smartboot/smart-mqtt");
            System.out.println("Document: https://smartboot.tech/smart-mqtt");
            System.out.println("Support: zhengjunweimail@163.com");
            if (StringUtils.isBlank(this.options.getHost())) {
                System.out.println("��start smart-mqtt success! [port:" + this.options.getPort() + "]");
            } else {
                System.out.println("��start smart-mqtt success! [host:" + this.options.getHost() + " port:" + this.options.getPort() + "]");
            }
        } catch (Exception e) {
            destroy();
            throw e;
        }
    }

    private void initPushThread() {
        if (this.options.getTopicLimit() <= 0) {
            this.options.setTopicLimit(10);
        }
        this.retainPushThreadPool = Executors.newFixedThreadPool(Options().getPushThreadNum());
        this.pushThreadPool = Executors.newFixedThreadPool(Options().getPushThreadNum(), new ThreadFactory() { // from class: tech.smartboot.mqtt.broker.BrokerContextImpl.1
            int index = 0;

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                StringBuilder append = new StringBuilder().append("broker-push-");
                int i = this.index;
                this.index = i + 1;
                return new Thread(runnable, append.append(i).toString());
            }
        });
    }

    private void subscribeMessageBus() {
        this.messageBus.consumer((mqttSession, message) -> {
            BrokerTopicImpl brokerTopicImpl = (BrokerTopicImpl) message.getTopic();
            int subscribeCount = brokerTopicImpl.subscribeCount();
            if (subscribeCount == 0) {
                LOGGER.debug("none subscriber,ignore message");
                return;
            }
            message.setPushSemaphore(subscribeCount);
            brokerTopicImpl.getMessageQueue().put(message);
            brokerTopicImpl.addVersion();
            brokerTopicImpl.push();
        });
        this.messageBus.consumer(new RetainPersistenceConsumer(), (v0) -> {
            return v0.isRetained();
        });
    }

    private void subscribeEventBus() {
        this.eventBus.subscribe(EventType.CONNECT, new KeepAliveMonitorSubscriber(this));
        this.eventBus.subscribe(EventType.CONNECT, (eventType, eventObject) -> {
            MqttSessionImpl mqttSessionImpl = (MqttSessionImpl) eventObject.getSession();
            mqttSessionImpl.idleConnectTimer.cancel();
            mqttSessionImpl.idleConnectTimer = null;
        });
        this.eventBus.subscribe(EventType.SUBSCRIBE_TOPIC, (eventType2, eventObject2) -> {
            this.retainPushThreadPool.execute(new AsyncTask() { // from class: tech.smartboot.mqtt.broker.BrokerContextImpl.2
                @Override // tech.smartboot.mqtt.common.AsyncTask
                public void execute() {
                    AbstractMessageDeliver abstractMessageDeliver = (AbstractMessageDeliver) eventObject2.getObject();
                    BrokerTopicImpl orCreateTopic = BrokerContextImpl.this.getOrCreateTopic(abstractMessageDeliver.getTopic().getTopic());
                    Message retainMessage = orCreateTopic.getRetainMessage();
                    if (retainMessage == null || retainMessage.getCreateTime() > abstractMessageDeliver.getLatestSubscribeTime()) {
                        orCreateTopic.addSubscriber(abstractMessageDeliver);
                        return;
                    }
                    MqttSessionImpl mqttSessionImpl = (MqttSessionImpl) eventObject2.getSession();
                    PublishBuilder publishBuilder = PublishBuilder.builder().payload(retainMessage.getPayload()).qos(abstractMessageDeliver.getMqttQoS()).topic(retainMessage.getTopic());
                    if (mqttSessionImpl.getMqttVersion() == MqttVersion.MQTT_5) {
                        publishBuilder.publishProperties(new PublishProperties());
                    }
                    if (abstractMessageDeliver.getMqttQoS() == MqttQoS.AT_MOST_ONCE) {
                        mqttSessionImpl.write(publishBuilder.build());
                        orCreateTopic.addSubscriber(abstractMessageDeliver);
                    } else {
                        mqttSessionImpl.getInflightQueue().offer(publishBuilder).whenComplete((mqttPacketIdentifierMessage, th) -> {
                            BrokerContextImpl.LOGGER.info("publish retain to client:{} success  ", mqttSessionImpl.getClientId());
                            orCreateTopic.addSubscriber(abstractMessageDeliver);
                        });
                        mqttSessionImpl.flush();
                    }
                }
            });
        });
        this.eventBus.subscribe(EventType.TOPIC_CREATE, (eventType3, str) -> {
            this.subscribeTopicTree.refreshWhenTopicCreated(str);
        });
    }

    private void updateBrokerConfigure() throws IOException {
        loadYamlConfig();
        this.options = (Options) parseConfig("$.broker", Options.class);
        MqttUtil.updateConfig(this.options, "broker");
        this.options.setChannelGroup(new EnhanceAsynchronousChannelProvider(false).openAsynchronousChannelGroup(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { // from class: tech.smartboot.mqtt.broker.BrokerContextImpl.3
            int i;

            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                StringBuilder append = new StringBuilder().append("smart-mqtt-broker-");
                int i = this.i + 1;
                this.i = i;
                return new Thread(runnable, append.append(i).toString());
            }
        }));
        this.eventBus.publish(EventType.BROKER_CONFIGURE_LOADED, this.options);
    }

    private void loadAndInstallPlugins() throws Throwable {
        Iterator it = ServiceLoader.load(Plugin.class, Providers.class.getClassLoader()).iterator();
        while (it.hasNext()) {
            Plugin plugin = (Plugin) it.next();
            LOGGER.debug("load plugin: " + plugin.pluginName());
            this.plugins.add(plugin);
        }
        this.plugins.sort(Comparator.comparingInt((v0) -> {
            return v0.order();
        }));
        for (Plugin plugin2 : this.plugins) {
            LOGGER.debug("install plugin: " + plugin2.pluginName());
            plugin2.install(this);
        }
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerContext
    public Options Options() {
        return this.options;
    }

    public void addSession(MqttSessionImpl mqttSessionImpl) {
        this.grantSessions.putIfAbsent(mqttSessionImpl.getClientId(), mqttSessionImpl);
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerContext
    public BrokerTopicImpl getOrCreateTopic(String str) {
        BrokerTopicImpl brokerTopicImpl = this.topicMap.get(str);
        if (brokerTopicImpl == null) {
            synchronized (this) {
                brokerTopicImpl = this.topicMap.get(str);
                if (brokerTopicImpl == null) {
                    ValidateUtils.isTrue(!MqttUtil.containsTopicWildcards(str), "invalid topicName: " + str);
                    brokerTopicImpl = new BrokerTopicImpl(str, this.options.getMaxMessageQueueLength(), this.pushThreadPool);
                    LOGGER.info("create topic: {} capacity is {}", str, Integer.valueOf(brokerTopicImpl.getMessageQueue().capacity()));
                    this.topicRegistry.registerTopic(brokerTopicImpl);
                    this.topicMap.put(str, brokerTopicImpl);
                    this.eventBus.publish(EventType.TOPIC_CREATE, str);
                }
            }
        }
        return brokerTopicImpl;
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerContext
    public MessageBusImpl getMessageBus() {
        return this.messageBus;
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerContext
    public EventBusImpl getEventBus() {
        return this.eventBus;
    }

    public MqttSession removeSession(String str) {
        if (!StringUtils.isBlank(str)) {
            return this.grantSessions.remove(str);
        }
        LOGGER.warn("clientId is blank, ignore remove grantSession");
        return null;
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerContext
    public MqttSessionImpl getSession(String str) {
        return this.grantSessions.get(str);
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerContext
    public Timer getTimer() {
        return this.timer;
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerContext
    public Providers getProviders() {
        return this.providers;
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerContext
    public <T> T parseConfig(String str, Class<T> cls) {
        Object extract = JSONPath.of(str).extract(JSONReader.of(this.configJson));
        if (extract instanceof JSONObject) {
            return (T) ((JSONObject) extract).to((Class) cls, new JSONReader.Feature[0]);
        }
        return null;
    }

    public BrokerTopicRegistry getPublishTopicTree() {
        return this.topicRegistry;
    }

    public TopicSubscriptionRegistry getTopicSubscribeTree() {
        return this.subscribeTopicTree;
    }

    private void loadYamlConfig() throws IOException {
        InputStream newInputStream;
        String defaultString = StringUtils.defaultString(System.getProperty(Options.SystemProperty.BrokerConfig), System.getenv(Options.SystemProperty.BrokerConfig));
        if (StringUtils.isBlank(defaultString)) {
            newInputStream = BrokerContext.class.getClassLoader().getResourceAsStream("smart-mqtt.yaml");
            LOGGER.debug("load smart-mqtt.yaml from classpath.");
        } else {
            newInputStream = Files.newInputStream(Paths.get(defaultString, new String[0]), new OpenOption[0]);
            LOGGER.debug("load external yaml config.");
        }
        this.configJson = JSONObject.toJSONString(new Yaml().load(newInputStream), new JSONWriter.Feature[0]);
        if (newInputStream != null) {
            newInputStream.close();
        }
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerContext
    public Map<Class<? extends MqttMessage>, MqttProcessor<?, ?, ?>> getMessageProcessors() {
        return this.processors;
    }

    public void destroy() {
        LOGGER.info("destroy broker...");
        this.eventBus.publish(EventType.BROKER_DESTROY, this);
        this.topicMap.values().forEach((v0) -> {
            v0.disable();
        });
        this.pushThreadPool.shutdown();
        if (this.server != null) {
            this.server.shutdown();
        }
        this.options.getChannelGroup().shutdown();
        this.timer.shutdown();
        this.bufferPagePool.release();
        this.plugins.forEach((v0) -> {
            v0.uninstall();
        });
        this.plugins.clear();
    }
}
