package io.vertx.rabbitmq.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import io.vertx.core.AsyncResult;
import io.vertx.core.Completable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.ClientAuth;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.tls.SslContextManager;
import io.vertx.core.internal.tls.SslContextProvider;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.JdkSSLEngineOptions;
import io.vertx.core.streams.ReadStream;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQConfirmation;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQMessage;
import io.vertx.rabbitmq.RabbitMQOptions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.TrustManager;

/* loaded from: input_file:io/vertx/rabbitmq/impl/RabbitMQClientImpl.class */
public class RabbitMQClientImpl implements RabbitMQClient, ShutdownListener {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQClientImpl.class);
    private static final JsonObject emptyConfig = new JsonObject();
    private final Vertx vertx;
    private final RabbitMQOptions config;
    private final int retries;
    private Connection connection;
    private Channel channel;
    private long channelInstance;
    private boolean channelConfirms = false;
    private boolean hasConnected = false;
    private AtomicBoolean isReconnecting = new AtomicBoolean(false);
    private List<Handler<Promise<Void>>> connectionEstablishedCallbacks = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/rabbitmq/impl/RabbitMQClientImpl$ChannelHandler.class */
    public interface ChannelHandler<T> {
        T handle(Channel channel) throws Exception;
    }

    public RabbitMQClientImpl(Vertx vertx, RabbitMQOptions rabbitMQOptions) {
        this.vertx = vertx;
        this.config = rabbitMQOptions;
        this.retries = rabbitMQOptions.getReconnectAttempts();
    }

    public long getChannelInstance() {
        return this.channelInstance;
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public void addConnectionEstablishedCallback(Handler<Promise<Void>> handler) {
        this.connectionEstablishedCallbacks.add(handler);
    }

    private static Connection newConnection(Vertx vertx, RabbitMQOptions rabbitMQOptions) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        String uri = rabbitMQOptions.getUri();
        List<Address> list = null;
        if (uri != null) {
            try {
                connectionFactory.setUri(uri);
                log.info("Connecting to " + connectionFactory.getHost());
            } catch (Exception e) {
                throw new IllegalArgumentException("Invalid rabbitmq connection uri ", e);
            }
        } else {
            connectionFactory.setUsername(rabbitMQOptions.getUser());
            connectionFactory.setPassword(rabbitMQOptions.getPassword());
            list = rabbitMQOptions.getAddresses().isEmpty() ? Collections.singletonList(new Address(rabbitMQOptions.getHost(), rabbitMQOptions.getPort())) : rabbitMQOptions.getAddresses();
            connectionFactory.setVirtualHost(rabbitMQOptions.getVirtualHost());
            log.info("Connecting to " + String.valueOf(list));
        }
        connectionFactory.setConnectionTimeout(rabbitMQOptions.getConnectionTimeout());
        connectionFactory.setRequestedHeartbeat(rabbitMQOptions.getRequestedHeartbeat());
        connectionFactory.setHandshakeTimeout(rabbitMQOptions.getHandshakeTimeout());
        connectionFactory.setRequestedChannelMax(rabbitMQOptions.getRequestedChannelMax());
        connectionFactory.setNetworkRecoveryInterval(rabbitMQOptions.getNetworkRecoveryInterval());
        connectionFactory.setAutomaticRecoveryEnabled(rabbitMQOptions.isAutomaticRecoveryEnabled());
        if (rabbitMQOptions.isSsl()) {
            rabbitMQOptions.setSslEngineOptions(new JdkSSLEngineOptions());
            try {
                connectionFactory.useSslProtocol(((SslContextProvider) new SslContextManager(SslContextManager.resolveEngineOptions(rabbitMQOptions.getSslEngineOptions(), rabbitMQOptions.isUseAlpn())).resolveSslContextProvider(rabbitMQOptions.getSslOptions().copy(), rabbitMQOptions.getHostnameVerificationAlgorithm(), (ClientAuth) null, (List) null, ((VertxInternal) vertx).createEventLoopContext()).toCompletionStage().toCompletableFuture().get(1L, TimeUnit.MINUTES)).createContext(false, (KeyManagerFactory) null, (TrustManager[]) null, (String) null, false).unwrap().context());
            } catch (InterruptedException e2) {
                throw new VertxException(e2);
            } catch (ExecutionException e3) {
                throw new VertxException(e3.getCause());
            }
        }
        if (rabbitMQOptions.isNioEnabled()) {
            connectionFactory.useNio();
        }
        if (rabbitMQOptions.getSaslConfig() != null) {
            connectionFactory.setSaslConfig(rabbitMQOptions.getSaslConfig());
        }
        if (rabbitMQOptions.getCredentialsProvider() != null) {
            connectionFactory.setCredentialsProvider(rabbitMQOptions.getCredentialsProvider());
        }
        if (rabbitMQOptions.getCredentialsRefreshService() != null) {
            connectionFactory.setCredentialsRefreshService(rabbitMQOptions.getCredentialsRefreshService());
        }
        if (rabbitMQOptions.getMetricsCollector() != null) {
            connectionFactory.setMetricsCollector(rabbitMQOptions.getMetricsCollector());
        }
        return list == null ? connectionFactory.newConnection(rabbitMQOptions.getConnectionName()) : connectionFactory.newConnection(list, rabbitMQOptions.getConnectionName());
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public boolean isConnected() {
        boolean z = false;
        if (this.connection != null && this.connection.isOpen()) {
            z = true;
        }
        return z;
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public boolean isOpenChannel() {
        return this.channel != null && this.channel.isOpen();
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> basicAck(long j, boolean z) {
        return forChannel(channel -> {
            channel.basicAck(j, z);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> basicNack(long j, boolean z, boolean z2) {
        return forChannel(channel -> {
            channel.basicNack(j, z, z2);
            return null;
        });
    }

    private void restartConsumer(int i, QueueConsumerHandler queueConsumerHandler, QueueOptions queueOptions) {
        if (queueConsumerHandler.queue().isCancelled()) {
            return;
        }
        restartConnect(0, (r10, th) -> {
            forChannel(channel -> {
                RabbitMQConsumer queue = queueConsumerHandler.queue();
                channel.basicConsume(queue.queueName(), queueOptions.isAutoAck(), queueOptions.getConsumerTag(), queueOptions.isNoLocal(), queueOptions.isConsumerExclusive(), queueOptions.getConsumerArguments(), queueConsumerHandler);
                log.info("Reconsume queue: " + queue.queueName() + " success");
                return queue.mo2resume();
            }).onComplete(asyncResult -> {
                if (asyncResult.failed()) {
                    log.error("Failed to restart consumer: ", asyncResult.cause());
                    this.vertx.setTimer(this.config.getReconnectInterval(), l -> {
                        restartConsumer(i + 1, queueConsumerHandler, queueOptions);
                    });
                }
            });
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> restartConnect(int i) {
        return Future.future(promise -> {
            restartConnect(i, promise);
        });
    }

    private void restartConnect(int i, Completable<Void> completable) {
        if (this.retries == 0) {
            log.error("Retries disabled. Will not attempt to restart");
            completable.fail("Retries disabled. Will not attempt to restart");
            return;
        }
        if (!this.isReconnecting.compareAndSet(false, true)) {
            log.debug("Other consumers or producers are reconnecting. Continue to wait for reconnection");
            this.vertx.setTimer(this.config.getReconnectInterval(), l -> {
                restartConnect(i, completable);
            });
        } else if (this.channel == null || !this.channel.isOpen()) {
            log.debug("Start to reconnect...");
            execRestart(i, completable);
        } else {
            log.debug("Other consumers or producers reconnect successfully. Reuse their channel");
            completable.succeed();
            this.isReconnecting.set(false);
        }
    }

    private void execRestart(int i, Completable<Void> completable) {
        stop().onComplete(asyncResult -> {
            if (!asyncResult.succeeded()) {
                log.error("Failed to stop client, will attempt to restart: ", asyncResult.cause());
                this.vertx.setTimer(this.config.getReconnectInterval(), l -> {
                    execRestart(i + 1, completable);
                });
            } else if (i >= this.retries) {
                log.error("Max number of consumer restart attempts (" + this.retries + ") reached. Will not attempt to restart again");
            } else {
                start().onComplete(asyncResult -> {
                    if (!asyncResult.succeeded()) {
                        log.error("Failed to restart client: ", asyncResult.cause());
                        this.vertx.setTimer(this.config.getReconnectInterval(), l2 -> {
                            execRestart(i + 1, completable);
                        });
                    } else {
                        if (this.channelConfirms) {
                            confirmSelect();
                        }
                        log.info("Successed to restart client. ");
                        this.isReconnecting.set(false);
                        completable.succeed();
                    }
                });
            }
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<RabbitMQConsumer> basicConsumer(String str, QueueOptions queueOptions) {
        return forChannel(channel -> {
            log.debug("Created new QueueConsumer");
            QueueConsumerHandler queueConsumerHandler = new QueueConsumerHandler(this.vertx, channel, queueOptions, str);
            if (this.retries > 0) {
                queueConsumerHandler.setShutdownHandler(shutdownSignalException -> {
                    restartConsumer(0, queueConsumerHandler, queueOptions);
                });
            }
            try {
                channel.basicConsume(str, queueOptions.isAutoAck(), queueOptions.getConsumerTag(), queueOptions.isNoLocal(), queueOptions.isConsumerExclusive(), queueOptions.getConsumerArguments(), queueConsumerHandler);
            } catch (Throwable th) {
                log.warn("Failed to consume: ", th);
                restartConsumer(0, queueConsumerHandler, queueOptions);
            }
            return queueConsumerHandler;
        }).map(queueConsumerHandler -> {
            RabbitMQConsumer queue = queueConsumerHandler.queue();
            queue.mo2resume();
            return queue;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<RabbitMQMessage> basicGet(String str, boolean z) {
        return forChannel(channel -> {
            GetResponse basicGet = channel.basicGet(str, z);
            if (basicGet == null) {
                return null;
            }
            return new RabbitMQMessageImpl(basicGet.getBody(), null, basicGet.getEnvelope(), basicGet.getProps(), Integer.valueOf(basicGet.getMessageCount()));
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> basicPublish(String str, String str2, Buffer buffer) {
        return basicPublishWithDeliveryTag(str, str2, new AMQP.BasicProperties(), buffer, null);
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> basicPublish(String str, String str2, BasicProperties basicProperties, Buffer buffer) {
        return basicPublishWithDeliveryTag(str, str2, basicProperties, buffer, null);
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> basicPublishWithDeliveryTag(String str, String str2, BasicProperties basicProperties, Buffer buffer, Handler<Long> handler) {
        return forChannel(channel -> {
            if (handler != null) {
                handler.handle(Long.valueOf(channel.getNextPublishSeqNo()));
            }
            channel.basicPublish(str, str2, (AMQP.BasicProperties) basicProperties, buffer.getBytes());
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<ReadStream<RabbitMQConfirmation>> addConfirmListener(int i) {
        return forChannel(channel -> {
            ChannelConfirmHandler channelConfirmHandler = new ChannelConfirmHandler(this.vertx, this, i);
            channel.addConfirmListener(channelConfirmHandler);
            channel.confirmSelect();
            this.channelConfirms = true;
            return channelConfirmHandler.getListener();
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> confirmSelect() {
        return forChannel(channel -> {
            channel.confirmSelect();
            this.channelConfirms = true;
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> waitForConfirms() {
        return forChannel(channel -> {
            channel.waitForConfirmsOrDie();
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> waitForConfirms(long j) {
        return forChannel(channel -> {
            channel.waitForConfirmsOrDie(j);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> basicQos(int i, int i2, boolean z) {
        return forChannel(channel -> {
            channel.basicQos(i, i2, z);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> exchangeDeclare(String str, String str2, boolean z, boolean z2) {
        return exchangeDeclare(str, str2, z, z2, emptyConfig);
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> exchangeDeclare(String str, String str2, boolean z, boolean z2, JsonObject jsonObject) {
        return forChannel(channel -> {
            channel.exchangeDeclare(str, str2, z, z2, new LinkedHashMap(jsonObject.getMap()));
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> exchangeDelete(String str) {
        return forChannel(channel -> {
            channel.exchangeDelete(str);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> exchangeBind(String str, String str2, String str3) {
        return forChannel(channel -> {
            channel.exchangeBind(str, str2, str3);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> exchangeBind(String str, String str2, String str3, Map<String, Object> map) {
        return forChannel(channel -> {
            channel.exchangeBind(str, str2, str3, map);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> exchangeUnbind(String str, String str2, String str3) {
        return forChannel(channel -> {
            channel.exchangeUnbind(str, str2, str3);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> exchangeUnbind(String str, String str2, String str3, Map<String, Object> map) {
        return forChannel(channel -> {
            channel.exchangeUnbind(str, str2, str3, map);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<JsonObject> queueDeclareAuto() {
        return forChannel(channel -> {
            return Utils.toJson(channel.queueDeclare());
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<AMQP.Queue.DeclareOk> queueDeclare(String str, boolean z, boolean z2, boolean z3) {
        return queueDeclare(str, z, z2, z3, emptyConfig);
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<AMQP.Queue.DeclareOk> queueDeclare(String str, boolean z, boolean z2, boolean z3, JsonObject jsonObject) {
        return forChannel(channel -> {
            return channel.queueDeclare(str, z, z2, z3, new LinkedHashMap(jsonObject.getMap()));
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<AMQP.Queue.DeleteOk> queueDelete(String str) {
        return forChannel(channel -> {
            return channel.queueDelete(str);
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<AMQP.Queue.DeleteOk> queueDeleteIf(String str, boolean z, boolean z2) {
        return forChannel(channel -> {
            return channel.queueDelete(str, z, z2);
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> queueBind(String str, String str2, String str3) {
        return forChannel(channel -> {
            channel.queueBind(str, str2, str3);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> queueBind(String str, String str2, String str3, Map<String, Object> map) {
        return forChannel(channel -> {
            channel.queueBind(str, str2, str3, map);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> queueUnbind(String str, String str2, String str3) {
        return forChannel(channel -> {
            channel.queueUnbind(str, str2, str3);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> queueUnbind(String str, String str2, String str3, Map<String, Object> map) {
        return forChannel(channel -> {
            channel.queueUnbind(str, str2, str3, map);
            return null;
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Long> messageCount(String str) {
        return forChannel(channel -> {
            return Long.valueOf(channel.messageCount(str));
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> start() {
        log.info("Starting rabbitmq client");
        return start((ContextInternal) this.vertx.getOrCreateContext(), 0);
    }

    private Future<Void> start(ContextInternal contextInternal, int i) {
        Promise<Void> promise = Promise.promise();
        tryConnect(contextInternal, i, promise);
        return promise.future();
    }

    public void tryConnect(ContextInternal contextInternal, int i, Promise<Void> promise) {
        contextInternal.executeBlocking(() -> {
            try {
                connect();
                return null;
            } catch (IOException | TimeoutException e) {
                log.error("Could not connect to rabbitmq", e);
                throw e;
            }
        }).onSuccess(r3 -> {
            promise.complete();
        }).onFailure(th -> {
            if (this.retries == 0 || !(this.hasConnected || this.config.isAutomaticRecoveryOnInitialConnection())) {
                log.error("Retries disabled. Will not attempt to restart");
                promise.fail(th);
            } else if (i >= this.retries) {
                log.info("Max number of connect attempts (" + this.retries + ") reached. Will not attempt to connect again");
                promise.fail(th);
            } else {
                long reconnectInterval = this.config.getReconnectInterval();
                log.info("Attempting to reconnect to rabbitmq...");
                this.vertx.setTimer(reconnectInterval, l -> {
                    log.debug("Reconnect attempt # " + i);
                    tryConnect(contextInternal, i + 1, promise);
                });
            }
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQClient
    public Future<Void> stop() {
        log.info("Stopping rabbitmq client");
        return this.vertx.executeBlocking(() -> {
            disconnect();
            return null;
        });
    }

    private <T> Future<T> forChannel(ChannelHandler<T> channelHandler) {
        ContextInternal orCreateContext = this.vertx.getOrCreateContext();
        if (this.connection == null || this.channel == null) {
            return orCreateContext.failedFuture("Not connected");
        }
        if (!this.channel.isOpen()) {
            try {
                log.debug("channel is close, try create Channel");
                this.channelInstance++;
                this.channel = this.connection.createChannel();
                if (this.channelConfirms) {
                    this.channel.confirmSelect();
                }
            } catch (IOException e) {
                log.debug("create channel error");
                return orCreateContext.failedFuture(e);
            }
        }
        return this.vertx.executeBlocking(() -> {
            return channelHandler.handle(this.channel);
        });
    }

    private Future<Void> connect() throws IOException, TimeoutException {
        log.debug("Connecting to rabbitmq...");
        this.connection = newConnection(this.vertx, this.config);
        this.connection.addShutdownListener(this);
        this.channelInstance++;
        this.channel = this.connection.createChannel();
        Promise<Void> promise = Promise.promise();
        if (this.connectionEstablishedCallbacks.isEmpty()) {
            promise.complete();
        } else {
            connectCallbackHandler(Future.succeededFuture(), this.connectionEstablishedCallbacks.iterator(), promise);
        }
        log.debug("Connected to rabbitmq !");
        this.hasConnected = true;
        return promise.future();
    }

    private void connectCallbackHandler(AsyncResult<Void> asyncResult, Iterator<Handler<Promise<Void>>> it, Promise<Void> promise) {
        try {
            if (asyncResult.failed()) {
                promise.fail(asyncResult.cause());
            } else if (it.hasNext()) {
                Handler<Promise<Void>> next = it.next();
                Promise promise2 = Promise.promise();
                next.handle(promise2);
                promise2.future().onComplete(asyncResult2 -> {
                    connectCallbackHandler(asyncResult2, it, promise);
                });
            } else {
                promise.complete();
            }
        } catch (Throwable th) {
            log.error("Exception whilst running connection stablished callback: ", th);
            promise.fail(th);
        }
    }

    private void disconnect() throws IOException {
        try {
            log.debug("Disconnecting from rabbitmq...");
            if (this.connection != null) {
                this.connection.close();
            }
            log.debug("Disconnected from rabbitmq !");
        } catch (AlreadyClosedException e) {
            log.debug("Already disconnected from rabbitmq !");
        } finally {
            this.connection = null;
            this.channel = null;
        }
    }

    public void shutdownCompleted(ShutdownSignalException shutdownSignalException) {
        if (shutdownSignalException.isInitiatedByApplication()) {
            return;
        }
        log.info("RabbitMQ connection shutdown! The client will attempt to reconnect automatically", shutdownSignalException);
        restartConnect(0, (r3, th) -> {
            log.info("reconnect success");
        });
    }
}
