package tech.smartboot.mqtt.broker.topic;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.smartboot.mqtt.broker.topic.deliver.AbstractMessageDeliver;
import tech.smartboot.mqtt.common.AsyncTask;
import tech.smartboot.mqtt.common.TopicToken;
import tech.smartboot.mqtt.common.message.MqttCodecUtil;
import tech.smartboot.mqtt.plugin.spec.BrokerTopic;
import tech.smartboot.mqtt.plugin.spec.Message;

/* loaded from: input_file:tech/smartboot/mqtt/broker/topic/BrokerTopicImpl.class */
public class BrokerTopicImpl extends TopicToken implements BrokerTopic {
    private final DeliverGroup defaultGroup;
    private final Map<String, DeliverGroup> shareSubscribers;
    private final Semaphore semaphore;
    private final ExecutorService executorService;
    private boolean enabled;
    private int version;
    private final byte[] encodedTopic;
    private final AsyncTask asyncTask;
    private Message retainMessage;
    private final MemoryMessageStoreQueue messageQueue;
    private final ConcurrentLinkedQueue<AbstractMessageDeliver> queue;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) BrokerTopicImpl.class);
    private static final AbstractMessageDeliver BREAK = new AbstractMessageDeliver(null, null, null, -1) { // from class: tech.smartboot.mqtt.broker.topic.BrokerTopicImpl.2
        @Override // tech.smartboot.mqtt.broker.topic.deliver.AbstractMessageDeliver
        public void pushToClient() {
            throw new UnsupportedOperationException();
        }
    };

    public BrokerTopicImpl(String str) {
        this(str, 64, null);
    }

    public BrokerTopicImpl(String str, int i, ExecutorService executorService) {
        super(str);
        this.defaultGroup = new DeliverGroup();
        this.shareSubscribers = new ConcurrentHashMap();
        this.semaphore = new Semaphore(1);
        this.enabled = true;
        this.version = 0;
        this.asyncTask = new AsyncTask() { // from class: tech.smartboot.mqtt.broker.topic.BrokerTopicImpl.1
            @Override // tech.smartboot.mqtt.common.AsyncTask
            public void execute() {
                BrokerTopicImpl.this.queue.offer(BrokerTopicImpl.BREAK);
                int i2 = BrokerTopicImpl.this.version;
                while (true) {
                    AbstractMessageDeliver abstractMessageDeliver = (AbstractMessageDeliver) BrokerTopicImpl.this.queue.poll();
                    if (abstractMessageDeliver == BrokerTopicImpl.BREAK) {
                        break;
                    }
                    try {
                        abstractMessageDeliver.pushToClient();
                    } catch (Exception e) {
                        BrokerTopicImpl.LOGGER.error("batch publish exception:{}", e.getMessage(), e);
                    }
                }
                BrokerTopicImpl.this.semaphore.release();
                if (i2 == BrokerTopicImpl.this.version || BrokerTopicImpl.this.queue.isEmpty()) {
                    return;
                }
                BrokerTopicImpl.this.push();
            }
        };
        this.queue = new ConcurrentLinkedQueue<>();
        this.executorService = executorService;
        this.messageQueue = new MemoryMessageStoreQueue(i);
        this.encodedTopic = MqttCodecUtil.encodeUTF8(str);
    }

    public DeliverGroup getSubscriberGroup(TopicToken topicToken) {
        return topicToken.isShared() ? this.shareSubscribers.computeIfAbsent(topicToken.getTopicFilter(), str -> {
            return new SharedDeliverGroup(this);
        }) : this.defaultGroup;
    }

    public void removeShareGroup(String str) {
        this.shareSubscribers.remove(str);
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerTopic
    public int subscribeCount() {
        return this.shareSubscribers.size() + this.defaultGroup.count();
    }

    public void addSubscriber(AbstractMessageDeliver abstractMessageDeliver) {
        this.queue.offer(abstractMessageDeliver);
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerTopic
    public String getTopic() {
        return getTopicFilter();
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerTopic
    public TopicToken toTopicToken() {
        return this;
    }

    @Override // tech.smartboot.mqtt.plugin.spec.BrokerTopic
    public byte[] encodedTopicBytes() {
        return this.encodedTopic;
    }

    public Message getRetainMessage() {
        return this.retainMessage;
    }

    public void setRetainMessage(Message message) {
        this.retainMessage = message;
    }

    public MemoryMessageStoreQueue getMessageQueue() {
        return this.messageQueue;
    }

    public String toString() {
        return getTopic();
    }

    public void push() {
        if (this.enabled && this.semaphore.tryAcquire()) {
            this.executorService.execute(this.asyncTask);
        }
    }

    public void addVersion() {
        this.version++;
    }

    public void disable() {
        this.enabled = false;
    }

    public void dump() {
    }
}
