package tech.smartboot.mqtt.broker.topic.deliver;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import tech.smartboot.mqtt.broker.MqttSessionImpl;
import tech.smartboot.mqtt.broker.TopicSubscription;
import tech.smartboot.mqtt.broker.topic.BrokerTopicImpl;
import tech.smartboot.mqtt.common.enums.MqttQoS;
import tech.smartboot.mqtt.common.enums.MqttVersion;
import tech.smartboot.mqtt.common.message.MqttPacketIdentifierMessage;
import tech.smartboot.mqtt.common.message.variable.MqttPacketIdVariableHeader;
import tech.smartboot.mqtt.common.message.variable.properties.PublishProperties;
import tech.smartboot.mqtt.common.util.ValidateUtils;
import tech.smartboot.mqtt.plugin.spec.Message;
import tech.smartboot.mqtt.plugin.spec.PublishBuilder;

/* loaded from: input_file:tech/smartboot/mqtt/broker/topic/deliver/Qos12MessageDeliver.class */
/**
 * Message deliver for subscriptions whose QoS is not {@code AT_MOST_ONCE}.
 *
 * <p>Messages are read from the topic's message queue starting at
 * {@code nextConsumerOffset} and handed to the session's in-flight queue.
 * An {@link AtomicBoolean} flag is flipped {@code false -> true} before a push
 * round starts and {@code true -> false} when the round stops because there is
 * nothing to read or no in-flight slot is free.
 */
public class Qos12MessageDeliver extends Qos0MessageDeliver {
    /** {@code true} while a push round started by {@link #pushToClient()} is in progress. */
    private final AtomicBoolean semaphore = new AtomicBoolean(false);

    /**
     * Creates the deliver and rejects subscriptions with QoS {@code AT_MOST_ONCE}.
     *
     * @param brokerTopicImpl   topic forwarded to the parent constructor
     * @param mqttSessionImpl   session forwarded to the parent constructor
     * @param topicSubscription subscription whose QoS is validated
     * @param j                 forwarded to the parent constructor
     *                          (presumably the initial consumer offset; confirm against the parent class)
     */
    public Qos12MessageDeliver(BrokerTopicImpl brokerTopicImpl, MqttSessionImpl mqttSessionImpl, TopicSubscription topicSubscription, long j) {
        super(brokerTopicImpl, mqttSessionImpl, topicSubscription, j);
        ValidateUtils.isTrue(topicSubscription.getMqttQoS() != MqttQoS.AT_MOST_ONCE, "invalid qos");
    }

    /**
     * Starts a push round and flushes the session afterwards.
     *
     * <p>Nothing happens when the session is disconnected, when this deliver is
     * not enabled, or when the flag is already set.
     */
    @Override
    public void pushToClient() {
        if (getMqttSession().isDisconnect()) {
            return;
        }
        if (!this.enable) {
            return;
        }
        if (!this.semaphore.compareAndSet(false, true)) {
            return;
        }
        push0();
        getMqttSession().flush();
    }

    /**
     * Offers the message at {@code nextConsumerOffset} to the in-flight queue and
     * then recurses to handle the following one. The recursion ends when no
     * message is available or the in-flight queue reports no free slot.
     */
    private void push0() {
        Message next = this.topic.getMessageQueue().get(this.nextConsumerOffset);
        if (next == null) {
            // Nothing to read: step back, then re-check the queue in case a
            // message arrived between the read above and the re-subscription.
            if (releaseAndResubscribe() && this.topic.getMessageQueue().get(this.nextConsumerOffset) != null) {
                this.topic.push();
            }
            return;
        }

        int freeSlots = getMqttSession().getInflightQueue().available();
        if (freeSlots == 0) {
            // In-flight queue is full: step back, then re-check in case a slot
            // was freed between the read above and the re-subscription.
            if (releaseAndResubscribe() && getMqttSession().getInflightQueue().available() > 0) {
                this.topic.push();
            }
            return;
        }

        PublishBuilder publish = PublishBuilder.builder()
                .payload(next.getPayload())
                .qos(getMqttQoS())
                .topic(next.getTopic());
        if (getMqttSession().getMqttVersion() == MqttVersion.MQTT_5) {
            publish.publishProperties(new PublishProperties());
        }

        CompletableFuture<MqttPacketIdentifierMessage<? extends MqttPacketIdVariableHeader>> pending =
                getMqttSession().getInflightQueue().offer(publish);
        if (pending != null) {
            this.topic.getMessageQueue().commit(next.getOffset());
            this.nextConsumerOffset = next.getOffset() + 1;
            if (freeSlots == 1) {
                // Only one slot was free before this offer (presumably the queue is
                // now full), so resume pushing once this message completes.
                pending.whenComplete((ack, error) -> push0());
            }
        }
        push0();
    }

    /**
     * Clears the flag and, only if this call was the one that cleared it,
     * registers this deliver with the topic again.
     *
     * @return {@code true} when the flag was cleared by this call and the
     *         deliver was re-added to the topic; {@code false} otherwise
     */
    private boolean releaseAndResubscribe() {
        if (!this.semaphore.compareAndSet(true, false)) {
            return false;
        }
        this.topic.addSubscriber(this);
        return true;
    }
}
