package io.vertx.rabbitmq.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.rabbitmq.ChannelFunction;
import io.vertx.rabbitmq.RabbitMQChannel;
import io.vertx.rabbitmq.RabbitMQChannelBuilder;
import io.vertx.rabbitmq.RabbitMQManagementChannel;
import io.vertx.rabbitmq.RabbitMQMessageCodec;
import io.vertx.rabbitmq.RabbitMQPublishOptions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

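/**
 * Channel wrapper implementing both {@link RabbitMQChannel} and {@link RabbitMQManagementChannel}.
 *
 * <p>Most operations are funneled through {@code onChannel}, which obtains the underlying
 * {@link Channel} via the {@code CreateLock} and runs the operation with
 * {@code Context#executeBlocking}. The instance also registers itself as the channel's
 * {@link ShutdownListener}.
 *
 * <p>Decompiled from: io/vertx/rabbitmq/impl/RabbitMQChannelImpl.class
 */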
public class RabbitMQChannelImpl implements RabbitMQChannel, RabbitMQManagementChannel, ShutdownListener {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQChannelImpl.class);
    private final RabbitMQConnectionImpl connection;
    private final Context context;
    private final RabbitMQCodecManager codecManager;
    private volatile int channelNumber;
    private final List<Handler<Channel>> channelRecoveryCallbacks;
    private final List<Handler<ShutdownSignalException>> shutdownHandlers;
    private final Object publishLock = new Object();
    private long knownConnectionInstance;
    private final int retries;
    private volatile boolean closed;
    private volatile boolean confirmSelected;
    private final CreateLock<Channel> createLock;

    /**
     * Creates a channel wrapper from the settings collected by the given builder.
     *
     * <p>The connection, codec manager, recovery callbacks and shutdown handlers are taken from the
     * builder; the reconnect-attempt count and the Vert.x context are taken from the connection.
     * The {@code CreateLock} is configured with a check that the channel is open and with a
     * callback that runs the builder's channel-open handler through {@link #onChannel}.
     *
     * @param rabbitMQChannelBuilder source of the connection and all per-channel configuration
     */
    public RabbitMQChannelImpl(RabbitMQChannelBuilder rabbitMQChannelBuilder) {
        RabbitMQConnectionImpl builderConnection = rabbitMQChannelBuilder.getConnection();
        this.connection = builderConnection;
        this.retries = builderConnection.getConfiguredReconnectAttempts();
        this.context = builderConnection.getVertx().getOrCreateContext();

        this.codecManager = rabbitMQChannelBuilder.getCodecManager();
        this.channelRecoveryCallbacks = rabbitMQChannelBuilder.getChannelRecoveryCallbacks();
        this.shutdownHandlers = rabbitMQChannelBuilder.getShutdownHandlers();

        this.createLock = new CreateLock<>(
            candidate -> candidate.isOpen(),
            created -> {
                // The open handler is looked up each time the callback fires, not once at construction.
                onChannel(rabbitMQChannelBuilder.getChannelOpenHandler());
            });
    }

    /**
     * @return the codec manager this channel was built with
     */
    public RabbitMQCodecManager getCodecManager() {
        return codecManager;
    }

    /**
     * Returns the management view of this channel.
     *
     * @return this same instance, which also implements {@link RabbitMQManagementChannel}
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel
    public RabbitMQManagementChannel getManagementChannel() {
        RabbitMQManagementChannel self = this;
        return self;
    }

    /**
     * Enables publisher confirms on the channel.
     *
     * <p>The {@code confirmSelected} flag is set immediately, before the channel call has run, and
     * it is not reset if that call fails.
     *
     * @return a future completed once {@code Channel#confirmSelect()} has been invoked
     */
    @Override // io.vertx.rabbitmq.RabbitMQManagementChannel
    public Future<Void> confirmSelect() {
        ChannelFunction<Void> enableConfirms = channel -> {
            channel.confirmSelect();
            return null;
        };
        this.confirmSelected = true;
        return onChannel(enableConfirms);
    }

    /**
     * Waits for outstanding publisher confirms by calling {@code Channel#waitForConfirmsOrDie(long)}.
     *
     * @param j value forwarded unchanged to {@code waitForConfirmsOrDie}
     * @return a future completed when the call returns, or failed if it throws
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel
    public Future<Void> waitForConfirms(long j) {
        ChannelFunction<Void> awaitConfirms = channel -> {
            channel.waitForConfirmsOrDie(j);
            return null;
        };
        return onChannel(awaitConfirms);
    }

    /**
     * @return the channel number recorded when the underlying channel was last opened
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel
    public int getChannelNumber() {
        return channelNumber;
    }

    /**
     * Builds an identifier of the form {@code <connectionName>:<connectionInstance>:<channelNumber>}.
     *
     * @return the connection name, the last known connection instance and the channel number,
     *         joined with colons
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel, io.vertx.rabbitmq.RabbitMQManagementChannel
    public String getChannelId() {
        StringBuilder id = new StringBuilder();
        id.append(this.connection.getConnectionName());
        id.append(":").append(this.knownConnectionInstance);
        id.append(":").append(this.channelNumber);
        return id.toString();
    }

    /**
     * Ensures the underlying channel is available and then yields this wrapper.
     *
     * @return a future completed with this instance once {@link #onChannel} has run
     */
    public Future<RabbitMQChannel> connect() {
        // The channel argument is unused; going through onChannel is what triggers the open.
        return this.<RabbitMQChannel>onChannel(ignored -> this);
    }

    /**
     * Opens a channel on the connection and completes {@code promise} with it.
     *
     * <p>On success this records the connection instance and channel number, registers this object
     * as the channel's shutdown listener and, when the channel is {@link Recoverable}, installs a
     * recovery listener that notifies every registered channel-recovery callback.
     *
     * @param promise completed with the opened channel, or failed with the cause if opening fails
     */
    private void connect(Promise<Channel> promise) {
        this.connection.openChannel(this.knownConnectionInstance).onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                promise.fail(asyncResult.cause());
                return;
            }
            this.knownConnectionInstance = this.connection.getConnectionInstance();
            // Keep the result typed as Channel: getChannelNumber/addShutdownListener are Channel
            // operations, and only the recovery registration needs the Recoverable view.
            final Channel channel = asyncResult.result();
            this.channelNumber = channel.getChannelNumber();
            channel.addShutdownListener(this);
            if (channel instanceof Recoverable) {
                ((Recoverable) channel).addRecoveryListener(new RecoveryListener() {
                    @Override
                    public void handleRecovery(Recoverable recovered) {
                        RabbitMQChannelImpl.log.info("Channel " + recovered + " recovered");
                        // Snapshot under the list's monitor so callbacks run outside the lock.
                        List<Handler<Channel>> callbacks;
                        synchronized (RabbitMQChannelImpl.this.channelRecoveryCallbacks) {
                            callbacks = new ArrayList<>(RabbitMQChannelImpl.this.channelRecoveryCallbacks);
                        }
                        for (Handler<Channel> callback : callbacks) {
                            callback.handle(channel);
                        }
                    }

                    @Override
                    public void handleRecoveryStarted(Recoverable recovering) {
                        // Nothing to do when recovery starts.
                    }
                });
            }
            promise.complete(channel);
        });
    }

    /**
     * Shutdown callback for the underlying channel.
     *
     * <p>When reconnect attempts are configured ({@code retries > 0}) the known connection
     * instance is reset to {@code -1}. The shutdown is logged at trace level and every registered
     * shutdown handler is then invoked with the signal.
     *
     * @param shutdownSignalException the shutdown signal passed to this listener
     */
    @Override
    public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
        boolean reconnectConfigured = this.retries > 0;
        if (reconnectConfigured) {
            this.knownConnectionInstance = -1L;
        }
        log.trace("Channel " + this + " Shutdown: " + shutdownSignalException.getMessage());
        for (Handler<ShutdownSignalException> handler : this.shutdownHandlers) {
            handler.handle(shutdownSignalException);
        }
    }

    /**
     * Runs {@code channelFunction} against the underlying channel.
     *
     * <p>Fails immediately with {@code "Channel closed"} if this channel or its connection is
     * closed. Otherwise the channel is obtained through the {@code CreateLock} (using
     * {@link #connect(Promise)} as the creator) and the function is executed via
     * {@code Context#executeBlocking}. Any exception thrown by the function is rethrown wrapped in
     * a {@link RuntimeException}.
     *
     * @param channelFunction the operation to perform with the channel
     * @param <T> the operation's result type
     * @return a future carrying the operation's result or failure
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel
    public <T> Future<T> onChannel(ChannelFunction<T> channelFunction) {
        if (this.closed || this.connection.isClosed()) {
            return Future.failedFuture("Channel closed");
        }
        return (Future<T>) this.createLock.create(
            pending -> connect(pending),
            openChannel -> this.context.executeBlocking(() -> {
                try {
                    return channelFunction.handle(openChannel);
                } catch (Exception failure) {
                    throw new RuntimeException(failure);
                }
            }));
    }

    /**
     * Aborts the channel with the supplied close code and message.
     *
     * <p>Both arguments are forwarded to {@code Channel#abort(int, String)}; previously they were
     * accepted but dropped in favour of the no-argument {@code abort()}.
     *
     * @param i close code passed to the channel
     * @param str close message passed to the channel
     * @return a future completed once the abort call has returned
     */
    @Override // io.vertx.rabbitmq.RabbitMQManagementChannel
    public Future<Void> abort(int i, String str) {
        return onChannel(channel -> {
            channel.abort(i, str);
            return null;
        });
    }

    /**
     * Registers a pair of confirm callbacks on the channel.
     *
     * @param confirmCallback first callback, passed as the first argument of
     *                        {@code Channel#addConfirmListener}
     * @param confirmCallback2 second callback, passed as the second argument
     * @return a future completed (with no value) once the listener has been added
     */
    public Future<Void> addConfirmListener(ConfirmCallback confirmCallback, ConfirmCallback confirmCallback2) {
        // The listener returned by the channel is discarded by mapEmpty().
        return onChannel(channel -> channel.addConfirmListener(confirmCallback, confirmCallback2)).mapEmpty();
    }

    /**
     * Acknowledges a delivery, but only if it belongs to the current channel number.
     *
     * <p>When {@code j} differs from the current channel number the ack is skipped silently and the
     * future still completes successfully.
     *
     * @param j channel number the delivery was received on
     * @param j2 delivery tag forwarded to {@code Channel#basicAck}
     * @param z forwarded as the second argument of {@code Channel#basicAck}
     * @return a future completed once the operation has run
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel
    public Future<Void> basicAck(long j, long j2, boolean z) {
        return onChannel(channel -> {
            // The channel number is read when the operation runs, not when it is submitted.
            if (j == this.channelNumber) {
                channel.basicAck(j2, z);
            }
            return null;
        });
    }

    /**
     * Negatively acknowledges a delivery, but only if it belongs to the current channel number.
     *
     * <p>When {@code j} differs from the current channel number the nack is skipped silently and
     * the future still completes successfully.
     *
     * @param j channel number the delivery was received on
     * @param j2 delivery tag forwarded to {@code Channel#basicNack}
     * @param z forwarded as the second argument of {@code Channel#basicNack}
     * @param z2 forwarded as the third argument of {@code Channel#basicNack}
     * @return a future completed once the operation has run
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel
    public Future<Void> basicNack(long j, long j2, boolean z, boolean z2) {
        return onChannel(channel -> {
            // The channel number is read when the operation runs, not when it is submitted.
            if (j == this.channelNumber) {
                channel.basicNack(j2, z, z2);
            }
            return null;
        });
    }

    /**
     * Starts a consumer by delegating to the seven-argument {@code Channel#basicConsume}.
     *
     * <p>All arguments are forwarded unchanged and in the same order.
     *
     * @return a future completed with the string returned by {@code Channel#basicConsume}
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel
    public Future<String> basicConsume(String str, boolean z, String str2, boolean z2, boolean z3, Map<String, Object> map, Consumer consumer) {
        ChannelFunction<String> startConsumer =
            channel -> channel.basicConsume(str, z, str2, z2, z3, map, consumer);
        return onChannel(startConsumer);
    }

    /**
     * Cancels a consumer by delegating to {@code Channel#basicCancel}.
     *
     * @param str value forwarded unchanged to {@code Channel#basicCancel}
     * @return a future completed once the cancel call has returned
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel
    public Future<Void> basicCancel(String str) {
        ChannelFunction<Void> cancelConsumer = channel -> {
            channel.basicCancel(str);
            return null;
        };
        return onChannel(cancelConsumer);
    }

    /**
     * Copies {@code basicProperties}, overriding the content type and/or content encoding.
     *
     * <p>Every other property is copied as-is. A {@code null} override keeps the corresponding
     * value from {@code basicProperties}.
     *
     * @param basicProperties properties to copy; must not be {@code null}
     * @param str replacement content type, or {@code null} to keep the existing one
     * @param str2 replacement content encoding, or {@code null} to keep the existing one
     * @return a newly built properties object
     */
    public static AMQP.BasicProperties setTypeAndEncoding(AMQP.BasicProperties basicProperties, String str, String str2) {
        String effectiveEncoding = str2 != null ? str2 : basicProperties.getContentEncoding();
        String effectiveType = str != null ? str : basicProperties.getContentType();

        AMQP.BasicProperties.Builder copy = new AMQP.BasicProperties.Builder();
        copy.appId(basicProperties.getAppId());
        copy.clusterId(basicProperties.getClusterId());
        copy.contentEncoding(effectiveEncoding);
        copy.contentType(effectiveType);
        copy.correlationId(basicProperties.getCorrelationId());
        copy.deliveryMode(basicProperties.getDeliveryMode());
        copy.expiration(basicProperties.getExpiration());
        copy.headers(basicProperties.getHeaders());
        copy.messageId(basicProperties.getMessageId());
        copy.priority(basicProperties.getPriority());
        copy.replyTo(basicProperties.getReplyTo());
        copy.type(basicProperties.getType());
        copy.timestamp(basicProperties.getTimestamp());
        copy.userId(basicProperties.getUserId());
        return copy.build();
    }

    /**
     * Publishes a message, encoding {@code obj} with the codec selected by the codec manager.
     *
     * <p>If the selected codec declares a content type or encoding that differs from
     * {@code basicProperties}, a copy of the properties with the codec's values is used instead.
     *
     * <p>Two paths exist:
     * <ul>
     *   <li><b>Synchronous:</b> when confirms have not been selected and an open channel is already
     *       available, the publish runs on the calling thread under {@code publishLock} and a
     *       succeeded future is returned.</li>
     *   <li><b>Via {@link #onChannel}:</b> used in every other case, and also as a fallback when the
     *       synchronous publish throws an {@link IOException} (which is logged as a warning).
     *       If the options request waiting for confirms, {@code Channel#waitForConfirms()} is
     *       called after publishing.</li>
     * </ul>
     *
     * <p>Note: if the synchronous attempt fails after the delivery-tag handler has been called, the
     * fallback calls the handler again with the sequence number current at that point.
     *
     * @param rabbitMQPublishOptions optional publish options (codec, delivery-tag handler,
     *                               wait-for-confirm); may be {@code null}
     * @param str forwarded as the first argument of {@code Channel#basicPublish}
     * @param str2 forwarded as the second argument of {@code Channel#basicPublish}
     * @param z forwarded as the third argument of {@code Channel#basicPublish}
     * @param basicProperties message properties
     * @param obj message body, encoded with the selected codec
     * @return a future completed when the message has been published (and confirmed, if requested);
     *         failed with {@link IllegalStateException} if waiting for confirms is requested but
     *         {@link #confirmSelect()} has not been called
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel
    public Future<Void> basicPublish(RabbitMQPublishOptions rabbitMQPublishOptions, String str, String str2, boolean z, AMQP.BasicProperties basicProperties, Object obj) {
        RabbitMQMessageCodec lookupCodec = this.codecManager.lookupCodec(obj, rabbitMQPublishOptions == null ? null : rabbitMQPublishOptions.getCodec());
        if ((lookupCodec.getContentEncoding() != null && !Objects.equals(lookupCodec.getContentEncoding(), basicProperties.getContentEncoding())) || (lookupCodec.getContentType() != null && !Objects.equals(lookupCodec.getContentType(), basicProperties.getContentType()))) {
            basicProperties = setTypeAndEncoding(basicProperties, lookupCodec.getContentType(), lookupCodec.getContentEncoding());
        }
        final AMQP.BasicProperties effectiveProperties = basicProperties;

        // Synchronous path: the whole attempt sits inside the try so that an IOException from the
        // publish itself is caught and the call falls through to the onChannel path below.
        try {
            Channel channel = this.createLock.get();
            if (!this.confirmSelected && channel != null && channel.isOpen()) {
                synchronized (this.publishLock) {
                    notifyDeliveryTag(rabbitMQPublishOptions, channel);
                    channel.basicPublish(str, str2, z, effectiveProperties, lookupCodec.encodeToBytes(obj));
                }
                return Future.succeededFuture();
            }
        } catch (IOException e) {
            log.warn("Synchronous send of basicPublish(" + str + ", " + str2 + ", " + z + ": ", e);
        }

        boolean waitForConfirm = rabbitMQPublishOptions != null && rabbitMQPublishOptions.isWaitForConfirm();
        if (waitForConfirm && !this.confirmSelected) {
            return Future.failedFuture(new IllegalStateException("Must call confirmSelect before basicPublishWithConfirm"));
        }
        return onChannel(openChannel -> {
            synchronized (this.publishLock) {
                notifyDeliveryTag(rabbitMQPublishOptions, openChannel);
                openChannel.basicPublish(str, str2, z, effectiveProperties, lookupCodec.encodeToBytes(obj));
                if (waitForConfirm) {
                    openChannel.waitForConfirms();
                }
            }
            return null;
        }).mapEmpty();
    }

    /** Passes the channel's next publish sequence number to the options' delivery-tag handler, if one is set. */
    private static void notifyDeliveryTag(RabbitMQPublishOptions options, Channel channel) {
        if (options != null && options.getDeliveryTagHandler() != null) {
            options.getDeliveryTagHandler().handle(Long.valueOf(channel.getNextPublishSeqNo()));
        }
    }

    /**
     * Declares an exchange by delegating to the five-argument {@code Channel#exchangeDeclare}.
     *
     * <p>All arguments are forwarded unchanged and in the same order; the broker's reply is
     * discarded.
     *
     * @return a future completed (with no value) once the declaration has returned
     */
    @Override // io.vertx.rabbitmq.RabbitMQManagementChannel
    public Future<Void> exchangeDeclare(String str, BuiltinExchangeType builtinExchangeType, boolean z, boolean z2, Map<String, Object> map) {
        Future<AMQP.Exchange.DeclareOk> declared =
            onChannel(channel -> channel.exchangeDeclare(str, builtinExchangeType, z, z2, map));
        return declared.mapEmpty();
    }

    /**
     * Passively declares an exchange by delegating to {@code Channel#exchangeDeclarePassive}.
     *
     * @param str value forwarded unchanged to {@code Channel#exchangeDeclarePassive}
     * @return a future completed (with no value) if the call succeeds, failed otherwise
     */
    @Override // io.vertx.rabbitmq.RabbitMQManagementChannel
    public Future<Void> exchangeDeclarePassive(String str) {
        Future<AMQP.Exchange.DeclareOk> checked =
            onChannel(channel -> channel.exchangeDeclarePassive(str));
        return checked.mapEmpty();
    }

    /**
     * Binds one exchange to another by delegating to the four-argument {@code Channel#exchangeBind}.
     *
     * <p>All arguments are forwarded unchanged and in the same order; the broker's reply is
     * discarded.
     *
     * @return a future completed (with no value) once the bind has returned
     */
    @Override // io.vertx.rabbitmq.RabbitMQManagementChannel
    public Future<Void> exchangeBind(String str, String str2, String str3, Map<String, Object> map) {
        Future<AMQP.Exchange.BindOk> bound =
            onChannel(channel -> channel.exchangeBind(str, str2, str3, map));
        return bound.mapEmpty();
    }

    /**
     * Declares a queue by delegating to the five-argument {@code Channel#queueDeclare}.
     *
     * <p>All arguments are forwarded unchanged and in the same order.
     *
     * @return a future completed with the queue name reported by the declare reply
     *         ({@code DeclareOk#getQueue()})
     */
    @Override // io.vertx.rabbitmq.RabbitMQManagementChannel
    public Future<String> queueDeclare(String str, boolean z, boolean z2, boolean z3, Map<String, Object> map) {
        Future<AMQP.Queue.DeclareOk> declared =
            onChannel(channel -> channel.queueDeclare(str, z, z2, z3, map));
        return declared.map(AMQP.Queue.DeclareOk::getQueue);
    }

    /**
     * Passively declares a queue by delegating to {@code Channel#queueDeclarePassive}.
     *
     * @param str value forwarded unchanged to {@code Channel#queueDeclarePassive}
     * @return a future completed (with no value) if the call succeeds, failed otherwise
     */
    @Override // io.vertx.rabbitmq.RabbitMQManagementChannel
    public Future<Void> queueDeclarePassive(String str) {
        Future<AMQP.Queue.DeclareOk> checked =
            onChannel(channel -> channel.queueDeclarePassive(str));
        return checked.mapEmpty();
    }

    /**
     * Binds a queue by delegating to the four-argument {@code Channel#queueBind}.
     *
     * <p>All arguments are forwarded unchanged and in the same order; the broker's reply is
     * discarded.
     *
     * @return a future completed (with no value) once the bind has returned
     */
    @Override // io.vertx.rabbitmq.RabbitMQManagementChannel
    public Future<Void> queueBind(String str, String str2, String str3, Map<String, Object> map) {
        Future<AMQP.Queue.BindOk> bound =
            onChannel(channel -> channel.queueBind(str, str2, str3, map));
        return bound.mapEmpty();
    }

    /**
     * Closes the channel with reply code 200 and message {@code "OK"}.
     *
     * @return the future returned by {@link #close(int, String)}
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel, io.vertx.rabbitmq.RabbitMQManagementChannel
    public Future<Void> close() {
        final String closeMessage = "OK";
        return close(AMQP.REPLY_SUCCESS, closeMessage);
    }

    /**
     * Marks this wrapper as closed and closes the underlying channel if it is open.
     *
     * <p>The current channel is read from the {@code CreateLock} before the {@code closed} flag is
     * set. If there is no channel, or it is already closed, a succeeded future is returned without
     * any further work. Otherwise {@code Channel#close(int, String)} is run via
     * {@code Context#executeBlocking}; any exception it throws is rethrown wrapped in a
     * {@link RuntimeException}.
     *
     * @param i close code forwarded to the channel
     * @param str close message forwarded to the channel
     * @return a future completed once the channel has been closed
     */
    @Override // io.vertx.rabbitmq.RabbitMQChannel, io.vertx.rabbitmq.RabbitMQManagementChannel
    public Future<Void> close(int i, String str) {
        Channel current = this.createLock.get();
        this.closed = true;
        if (current != null && current.isOpen()) {
            return this.context.executeBlocking(() -> {
                try {
                    current.close(i, str);
                } catch (Exception failure) {
                    throw new RuntimeException(failure);
                }
                return null;
            });
        }
        return Future.succeededFuture();
    }
}
