package tech.smartboot.mqtt.broker.plugin.ws;

import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONWriter;
import java.nio.ByteBuffer;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.smartboot.socket.transport.AioSession;
import tech.smartboot.feat.Feat;
import tech.smartboot.feat.core.server.HttpHandler;
import tech.smartboot.feat.core.server.HttpRequest;
import tech.smartboot.feat.core.server.HttpServer;
import tech.smartboot.feat.core.server.ServerOptions;
import tech.smartboot.feat.core.server.WebSocketRequest;
import tech.smartboot.feat.core.server.WebSocketResponse;
import tech.smartboot.feat.core.server.upgrade.websocket.WebSocketUpgrade;
import tech.smartboot.mqtt.broker.BrokerContextImpl;
import tech.smartboot.mqtt.broker.MqttSessionImpl;
import tech.smartboot.mqtt.common.MqttProtocol;
import tech.smartboot.mqtt.common.message.MqttMessage;
import tech.smartboot.mqtt.plugin.spec.BrokerContext;
import tech.smartboot.mqtt.plugin.spec.MqttProcessor;
import tech.smartboot.mqtt.plugin.spec.Plugin;
import tech.smartboot.mqtt.plugin.spec.bus.EventObject;
import tech.smartboot.mqtt.plugin.spec.bus.EventType;

/* loaded from: input_file:tech/smartboot/mqtt/broker/plugin/ws/WebSocketPlugin.class */
/**
 * Broker plugin that accepts MQTT traffic carried over WebSocket binary frames.
 *
 * <p>On initialisation it reads the {@code $.broker.ws} configuration section, starts an HTTP server
 * on the configured port and upgrades every incoming request to a WebSocket. Each binary frame is
 * decoded with {@link MqttProtocol} and the resulting messages are handed to the processor that the
 * broker context registers for the message class.
 */
public class WebSocketPlugin extends Plugin {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketPlugin.class);

    /** Read buffer size passed to the HTTP server options (1 MiB). */
    private static final int HTTP_READ_BUFFER_SIZE = 1024 * 1024;

    /** HTTP server hosting the WebSocket endpoint; remains {@code null} when no configuration is present. */
    private HttpServer httpBootstrap;

    /**
     * Starts the WebSocket endpoint if a {@code $.broker.ws} configuration section exists.
     *
     * @param brokerContext broker context used for configuration, message processors and the event bus
     * @throws Throwable propagated from configuration parsing or server start-up
     */
    @Override
    protected void initPlugin(final BrokerContext brokerContext) throws Throwable {
        Config config = (Config) brokerContext.parseConfig("$.broker.ws", Config.class);
        if (config == null) {
            LOGGER.error("mqtt over websocket is disable.");
            return;
        }
        // Serialising the configuration is only worth the cost when the line is actually emitted.
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("websocket config:{}", JSONObject.toJSONString(config, new JSONWriter.Feature[0]));
        }
        final MqttProtocol mqttProtocol = new MqttProtocol(brokerContext.Options().getMaxPacketSize());

        // NOTE(review): debug(true) is hard-coded; confirm whether it should be driven by configuration.
        this.httpBootstrap = Feat.httpServer(
                options -> options.debug(true).bannerEnabled(false).readBufferSize(HTTP_READ_BUFFER_SIZE)
        ).httpHandler(new HttpHandler() {
            @Override
            public void handle(HttpRequest httpRequest) throws Throwable {
                httpRequest.getResponse().setHeader("Sec-WebSocket-Protocol", "mqtt");
                // NOTE(review): nothing in this upgrade releases the MQTT session when the socket
                // closes; confirm that this is handled elsewhere.
                httpRequest.upgrade(new WebSocketUpgrade() {
                    /** Session adapter handed to the MQTT decoder; created on the first binary frame. */
                    private ProxySession proxySession;
                    /** Accumulates undecoded bytes; kept in write mode between frames. */
                    private ByteBuffer readBuffer;
                    /** MQTT session bound to this WebSocket connection. */
                    private MqttSessionImpl mqttSession;

                    /**
                     * Feeds one binary frame to the MQTT decoder and dispatches every complete message.
                     *
                     * <p>The frame is copied into the read buffer in chunks no larger than the buffer's
                     * free space, so a frame bigger than the buffer no longer overflows it.
                     *
                     * @throws java.nio.BufferOverflowException if the buffer is full and the decoder
                     *         cannot consume any of it
                     */
                    @Override
                    public void handleBinaryMessage(WebSocketRequest webSocketRequest, WebSocketResponse webSocketResponse, byte[] bArr) {
                        if (this.proxySession == null) {
                            openSession(webSocketResponse);
                        }
                        int offset = 0;
                        while (offset < bArr.length) {
                            int chunk = Math.min(this.readBuffer.remaining(), bArr.length - offset);
                            if (chunk == 0) {
                                // Buffer is full and the previous decode pass made no progress.
                                LOGGER.error("websocket read buffer overflow, capacity:{}", this.readBuffer.capacity());
                                throw new java.nio.BufferOverflowException();
                            }
                            this.readBuffer.put(bArr, offset, chunk);
                            offset += chunk;
                            this.readBuffer.flip();
                            try {
                                drainReadBuffer();
                            } finally {
                                // Always return to write mode, even if decoding or processing failed,
                                // so the buffer is never left flipped for the next frame.
                                this.readBuffer.compact();
                            }
                        }
                    }

                    /** Lazily creates the proxy session, the MQTT session and the read buffer. */
                    private void openSession(WebSocketResponse webSocketResponse) {
                        this.proxySession = new ProxySession(this.request.getAioSession());
                        this.mqttSession = new MqttSessionImpl((BrokerContextImpl) brokerContext, this.proxySession, new ByteArrayMqttOutputStream(webSocketResponse));
                        this.proxySession.setAttachment(this.mqttSession);
                        this.readBuffer = ByteBuffer.allocate(brokerContext.Options().getBufferSize());
                    }

                    /** Decodes messages from the flipped read buffer until it is empty or the decoder returns {@code null}. */
                    private void drainReadBuffer() {
                        while (this.readBuffer.hasRemaining()) {
                            MqttMessage message = mqttProtocol.decode(this.readBuffer, this.proxySession);
                            if (message == null) {
                                return;
                            }
                            dispatch(message);
                        }
                    }

                    /** Publishes the receive event and hands the message to its registered processor. */
                    @SuppressWarnings({"rawtypes", "unchecked"})
                    private void dispatch(MqttMessage message) {
                        // Raw type on purpose: a wildcard-typed processor cannot accept concrete
                        // arguments if process(...) is declared with the interface's type parameters.
                        // The lookup by message class is what pairs the processor with this message.
                        MqttProcessor processor = brokerContext.getMessageProcessors().get(message.getClass());
                        if (processor == null) {
                            LOGGER.warn("unSupport message: {}", message.getClass().getName());
                            return;
                        }
                        brokerContext.getEventBus().publish(EventType.RECEIVE_MESSAGE, EventObject.newEventObject(this.mqttSession, message));
                        this.mqttSession.setLatestReceiveMessageTime(System.currentTimeMillis());
                        processor.process(brokerContext, this.mqttSession, message);
                    }
                });
            }
        });
        this.httpBootstrap.listen(config.getPort());
    }

    /** Shuts the HTTP server down if it was started. */
    @Override
    protected void destroyPlugin() {
        if (this.httpBootstrap != null) {
            this.httpBootstrap.shutdown();
        }
    }
}
