package io.vertx.rabbitmq.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.rabbitmq.RabbitMQChannel;
import io.vertx.rabbitmq.RabbitMQChannelBuilder;
import io.vertx.rabbitmq.RabbitMQConfirmation;
import io.vertx.rabbitmq.RabbitMQMessageCodec;
import io.vertx.rabbitmq.RabbitMQPublishOptions;
import io.vertx.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
import java.util.Objects;

/* loaded from: input_file:io/vertx/rabbitmq/impl/RabbitMQPublisherImpl.class */
public class RabbitMQPublisherImpl<T> implements RabbitMQPublisher<T> {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQPublisherImpl.class);

    /** Exchange that every message sent by this publisher is published to. */
    private final String exchange;

    /** When true, message bodies are retained until confirmed so they can be published again after a reconnect. */
    private final boolean resendOnReconnect;

    /** Fixed codec for message bodies; when null a codec is looked up per message via {@link #codecManager}. */
    private final RabbitMQMessageCodec<T> messageCodec;

    /** Channel used for publishing; assigned once the channel has been opened successfully. */
    private RabbitMQChannel channel;

    /** Codec registry obtained from the channel builder at construction time. */
    private RabbitMQCodecManager codecManager;

    /** Id of the channel most recently seen by performResends; null until the first channel has been seen. */
    private String lastChannelId = null;

    /**
     * Messages that have been published and are awaiting a broker confirmation.
     * This deque is also the monitor that guards both itself and {@link #resend}.
     */
    private final Deque<RabbitMQPublisherImpl<T>.MessageDetails> promises = new ArrayDeque<>();

    /** Messages waiting to be published again; accessed while holding the {@link #promises} monitor. */
    private final Deque<RabbitMQPublisherImpl<T>.MessageDetails> resend = new ArrayDeque<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/rabbitmq/impl/RabbitMQPublisherImpl$MessageDetails.class */
    /**
     * Book-keeping record for one published message: the channel and delivery tag it was
     * published under, the promise to complete when the broker confirms it, and (optionally)
     * everything needed to publish it again.
     *
     * Identity ({@link #equals(Object)} / {@link #hashCode()}) is defined by the channel id and
     * delivery tag only.
     */
    public class MessageDetails {
        final String channelId;
        final long deliveryTag;
        final Promise<Void> promise;
        final String routingKey;
        final AMQP.BasicProperties properties;
        final byte[] message;

        /**
         * Creates a record that retains the routing key, properties and body so the message can be resent.
         */
        MessageDetails(String channelId, long deliveryTag, Promise<Void> promise, String routingKey, AMQP.BasicProperties properties, byte[] message) {
            this.channelId = channelId;
            this.deliveryTag = deliveryTag;
            this.promise = promise;
            this.routingKey = routingKey;
            this.properties = properties;
            this.message = message;
        }

        /**
         * Creates a record that only tracks the confirmation; routing key, properties and body are null.
         */
        public MessageDetails(String channelId, long deliveryTag, Promise<Void> promise) {
            this(channelId, deliveryTag, promise, null, null, null);
        }

        @Override
        public int hashCode() {
            int hash = 3;
            hash = 41 * hash + Objects.hashCode(this.channelId);
            hash = 41 * hash + Long.hashCode(this.deliveryTag);
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            final MessageDetails other = (MessageDetails) obj;
            return Objects.equals(this.channelId, other.channelId) && this.deliveryTag == other.deliveryTag;
        }
    }

    /**
     * Creates a publisher for the given exchange and starts it by opening its channel.
     *
     * @param rabbitMQChannelBuilder   builder used to obtain the codec manager and to open the channel
     * @param rabbitMQMessageCodec     codec for message bodies, or null to look one up per message
     * @param str                      name of the exchange to publish to
     * @param rabbitMQPublisherOptions publisher options (resend-on-reconnect flag)
     * @return a future completed with the publisher once its channel has been opened
     */
    public static <T> Future<RabbitMQPublisher<T>> create(RabbitMQChannelBuilder rabbitMQChannelBuilder, RabbitMQMessageCodec<T> rabbitMQMessageCodec, String str, RabbitMQPublisherOptions rabbitMQPublisherOptions) {
        // Parameterized instantiation: the raw type would make the returned future an unchecked conversion.
        final RabbitMQPublisherImpl<T> publisher = new RabbitMQPublisherImpl<T>(rabbitMQChannelBuilder, rabbitMQMessageCodec, str, rabbitMQPublisherOptions);
        return publisher.start(rabbitMQChannelBuilder);
    }

    /**
     * Reacts to a (re)opened channel by dealing with messages that were still unconfirmed on
     * the previous channel.
     *
     * The first channel ever seen, or a channel whose id matches the last one seen, requires no
     * action. For a new channel id the pending messages are either queued and published again
     * (resend-on-reconnect enabled) or their promises are failed with "Channel reconnected".
     *
     * @param rabbitMQChannel the channel that is now current
     * @return a future that completes when any resending has finished
     */
    private Future<Void> performResends(RabbitMQChannel rabbitMQChannel) {
        final String currentChannelId = rabbitMQChannel.getChannelId();
        if (this.lastChannelId == null) {
            this.lastChannelId = currentChannelId;
            return Future.succeededFuture();
        }
        if (this.lastChannelId.equals(currentChannelId)) {
            return Future.succeededFuture();
        }
        // Record the new channel on BOTH paths; otherwise the resend path would treat every
        // later invocation for this same channel as another reconnect.
        this.lastChannelId = currentChannelId;
        if (this.resendOnReconnect) {
            copyPromises(this.resend);
            final Promise<Void> resendsDone = Promise.promise();
            doResendAsync(resendsDone);
            return resendsDone.future();
        }
        final Collection<RabbitMQPublisherImpl<T>.MessageDetails> abandoned = new ArrayList<>();
        copyPromises(abandoned);
        for (RabbitMQPublisherImpl<T>.MessageDetails details : abandoned) {
            // tryFail: never throw if the promise has already been completed elsewhere.
            details.promise.tryFail("Channel reconnected");
        }
        return Future.succeededFuture();
    }

    public RabbitMQPublisherImpl(RabbitMQChannelBuilder rabbitMQChannelBuilder, RabbitMQMessageCodec<T> rabbitMQMessageCodec, String str, RabbitMQPublisherOptions rabbitMQPublisherOptions) {
        this.exchange = str;
        this.resendOnReconnect = rabbitMQPublisherOptions.isResendOnReconnect();
        this.messageCodec = rabbitMQMessageCodec;
        this.codecManager = rabbitMQChannelBuilder.getCodecManager();
    }

    /**
     * Registers this publisher's recovery, confirmation and channel-open handlers on the
     * builder and opens the channel.
     *
     * @param rabbitMQChannelBuilder builder used to configure and open the channel
     * @return a future completed with this publisher once the channel is open
     */
    public Future<RabbitMQPublisher<T>> start(RabbitMQChannelBuilder rabbitMQChannelBuilder) {
        return rabbitMQChannelBuilder
            .withChannelRecoveryCallback(this::channelRecoveryCallback)
            .withConfirmHandler(this::handleConfirmation)
            .withChannelOpenHandler(openedRawChannel -> {
                // Nothing to resend until the publisher's own channel has been assigned.
                final RabbitMQChannel current = this.channel;
                if (current != null) {
                    performResends(current);
                }
            })
            .openChannel()
            .onSuccess(opened -> this.channel = opened)
            .map(this);
    }

    /**
     * Returns the channel this publisher publishes on.
     *
     * @return the channel, or null if the channel has not been opened successfully yet
     */
    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public RabbitMQChannel getChannel() {
        return this.channel;
    }

    /**
     * Publishes the queued resend messages one at a time, each publish being started only
     * after the previous one has completed (successfully or not).
     *
     * A message whose publish fails has its own promise failed; the chain then continues with
     * the next queued message.
     *
     * @param promise completed once the resend queue has been drained
     * @return the future of {@code promise}
     */
    private Future<Void> doResendAsync(Promise<Void> promise) {
        final RabbitMQPublisherImpl<T>.MessageDetails next;
        synchronized (this.promises) {
            next = this.resend.pollFirst();
        }
        if (next == null) {
            promise.complete();
            return promise.future();
        }
        final Promise<Void> publishPromise = next.promise;
        this.channel.basicPublish(new RabbitMQPublishOptions().setDeliveryTagHandler(deliveryTag -> {
            synchronized (this.promises) {
                // Track the message under the tag (and channel) it has just been published
                // with; confirmations are matched on that tag, not the one from the old channel.
                this.promises.addLast(new MessageDetails(this.channel.getChannelId(), deliveryTag.longValue(), publishPromise, next.routingKey, next.properties, next.message));
            }
        }), this.exchange, next.routingKey, false, next.properties, next.message).onFailure(th -> {
            publishPromise.tryFail(th);
        }).onComplete(asyncResult -> {
            doResendAsync(promise);
        });
        return promise.future();
    }

    /**
     * Invoked with the raw client channel after it has been recovered: every message that was
     * still unconfirmed is published again directly on that channel.
     *
     * Messages that are republished successfully move back to the pending-confirmation queue
     * under their new publish sequence number. Messages whose publish throws an
     * {@link IOException} stay in the resend queue, in their original order.
     *
     * @param channel the recovered raw channel
     */
    private void channelRecoveryCallback(Channel channel) {
        synchronized (this.promises) {
            copyPromises(this.resend);
            log.debug("Connection recovered, resending " + this.resend.size() + " messages");
            // Use the iterator's own remove(): the deque must not be modified behind the
            // iterator's back, and a republished message must leave the resend queue or it
            // would be sent a second time by the next resend pass.
            final Iterator<RabbitMQPublisherImpl<T>.MessageDetails> pending = this.resend.iterator();
            while (pending.hasNext()) {
                final RabbitMQPublisherImpl<T>.MessageDetails details = pending.next();
                final long nextPublishSeqNo = channel.getNextPublishSeqNo();
                try {
                    channel.basicPublish(this.exchange, details.routingKey, details.properties, details.message);
                } catch (IOException e) {
                    log.warn("Failed to resend message after connection recovery, leaving it queued for resend", e);
                    continue;
                }
                pending.remove();
                this.promises.addLast(new MessageDetails(details.channelId, nextPublishSeqNo, details.promise, details.routingKey, details.properties, details.message));
            }
        }
    }

    /**
     * Moves every pending message, in order, from the pending-confirmation queue into
     * {@code collection}, leaving the pending queue empty.
     *
     * @param collection destination that receives the pending messages
     */
    private void copyPromises(Collection<RabbitMQPublisherImpl<T>.MessageDetails> collection) {
        synchronized (this.promises) {
            collection.addAll(this.promises);
            this.promises.clear();
        }
    }

    /**
     * Handles a broker confirmation by completing the promise(s) of the message(s) it covers.
     *
     * If the confirmation is flagged as multiple, or its tag matches the oldest pending
     * message, every pending message from the head of the queue with a delivery tag up to and
     * including the confirmed tag is completed. Otherwise the queue is searched for the single
     * message with exactly that tag.
     *
     * @param rabbitMQConfirmation the confirmation received for this publisher's channel
     */
    private void handleConfirmation(RabbitMQConfirmation rabbitMQConfirmation) {
        final long confirmedTag = rabbitMQConfirmation.getDeliveryTag();
        log.debug("handleConfirmation(" + rabbitMQConfirmation.getChannelNumber() + ":" + confirmedTag + ")");
        final Collection<RabbitMQPublisherImpl<T>.MessageDetails> toComplete = new ArrayList<>();
        final int stillPending;
        synchronized (this.promises) {
            if (this.promises.isEmpty()) {
                log.error("Confirmation received whilst there are no pending promises!");
                return;
            }
            final long leadingTag = this.promises.getFirst().deliveryTag;
            if (rabbitMQConfirmation.isMultiple() || leadingTag == confirmedTag) {
                while (!this.promises.isEmpty() && this.promises.getFirst().deliveryTag <= confirmedTag) {
                    toComplete.add(this.promises.removeFirst());
                }
            } else {
                log.warn("Searching for promise for " + confirmedTag + " where leading promise has " + leadingTag);
                final Iterator<RabbitMQPublisherImpl<T>.MessageDetails> it = this.promises.iterator();
                while (it.hasNext()) {
                    final RabbitMQPublisherImpl<T>.MessageDetails candidate = it.next();
                    if (candidate.deliveryTag == confirmedTag) {
                        toComplete.add(candidate);
                        // Remove through the iterator rather than mutating the deque mid-iteration.
                        it.remove();
                        break;
                    }
                }
            }
            stillPending = this.promises.size();
        }
        log.info("Found " + toComplete.size() + " promises to complete, out of " + stillPending);
        // Complete outside the lock so handlers attached to the publish futures never run
        // while the pending queue's monitor is held.
        for (RabbitMQPublisherImpl<T>.MessageDetails details : toComplete) {
            completePromise(details.promise, rabbitMQConfirmation);
        }
    }

    /**
     * Resolves a publish promise according to the confirmation's outcome: completed when the
     * confirmation reports success, otherwise failed with "Negative confirmation received".
     * A null promise is ignored.
     *
     * @param promise              the promise to resolve, may be null
     * @param rabbitMQConfirmation the confirmation that decides the outcome
     */
    private void completePromise(Promise<Void> promise, RabbitMQConfirmation rabbitMQConfirmation) {
        if (promise == null) {
            return;
        }
        if (!rabbitMQConfirmation.isSucceeded()) {
            promise.tryFail("Negative confirmation received");
            return;
        }
        promise.tryComplete();
    }

    /**
     * Encodes and publishes a message to this publisher's exchange.
     *
     * When the codec declares a content type or content encoding that differs from the supplied
     * properties, the properties are replaced by a copy carrying the codec's values. The
     * returned future is resolved by the broker's confirmation for the message.
     *
     * If the publish fails with {@link AlreadyClosedException} and resend-on-reconnect is
     * enabled, the message is queued for resending instead of failing the future.
     *
     * @param str             the routing key
     * @param basicProperties the message properties; may be null
     * @param t               the message body, encoded with the configured or looked-up codec
     * @return a future resolved once the message has been confirmed, or failed if it could not be published
     */
    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public Future<Void> publish(String str, AMQP.BasicProperties basicProperties, T t) {
        final Promise<Void> promise = Promise.promise();
        RabbitMQMessageCodec<T> codec = this.messageCodec;
        if (codec == null) {
            codec = this.codecManager.lookupCodec(t, null);
        }

        AMQP.BasicProperties effectiveProperties = basicProperties;
        if (codec.getContentEncoding() != null || codec.getContentType() != null) {
            if (effectiveProperties == null) {
                // The codec has headers to apply; start from empty properties instead of
                // dereferencing null.
                effectiveProperties = new AMQP.BasicProperties.Builder().build();
            }
            final boolean encodingDiffers = codec.getContentEncoding() != null
                && !Objects.equals(codec.getContentEncoding(), effectiveProperties.getContentEncoding());
            final boolean typeDiffers = codec.getContentType() != null
                && !Objects.equals(codec.getContentType(), effectiveProperties.getContentType());
            if (encodingDiffers || typeDiffers) {
                effectiveProperties = RabbitMQChannelImpl.setTypeAndEncoding(effectiveProperties, codec.getContentType(), codec.getContentEncoding());
            }
        }

        final AMQP.BasicProperties publishedProperties = effectiveProperties;
        final byte[] body = codec.encodeToBytes(t);
        this.channel.basicPublish(new RabbitMQPublishOptions().setDeliveryTagHandler(deliveryTag -> {
            synchronized (this.promises) {
                if (this.resendOnReconnect) {
                    // Keep everything needed to publish the message again after a reconnect.
                    this.promises.addLast(new MessageDetails(this.channel.getChannelId(), deliveryTag.longValue(), promise, str, publishedProperties, body));
                } else {
                    this.promises.addLast(new MessageDetails(this.channel.getChannelId(), deliveryTag.longValue(), promise));
                }
            }
        }), this.exchange, str, false, publishedProperties, body).onFailure(th -> {
            if (this.resendOnReconnect && th instanceof AlreadyClosedException) {
                // No delivery tag was assigned on a closed channel, hence -1.
                synchronized (this.promises) {
                    this.resend.addLast(new MessageDetails(this.channel.getChannelId(), -1L, promise, str, publishedProperties, body));
                }
                return;
            }
            promise.tryFail(th);
        });
        return promise.future();
    }

    /**
     * Discards all messages currently awaiting confirmation and returns an already-succeeded
     * future.
     *
     * NOTE(review): the discarded promises are neither completed nor failed here, so the
     * futures previously returned for those messages are not resolved by this method - confirm
     * that this is intended.
     *
     * @return a succeeded future
     */
    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public Future<Void> pause() {
        final Deque<RabbitMQPublisherImpl<T>.MessageDetails> pending = this.promises;
        synchronized (pending) {
            pending.clear();
        }
        return Future.succeededFuture();
    }

    /**
     * Cancels this publisher: every message still awaiting confirmation or resend has its
     * promise failed with "Publisher cancelled", and the channel is closed.
     *
     * @return the future of the channel close, or a succeeded future if no channel was ever opened
     */
    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public Future<Void> cancel() {
        final Collection<RabbitMQPublisherImpl<T>.MessageDetails> abandoned = new ArrayList<>();
        synchronized (this.promises) {
            abandoned.addAll(this.promises);
            this.promises.clear();
            abandoned.addAll(this.resend);
            this.resend.clear();
        }
        // Once the channel is closed no confirmation can arrive for these messages; fail their
        // promises (outside the lock) rather than leaving callers' futures unresolved forever.
        for (RabbitMQPublisherImpl<T>.MessageDetails details : abandoned) {
            if (details.promise != null) {
                details.promise.tryFail("Publisher cancelled");
            }
        }
        final RabbitMQChannel current = this.channel;
        if (current == null) {
            return Future.succeededFuture();
        }
        return current.close();
    }
}
