package io.vertx.rabbitmq.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.rabbitmq.RabbitMQChannel;
import io.vertx.rabbitmq.RabbitMQChannelBuilder;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQConsumerOptions;
import io.vertx.rabbitmq.RabbitMQMessage;
import io.vertx.rabbitmq.RabbitMQMessageCodec;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/* loaded from: input_file:io/vertx/rabbitmq/impl/RabbitMQConsumerImpl.class */
public class RabbitMQConsumerImpl<T> implements RabbitMQConsumer, Consumer {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQConsumerImpl.class);
    private final RabbitMQConnectionImpl connection;
    private final Handler<RabbitMQConsumer> consumerOkHandler;
    private final Handler<RabbitMQConsumer> cancelOkHandler;
    private final Handler<RabbitMQConsumer> cancelHandler;
    private final Handler<RabbitMQConsumer> shutdownSignalHandler;
    private final Handler<RabbitMQConsumer> recoverOkHandler;
    private final long reconnectIntervalMs;
    private final boolean autoAck;
    private final boolean exclusive;
    private final Map<String, Object> arguments;
    private final Supplier<String> queueNameSupplier;
    private final RabbitMQMessageCodec<T> messageCodec;
    private final BiFunction<RabbitMQConsumer, RabbitMQMessage<T>, Future<Void>> handler;
    private RabbitMQChannel channel;
    private String consumerTag;
    private boolean cancelled;
    private final Context vertxContext;
    private final AtomicLong consumeCount = new AtomicLong();

    public static <U> Future<RabbitMQConsumer> create(RabbitMQChannelBuilder rabbitMQChannelBuilder, RabbitMQMessageCodec<U> rabbitMQMessageCodec, Supplier<String> supplier, RabbitMQConsumerOptions rabbitMQConsumerOptions, BiFunction<RabbitMQConsumer, RabbitMQMessage<U>, Future<Void>> biFunction) {
        return new RabbitMQConsumerImpl(rabbitMQChannelBuilder, rabbitMQMessageCodec, supplier, rabbitMQConsumerOptions, biFunction).start(rabbitMQChannelBuilder);
    }

    public RabbitMQConsumerImpl(RabbitMQChannelBuilder rabbitMQChannelBuilder, RabbitMQMessageCodec<T> rabbitMQMessageCodec, Supplier<String> supplier, RabbitMQConsumerOptions rabbitMQConsumerOptions, BiFunction<RabbitMQConsumer, RabbitMQMessage<T>, Future<Void>> biFunction) {
        this.connection = rabbitMQChannelBuilder.getConnection();
        this.consumerOkHandler = rabbitMQConsumerOptions.getConsumerOkHandler();
        this.cancelOkHandler = rabbitMQConsumerOptions.getCancelOkHandler();
        this.cancelHandler = rabbitMQConsumerOptions.getCancelHandler();
        this.shutdownSignalHandler = rabbitMQConsumerOptions.getShutdownSignalHandler();
        this.recoverOkHandler = rabbitMQConsumerOptions.getRecoverOkHandler();
        this.reconnectIntervalMs = rabbitMQConsumerOptions.getReconnectInterval();
        this.autoAck = rabbitMQConsumerOptions.isAutoAck();
        this.exclusive = rabbitMQConsumerOptions.isExclusive();
        this.consumerTag = rabbitMQConsumerOptions.getConsumerTag();
        this.queueNameSupplier = supplier;
        this.arguments = rabbitMQConsumerOptions.getArguments().isEmpty() ? Collections.emptyMap() : new HashMap<>(rabbitMQConsumerOptions.getArguments());
        this.messageCodec = rabbitMQMessageCodec;
        this.handler = biFunction;
        this.vertxContext = this.connection.getVertx().getOrCreateContext();
    }

    public Future<RabbitMQConsumer> start(RabbitMQChannelBuilder rabbitMQChannelBuilder) {
        return rabbitMQChannelBuilder.openChannel().compose(rabbitMQChannel -> {
            this.channel = rabbitMQChannel;
            return consume(Promise.promise());
        }).map(this);
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public RabbitMQChannel getChannel() {
        return this.channel;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public String getConsumerTag() {
        return this.consumerTag;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public Future<Void> cancel() {
        this.cancelled = true;
        return this.channel.basicCancel(this.consumerTag);
    }

    public void handleConsumeOk(String str) {
        this.consumerTag = str;
        if (this.consumerOkHandler != null) {
            this.consumerOkHandler.handle(this);
        }
    }

    public void handleCancelOk(String str) {
        if (this.cancelOkHandler != null) {
            this.cancelOkHandler.handle(this);
        }
    }

    public void handleCancel(String str) throws IOException {
        if (this.cancelHandler != null) {
            this.cancelHandler.handle(this);
        }
    }

    public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
        if (this.reconnectIntervalMs > 0 && !this.cancelled) {
            consume(Promise.promise());
        }
        if (this.shutdownSignalHandler != null) {
            this.shutdownSignalHandler.handle(this);
        }
    }

    public void handleRecoverOk(String str) {
        if (this.recoverOkHandler != null) {
            this.recoverOkHandler.handle(this);
        }
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        CompletableFuture completableFuture = new CompletableFuture();
        RabbitMQMessageImpl rabbitMQMessageImpl = new RabbitMQMessageImpl(this.channel, this.channel.getChannelNumber(), this.messageCodec.decodeFromBytes(bArr), str, envelope, basicProperties);
        this.vertxContext.runOnContext(r7 -> {
            this.handler.apply(this, rabbitMQMessageImpl).andThen(asyncResult -> {
                completableFuture.complete(null);
            });
        });
        try {
            completableFuture.get();
        } catch (Throwable th) {
        }
    }

    private Future<Void> consume(Promise<Void> promise) {
        String str = this.queueNameSupplier.get();
        this.channel.basicConsume(str, false, this.channel.getChannelId(), false, this.exclusive, this.arguments, this).onSuccess(str2 -> {
            promise.complete();
        }).onFailure(th -> {
            if (this.reconnectIntervalMs <= 0 || this.cancelled || this.connection.isClosed()) {
                log.debug("Failed to consume: ", th);
                promise.fail(th);
            } else {
                log.debug("Failed to consume " + str + " (" + String.valueOf(th.getClass()) + ", \"" + th.getMessage() + "\"), will try again after " + this.reconnectIntervalMs + "ms");
                this.connection.getVertx().setTimer(this.reconnectIntervalMs, l -> {
                    consume(promise);
                });
            }
        });
        return promise.future();
    }
}
