package tech.smartboot.mqtt.broker;

import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.smartboot.mqtt.common.message.MqttPublishMessage;
import tech.smartboot.mqtt.common.message.variable.MqttPublishVariableHeader;
import tech.smartboot.mqtt.plugin.spec.BrokerContext;
import tech.smartboot.mqtt.plugin.spec.Message;
import tech.smartboot.mqtt.plugin.spec.MqttSession;
import tech.smartboot.mqtt.plugin.spec.bus.MessageBus;
import tech.smartboot.mqtt.plugin.spec.bus.MessageBusConsumer;

/* loaded from: input_file:tech/smartboot/mqtt/broker/MessageBusImpl.class */
/**
 * {@link MessageBus} implementation that keeps the registered
 * {@link MessageBusConsumer}s in an in-memory list and hands every published
 * message to each consumer that reports itself as enabled.
 *
 * <p>The consumer list is a plain {@link ArrayList} and this class performs no
 * synchronization of its own.
 * NOTE(review): confirm that callers never run {@link #consumer} and
 * {@link #publish} concurrently, or switch to a concurrent list.
 */
public class MessageBusImpl implements MessageBus {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageBusImpl.class);

    /** Registered consumers, visited in registration order on every publish. */
    private final List<MessageBusConsumer> messageBuses = new ArrayList<>();

    /** Used to resolve the topic object for each published message. */
    private final BrokerContext brokerContext;

    /**
     * Creates a message bus bound to the given broker context.
     *
     * @param brokerContext context used to look up (or create) topics on publish
     */
    public MessageBusImpl(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }

    /**
     * Registers a consumer; it stays registered until a later
     * {@link #publish} finds it disabled and removes it.
     *
     * @param messageBusConsumer the consumer to register, must not be {@code null}
     * @throws NullPointerException if {@code messageBusConsumer} is {@code null}
     */
    @Override
    public void consumer(MessageBusConsumer messageBusConsumer) {
        // Fail fast: a null entry would otherwise raise an NPE on every publish.
        if (messageBusConsumer == null) {
            throw new NullPointerException("messageBusConsumer");
        }
        this.messageBuses.add(messageBusConsumer);
    }

    /**
     * Dispatches a publish message to all enabled consumers.
     *
     * <p>The message is wrapped, together with the topic returned by
     * {@code brokerContext.getOrCreateTopic(topicName)}, into a single
     * {@link Message} that is shared by all consumers. A {@link Throwable}
     * raised while handling one consumer is logged and does not prevent the
     * remaining consumers from being invoked. Consumers whose
     * {@code enable()} returns {@code false} are skipped and removed from the
     * list once the dispatch loop has finished.
     *
     * @param mqttSession        session passed through to each consumer
     * @param mqttPublishMessage the publish message to dispatch
     */
    public void publish(MqttSession mqttSession, MqttPublishMessage mqttPublishMessage) {
        String topicName = ((MqttPublishVariableHeader) mqttPublishMessage.getVariableHeader()).getTopicName();
        Message message = new Message(mqttPublishMessage, this.brokerContext.getOrCreateTopic(topicName));

        boolean foundDisabled = false;
        for (MessageBusConsumer consumer : this.messageBuses) {
            try {
                if (consumer.enable()) {
                    consumer.consume(mqttSession, message);
                } else {
                    foundDisabled = true;
                }
            } catch (Throwable throwable) {
                // Isolate consumers from each other; log at error level so the
                // swallowed failure is not lost under typical production log filters.
                LOGGER.error("messageBus consume exception", throwable);
            }
        }

        // Removal is deferred until after the loop so the list is never
        // modified while it is being iterated.
        if (foundDisabled) {
            this.messageBuses.removeIf(consumer -> !consumer.enable());
        }
    }
}
