package io.vertx.rabbitmq.impl;

import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.impl.InboundBuffer;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQMessage;
import java.io.IOException;

/* loaded from: input_file:io/vertx/rabbitmq/impl/RabbitMQConsumerImpl.class */
public class RabbitMQConsumerImpl implements RabbitMQConsumer {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQConsumerImpl.class);
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;
    private String queueName;
    private final QueueConsumerHandler consumerHandler;
    private final boolean keepMostRecent;
    private final InboundBuffer<RabbitMQMessage> pending;
    private final int maxQueueSize;
    private volatile boolean cancelled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public RabbitMQConsumerImpl(Context context, QueueConsumerHandler queueConsumerHandler, QueueOptions queueOptions, String str) {
        this.consumerHandler = queueConsumerHandler;
        this.keepMostRecent = queueOptions.isKeepMostRecent();
        this.maxQueueSize = queueOptions.maxInternalQueueSize();
        this.pending = new InboundBuffer(context, this.maxQueueSize).pause();
        this.queueName = str;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public String queueName() {
        return this.queueName;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public RabbitMQConsumer setQueueName(String str) {
        this.queueName = str;
        return this;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public RabbitMQConsumer exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public RabbitMQConsumer handler(Handler<RabbitMQMessage> handler) {
        if (handler != null) {
            this.pending.handler(rabbitMQMessage -> {
                try {
                    handler.handle(rabbitMQMessage);
                } catch (Exception e) {
                    handleException(e);
                }
            });
        } else {
            this.pending.handler((Handler) null);
        }
        return this;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    /* renamed from: pause */
    public RabbitMQConsumer mo3pause() {
        this.pending.pause();
        return this;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    /* renamed from: resume */
    public RabbitMQConsumer mo2resume() {
        this.pending.resume();
        return this;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    /* renamed from: fetch */
    public RabbitMQConsumer mo1fetch(long j) {
        this.pending.fetch(j);
        return this;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public RabbitMQConsumer endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public String consumerTag() {
        return this.consumerHandler.getConsumerTag();
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public Future<Void> cancel() {
        Future<Void> failedFuture;
        try {
            log.debug("Cancelling " + consumerTag());
            this.cancelled = true;
            this.consumerHandler.getChannel().basicCancel(consumerTag());
            failedFuture = Future.succeededFuture();
        } catch (IOException e) {
            failedFuture = Future.failedFuture(e);
        }
        handleEnd();
        return failedFuture;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public boolean isCancelled() {
        return this.cancelled;
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    public boolean isPaused() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleMessage(RabbitMQMessage rabbitMQMessage) {
        if (this.pending.size() >= this.maxQueueSize) {
            if (!this.keepMostRecent) {
                log.debug("Discard a received message since stream is paused and buffer flag is false");
                return;
            }
            this.pending.read();
        }
        this.pending.write(rabbitMQMessage);
    }

    private void handleException(Throwable th) {
        if (this.exceptionHandler != null) {
            this.exceptionHandler.handle(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void handleEnd() {
        if (this.endHandler != null) {
            this.endHandler.handle((Object) null);
        }
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    /* renamed from: endHandler */
    public /* bridge */ /* synthetic */ ReadStream mo0endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    /* renamed from: handler */
    public /* bridge */ /* synthetic */ ReadStream mo4handler(Handler handler) {
        return handler((Handler<RabbitMQMessage>) handler);
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ ReadStream mo5exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.rabbitmq.RabbitMQConsumer
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo6exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
