package org.apache.pulsar.broker.service.persistent;

import com.google.common.annotations.VisibleForTesting;
import io.netty.util.Recycler;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.transaction.exception.TransactionException;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.class */
public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements Dispatcher, AsyncCallbacks.ReadEntriesCallback {
    /** Set while a delayed re-read queued by reScheduleRead() is outstanding, so only one is queued at a time. */
    private final AtomicBoolean isRescheduleReadInProgress;
    /** Topic this dispatcher belongs to. */
    protected final PersistentTopic topic;
    /** Executor chosen from the broker's topic-ordered executor by topic name; read callbacks and consumer requests are handed to it. */
    protected final Executor topicExecutor;
    /** Label used in log messages: topic name + " / " + decoded cursor name (empty when the cursor has no name). */
    protected final String name;
    /** Subscription-level dispatch rate limiter; empty until initializeDispatchRateLimiterIfNeeded() creates one. */
    private Optional<DispatchRateLimiter> dispatchRateLimiter;
    /** Set to true right before a cursor read is issued; cleared when its callback is processed or the read is cancelled. */
    protected volatile boolean havePendingRead;
    /** Current cap on entries per read: doubled (up to the configured max) after a completed read, reset to the configured min when a failed read is retried. */
    protected volatile int readBatchSize;
    /** Supplies the retry delay after a failed read; halved after each completed read. */
    protected final Backoff readFailureBackoff;
    /** Delayed "rewind and read" task scheduled for Failover subscriptions; null when none is scheduled. */
    private volatile ScheduledFuture<?> readOnActiveConsumerTask;
    /** Monitor held while creating readOnActiveConsumerTask. */
    private final Object lockForReadOnActiveConsumerTask;
    /** Assigned RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED in the constructor. */
    private final RedeliveryTracker redeliveryTracker;
    private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);

    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer$ReadEntriesCtx.class */
    public static class ReadEntriesCtx {
        private Consumer consumer;
        private long epoch;
        private final Recycler.Handle<ReadEntriesCtx> recyclerHandle;
        private static final Recycler<ReadEntriesCtx> RECYCLER = new Recycler<ReadEntriesCtx>() { // from class: org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx.1
            protected ReadEntriesCtx newObject(Recycler.Handle<ReadEntriesCtx> handle) {
                return new ReadEntriesCtx(handle);
            }

            /* renamed from: newObject, reason: collision with other method in class */
            protected /* bridge */ /* synthetic */ Object m327newObject(Recycler.Handle handle) {
                return newObject((Recycler.Handle<ReadEntriesCtx>) handle);
            }
        };

        private ReadEntriesCtx(Recycler.Handle<ReadEntriesCtx> handle) {
            this.recyclerHandle = handle;
        }

        public static ReadEntriesCtx create(Consumer consumer, long j) {
            ReadEntriesCtx readEntriesCtx = (ReadEntriesCtx) RECYCLER.get();
            readEntriesCtx.consumer = consumer;
            readEntriesCtx.epoch = j;
            return readEntriesCtx;
        }

        Consumer getConsumer() {
            return this.consumer;
        }

        long getEpoch() {
            return this.epoch;
        }

        public void recycle() {
            this.consumer = null;
            this.epoch = 0L;
            this.recyclerHandle.recycle(this);
        }
    }

    public PersistentDispatcherSingleActiveConsumer(ManagedCursor managedCursor, CommandSubscribe.SubType subType, int i, PersistentTopic persistentTopic, Subscription subscription) {
        super(subType, i, persistentTopic.getName(), subscription, persistentTopic.getBrokerService().pulsar().getConfiguration(), managedCursor);
        this.isRescheduleReadInProgress = new AtomicBoolean(false);
        this.dispatchRateLimiter = Optional.empty();
        this.havePendingRead = false;
        this.readOnActiveConsumerTask = null;
        this.lockForReadOnActiveConsumerTask = new Object();
        this.topic = persistentTopic;
        this.topicExecutor = persistentTopic.getBrokerService().getTopicOrderedExecutor().chooseThread(this.topicName);
        this.name = persistentTopic.getName() + " / " + (managedCursor.getName() != null ? Codec.decode(managedCursor.getName()) : "");
        this.readBatchSize = this.serviceConfig.getDispatcherMaxReadBatchSize();
        this.readFailureBackoff = new Backoff(this.serviceConfig.getDispatcherReadFailureBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS, this.serviceConfig.getDispatcherReadFailureBackoffMaxTimeInMs(), TimeUnit.MILLISECONDS, this.serviceConfig.getDispatcherReadFailureBackoffMandatoryStopTimeInMs(), TimeUnit.MILLISECONDS);
        this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
        initializeDispatchRateLimiterIfNeeded();
    }

    /**
     * Rewinds the cursor and starts reading for the current active consumer.
     *
     * <p>Nothing happens while a read is still pending after the cancel attempt. For Failover
     * subscriptions with a positive failover delay configured, the rewind-and-read is scheduled
     * once on the broker executor after that delay; otherwise it runs immediately.
     */
    @Override
    protected void scheduleReadOnActiveConsumer() {
        cancelPendingRead();
        if (this.havePendingRead) {
            return;
        }

        final boolean delayedFailover = this.subscriptionType == CommandSubscribe.SubType.Failover
                && this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis() > 0;

        if (delayedFailover) {
            // Cheap unlocked check first, then re-check under the lock before scheduling.
            if (this.readOnActiveConsumerTask != null) {
                return;
            }
            synchronized (this.lockForReadOnActiveConsumerTask) {
                if (this.readOnActiveConsumerTask == null) {
                    this.readOnActiveConsumerTask = this.topic.getBrokerService().executor().schedule(() -> {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Rewind cursor and read more entries after {} ms delay", this.name, this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
                        }
                        final Consumer delayedActive = ACTIVE_CONSUMER_UPDATER.get(this);
                        this.cursor.rewind(delayedActive != null && delayedActive.readCompacted());
                        notifyActiveConsumerChanged(delayedActive);
                        readMoreEntries(delayedActive);
                        // Mark the delayed task as finished so flow/redelivery requests are served again.
                        this.readOnActiveConsumerTask = null;
                    }, this.serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
                }
            }
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Rewind cursor and read more entries without delay", this.name);
        }
        final Consumer active = ACTIVE_CONSUMER_UPDATER.get(this);
        final boolean readCompacted = active != null && active.readCompacted();
        this.cursor.rewind(readCompacted);
        notifyActiveConsumerChanged(active);
        readMoreEntries(active);
    }

    /**
     * Delegates to the two-argument overload with this dispatcher's topic and its current
     * number of consumers.
     *
     * <p>Declared public here; the decompiler recorded that the access modifier was changed
     * from protected.
     */
    @Override
    public boolean isConsumersExceededOnSubscription() {
        final int connectedConsumers = this.consumers.size();
        return isConsumersExceededOnSubscription(this.topic, connectedConsumers);
    }

    /**
     * Tries to cancel the outstanding cursor read, if there is one. The pending flag is only
     * cleared when the cursor reports that the cancellation succeeded.
     */
    @Override
    protected void cancelPendingRead() {
        if (!this.havePendingRead) {
            return;
        }
        final boolean cancelled = this.cursor.cancelPendingReadRequest();
        if (cancelled) {
            this.havePendingRead = false;
        }
    }

    /**
     * Read-success callback: hands the result over to the topic executor, where
     * {@link #internalReadEntriesComplete(List, Object)} processes it.
     *
     * @param list entries returned by the read
     * @param obj  context object that was supplied with the read request
     */
    public void readEntriesComplete(List<Entry> list, Object obj) {
        final Runnable completion = () -> internalReadEntriesComplete(list, obj);
        this.topicExecutor.execute(completion);
    }

    /**
     * Processes a completed read.
     *
     * <p>Clears the pending-read flag, grows the read batch size towards the configured maximum,
     * relaxes the failure backoff, optionally drops entries whose sticky key does not map to the
     * active consumer, and then either dispatches the entries to the active consumer or, when
     * the active consumer is no longer the one the read was issued for, releases the entries,
     * rewinds the cursor and restarts reading for the new active consumer (if any).
     *
     * @param list entries returned by the read; entries may be removed from it
     * @param obj  the {@link ReadEntriesCtx} supplied with the read; it is recycled here
     */
    public synchronized void internalReadEntriesComplete(List<Entry> list, Object obj) {
        final ReadEntriesCtx ctx = (ReadEntriesCtx) obj;
        final Consumer readConsumer = ctx.getConsumer();
        final long readEpoch = ctx.getEpoch();
        // Values are copied out above, so the context can go back to the pool right away.
        ctx.recycle();

        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Got messages: {}", this.name, readConsumer, list.size());
        }

        this.havePendingRead = false;
        this.isFirstRead = false;

        if (this.readBatchSize < this.serviceConfig.getDispatcherMaxReadBatchSize()) {
            final int grownBatchSize = Math.min(this.readBatchSize * 2, this.serviceConfig.getDispatcherMaxReadBatchSize());
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Increasing read batch size from {} to {}", this.name, readConsumer, this.readBatchSize, grownBatchSize);
            }
            this.readBatchSize = grownBatchSize;
        }
        this.readFailureBackoff.reduceToHalf();

        final Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);

        if (this.isKeyHashRangeFiltered) {
            for (Iterator<Entry> remaining = list.iterator(); remaining.hasNext(); ) {
                final Entry entry = remaining.next();
                final Consumer keyOwner = this.stickyKeyConsumerSelector.select(peekStickyKey(entry.getDataBuffer()));
                final boolean ownedByActive = keyOwner != null && activeConsumer == keyOwner;
                if (!ownedByActive) {
                    entry.release();
                    remaining.remove();
                }
            }
        }

        final boolean activeConsumerChanged = activeConsumer == null || readConsumer != activeConsumer;
        if (activeConsumerChanged) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] rewind because no available consumer found", this.name);
            }
            list.forEach(Entry::release);
            final boolean readCompacted = activeConsumer != null ? activeConsumer.readCompacted() : readConsumer.readCompacted();
            this.cursor.rewind(readCompacted);
            if (activeConsumer != null) {
                notifyActiveConsumerChanged(activeConsumer);
                readMoreEntries(activeConsumer);
            }
            return;
        }

        final EntryBatchSizes batchSizes = EntryBatchSizes.get(list.size());
        final SendMessageInfo sendInfo = SendMessageInfo.getThreadLocal();
        final EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(list.size());
        filterEntriesForConsumer(list, batchSizes, sendInfo, batchIndexesAcks, this.cursor, false, activeConsumer);
        dispatchEntriesToConsumer(activeConsumer, list, batchSizes, batchIndexesAcks, sendInfo, readEpoch);
    }

    /**
     * Sends the entries to the consumer and, once the send has completed successfully, accounts
     * for the delivered messages and queues the next read on the topic executor.
     *
     * <p>Declared public here; the decompiler recorded that the access modifier was changed
     * from protected.
     *
     * @param consumer              consumer to deliver to
     * @param list                  entries to send
     * @param entryBatchSizes       per-entry batch sizes
     * @param entryBatchIndexesAcks per-entry batch-index ack state
     * @param sendMessageInfo       message/byte/chunk totals for this batch
     * @param j                     consumer epoch passed through to the consumer
     */
    public void dispatchEntriesToConsumer(Consumer consumer, List<Entry> list, EntryBatchSizes entryBatchSizes, EntryBatchIndexesAcks entryBatchIndexesAcks, SendMessageInfo sendMessageInfo, long j) {
        consumer.sendMessages(
                list,
                entryBatchSizes,
                entryBatchIndexesAcks,
                sendMessageInfo.getTotalMessages(),
                sendMessageInfo.getTotalBytes(),
                sendMessageInfo.getTotalChunkedMessages(),
                this.redeliveryTracker,
                j
        ).addListener(sendResult -> {
            if (!sendResult.isSuccess()) {
                return;
            }
            acquirePermitsForDeliveredMessages(this.topic, this.cursor, list.size(), sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());
            // The next read is only requested after this batch was sent successfully.
            this.topicExecutor.execute(() -> {
                synchronized (PersistentDispatcherSingleActiveConsumer.this) {
                    readMoreEntries(getActiveConsumer());
                }
            });
        });
    }

    /**
     * Handles a flow-control request by queueing its processing on the topic executor.
     *
     * @param consumer consumer that sent the request
     * @param i        permit count carried by the request; not used by this method
     */
    @Override
    public void consumerFlow(Consumer consumer, int i) {
        final Runnable flowTask = () -> internalConsumerFlow(consumer);
        this.topicExecutor.execute(flowTask);
    }

    /**
     * Starts a new read in response to a flow-control request, unless a read is already pending,
     * the requester is not the active consumer, or a delayed rewind is still scheduled.
     *
     * @param consumer consumer that sent the flow-control request
     */
    private synchronized void internalConsumerFlow(Consumer consumer) {
        final boolean debug = log.isDebugEnabled();

        if (this.havePendingRead) {
            if (debug) {
                log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", this.name, consumer);
            }
            return;
        }

        if (ACTIVE_CONSUMER_UPDATER.get(this) != consumer) {
            if (debug) {
                log.debug("[{}-{}] Ignoring flow control message since consumer is not active partition consumer", this.name, consumer);
            }
            return;
        }

        if (this.readOnActiveConsumerTask != null) {
            if (debug) {
                log.debug("[{}-{}] Ignoring flow control message since consumer is waiting for cursor to be rewinded", this.name, consumer);
            }
            return;
        }

        if (debug) {
            log.debug("[{}-{}] Trigger new read after receiving flow control message", this.name, consumer);
        }
        readMoreEntries(consumer);
    }

    /**
     * Queues a redelivery request on the topic executor.
     *
     * @param consumer consumer asking for redelivery
     * @param j        consumer epoch sent with the request
     */
    @Override
    public void redeliverUnacknowledgedMessages(Consumer consumer, long j) {
        final Runnable redeliveryTask = () -> internalRedeliverUnacknowledgedMessages(consumer, j);
        this.topicExecutor.execute(redeliveryTask);
    }

    /**
     * Rewinds the cursor and restarts reading so unacknowledged messages are delivered again.
     *
     * <p>The consumer's epoch is raised first when the request carries a newer one. The request
     * is then ignored when it does not come from the active consumer, when a delayed rewind is
     * already scheduled, or when an outstanding cursor read could not be cancelled. In the last
     * case the in-flight read will still complete, and issuing a second read on top of it would
     * put two reads on the cursor at once.
     *
     * @param consumer consumer asking for redelivery
     * @param j        consumer epoch sent with the request
     */
    private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer, long j) {
        if (j > consumer.getConsumerEpoch()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Update epoch, old epoch [{}], new epoch [{}]", new Object[]{this.name, consumer, Long.valueOf(consumer.getConsumerEpoch()), Long.valueOf(j)});
            }
            consumer.setConsumerEpoch(j);
        }
        if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend", this.name, consumer);
            return;
        }
        if (this.readOnActiveConsumerTask != null) {
            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: consumer is waiting for cursor to be rewinded", this.name, consumer);
            return;
        }
        // cancelPendingRead() clears havePendingRead only when the cursor confirms the cancellation.
        cancelPendingRead();
        if (this.havePendingRead) {
            // The read is still outstanding; its callback will run later, so do not rewind or read now.
            log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", this.name, consumer);
            return;
        }
        this.cursor.rewind(consumer.readCompacted());
        if (log.isDebugEnabled()) {
            log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", this.name, consumer);
        }
        readMoreEntries(consumer);
    }

    /**
     * Position-based redelivery request. The position list is not consulted; the call is
     * forwarded to the epoch-based overload with an epoch of -1.
     *
     * @param consumer consumer asking for redelivery
     * @param list     positions named in the request (unused here)
     */
    @Override
    public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> list) {
        final long epochForPositionRequest = -1L;
        redeliverUnacknowledgedMessages(consumer, epochForPositionRequest);
    }

    /**
     * Issues the next cursor read for the given consumer when reading is currently possible.
     *
     * <p>No read is issued when the cursor is closed, the consumer is null, a read is already
     * pending, the consumer has no available permits, or {@link #calculateToRead(Consumer)}
     * returns -1 for either limit. Otherwise the pending flag is set and an asynchronous read is
     * started, through the topic's compacted view when the consumer reads compacted and directly
     * on the cursor otherwise.
     *
     * @param consumer consumer to read for; may be null
     */
    @Override
    protected void readMoreEntries(Consumer consumer) {
        final boolean debug = log.isDebugEnabled();

        if (this.cursor.isClosed()) {
            if (debug) {
                log.debug("[{}] Cursor is already closed, skipping read more entries", this.cursor.getName());
            }
            return;
        }
        if (consumer == null) {
            if (debug) {
                log.debug("[{}] Skipping read for the topic, Due to the current consumer is null", this.topic.getName());
            }
            return;
        }
        if (this.havePendingRead) {
            if (debug) {
                log.debug("[{}] Skipping read for the topic, Due to we have pending read.", this.topic.getName());
            }
            return;
        }
        if (consumer.getAvailablePermits() <= 0) {
            if (debug) {
                log.debug("[{}-{}] Consumer buffer is full, pause reading", this.name, consumer);
            }
            return;
        }

        synchronized (this) {
            // Re-check under the monitor: another caller may have started a read in the meantime.
            if (this.havePendingRead) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Skipping read for the topic, Due to we have pending read.", this.topic.getName());
                }
                return;
            }

            final Pair<Integer, Long> limits = calculateToRead(consumer);
            final int messagesToRead = limits.getLeft();
            final long bytesToRead = limits.getRight();
            if (messagesToRead == -1 || bytesToRead == -1) {
                return;
            }

            if (log.isDebugEnabled()) {
                log.debug("[{}-{}] Schedule read of {} messages", this.name, consumer, messagesToRead);
            }
            this.havePendingRead = true;

            if (!consumer.readCompacted()) {
                this.cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch()), this.topic.getMaxReadPosition());
            } else {
                this.topic.getCompactedTopic().asyncReadEntriesOrWait(this.cursor, messagesToRead, bytesToRead, this.topic.getMaxReadPosition(), this.isFirstRead, this, consumer);
            }
        }
    }

    /**
     * Queues a read retry for the active consumer 1000 ms from now on the broker executor.
     * Calls made while a retry is already queued are ignored.
     */
    @Override
    protected void reScheduleRead() {
        final boolean claimed = this.isRescheduleReadInProgress.compareAndSet(false, true);
        if (!claimed) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Reschedule message read in {} ms", this.topic.getName(), this.name, 1000);
        }
        this.topic.getBrokerService().executor().schedule(() -> {
            // Release the guard before reading so a further reschedule can be queued if needed.
            this.isRescheduleReadInProgress.set(false);
            final Consumer active = ACTIVE_CONSUMER_UPDATER.get(this);
            readMoreEntries(active);
        }, 1000L, TimeUnit.MILLISECONDS);
    }

    /**
     * Works out how many entries and bytes the next read may request for the consumer.
     *
     * <p>The starting point is the consumer's available permits (treated as 1 when the consumer
     * is not writable), capped by the current read batch size, together with the configured
     * maximum read size in bytes. With precise flow control the entry count is derived from the
     * permits divided by the consumer's average messages per entry. When throttling applies
     * (enabled for non-backlog consumers, or the cursor is not active) the broker, topic,
     * subscription and resource-group limiters are consulted in that order and may reduce both
     * limits.
     *
     * <p>Declared public here; the decompiler recorded that the access modifier was changed
     * from protected.
     *
     * @param consumer consumer the read is for
     * @return pair of (entries, bytes), each at least 1, or (-1, -1) when a limiter has no
     *         capacity left and the read must be skipped
     */
    public Pair<Integer, Long> calculateToRead(Consumer consumer) {
        final int reportedPermits = consumer.getAvailablePermits();
        // A consumer that is not writable is only read for a single message.
        final int permits = consumer.isWritable() ? reportedPermits : 1;

        int messagesToRead = Math.min(permits, this.readBatchSize);
        long bytesToRead = this.serviceConfig.getDispatcherMaxReadSizeBytes();

        if (consumer.isPreciseDispatcherFlowControl()) {
            final int avgMessagesPerEntry = Math.max(1, consumer.getAvgMessagesPerEntry());
            final int entriesForPermits = (int) Math.ceil((permits * 1.0d) / avgMessagesPerEntry);
            messagesToRead = Math.min(entriesForPermits, this.readBatchSize);
        }

        final boolean throttlingApplies = this.serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !this.cursor.isActive();
        if (!throttlingApplies) {
            return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1L));
        }

        // Broker-level limiter.
        if (this.topic.getBrokerDispatchRateLimiter().isPresent()) {
            final DispatchRateLimiter brokerLimiter = this.topic.getBrokerDispatchRateLimiter().get();
            if (reachDispatchRateLimit(brokerLimiter)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] message-read exceeded broker message-rate {}/{}, schedule after a {}", this.name, brokerLimiter.getDispatchRateOnMsg(), brokerLimiter.getDispatchRateOnByte(), 1000);
                }
                return Pair.of(-1, -1L);
            }
            final Pair<Integer, Long> brokerAdjusted = updateMessagesToRead(brokerLimiter, messagesToRead, bytesToRead);
            messagesToRead = brokerAdjusted.getLeft();
            bytesToRead = brokerAdjusted.getRight();
        }

        // Topic-level limiter.
        if (this.topic.getDispatchRateLimiter().isPresent()) {
            final DispatchRateLimiter topicLimiter = this.topic.getDispatchRateLimiter().get();
            if (reachDispatchRateLimit(topicLimiter)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", this.name, topicLimiter.getDispatchRateOnMsg(), topicLimiter.getDispatchRateOnByte(), 1000);
                }
                return Pair.of(-1, -1L);
            }
            final Pair<Integer, Long> topicAdjusted = updateMessagesToRead(topicLimiter, messagesToRead, bytesToRead);
            messagesToRead = topicAdjusted.getLeft();
            bytesToRead = topicAdjusted.getRight();
        }

        // Subscription-level limiter.
        if (this.dispatchRateLimiter.isPresent()) {
            if (reachDispatchRateLimit(this.dispatchRateLimiter.get())) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}", this.name, this.dispatchRateLimiter.get().getDispatchRateOnMsg(), this.dispatchRateLimiter.get().getDispatchRateOnByte(), 1000);
                }
                return Pair.of(-1, -1L);
            }
            final Pair<Integer, Long> subscriptionAdjusted = updateMessagesToRead(this.dispatchRateLimiter.get(), messagesToRead, bytesToRead);
            messagesToRead = subscriptionAdjusted.getLeft();
            bytesToRead = subscriptionAdjusted.getRight();
        }

        // Resource-group limiter: checked against its remaining message and byte budget.
        if (this.topic.getResourceGroupDispatchRateLimiter().isPresent()) {
            final ResourceGroupDispatchLimiter groupLimiter = this.topic.getResourceGroupDispatchRateLimiter().get();
            final long remainingMessages = groupLimiter.getAvailableDispatchRateLimitOnMsg();
            final long remainingBytes = groupLimiter.getAvailableDispatchRateLimitOnByte();
            if (remainingMessages == 0 || remainingBytes == 0) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] message-read exceeded resourcegroup message-rate {}/{}, schedule after a {}", this.name, remainingMessages, remainingBytes, 1000);
                }
                reScheduleRead();
                return Pair.of(-1, -1L);
            }
            final Pair<Integer, Long> groupAdjusted = computeReadLimits(messagesToRead, (int) remainingMessages, bytesToRead, remainingBytes);
            messagesToRead = groupAdjusted.getLeft();
            bytesToRead = groupAdjusted.getRight();
        }

        // Never hand back a limit below 1.
        return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1L));
    }

    /**
     * Read-failure callback: hands the failure over to the topic executor for processing.
     *
     * @param managedLedgerException reason the read failed
     * @param obj                    context object that was supplied with the read request
     */
    public void readEntriesFailed(ManagedLedgerException managedLedgerException, Object obj) {
        final Runnable failureTask = () -> internalReadEntriesFailed(managedLedgerException, obj);
        this.topicExecutor.execute(failureTask);
    }

    /**
     * Processes a failed read and, unless the failure means reading must stop, schedules a retry.
     *
     * <p>No retry is scheduled for a closed cursor or a concurrent-wait-callback failure. For
     * every other failure the read batch size drops to the configured minimum and a retry is
     * scheduled after the backoff delay, or after 1 ms when the cause is a not-yet-sealed
     * transaction or a closed offload read handle. When there are no more entries to read and
     * the backlog is empty, the end-of-topic / migration check is applied to the consumers.
     *
     * @param managedLedgerException reason the read failed
     * @param obj                    the {@link ReadEntriesCtx} supplied with the read; recycled here
     */
    private synchronized void internalReadEntriesFailed(ManagedLedgerException managedLedgerException, Object obj) {
        this.havePendingRead = false;

        final ReadEntriesCtx ctx = (ReadEntriesCtx) obj;
        final Consumer readConsumer = ctx.getConsumer();
        ctx.recycle();

        if (managedLedgerException instanceof ManagedLedgerException.CursorAlreadyClosedException) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Cursor was already closed, skipping read more entries", this.cursor.getName());
            }
            return;
        }
        if (managedLedgerException instanceof ManagedLedgerException.ConcurrentWaitCallbackException) {
            return;
        }

        long retryDelayMs = this.readFailureBackoff.next();

        if (managedLedgerException instanceof ManagedLedgerException.NoMoreEntriesToReadException) {
            if (this.cursor.getNumberOfEntriesInBacklog(false) == 0) {
                checkAndApplyReachedEndOfTopicOrTopicMigration(this.consumers);
            }
        } else {
            final Throwable cause = managedLedgerException.getCause();
            final boolean quickRetry = cause instanceof TransactionException.TransactionNotSealedException
                    || cause instanceof ManagedLedgerException.OffloadReadHandleClosedException;
            if (quickRetry) {
                retryDelayMs = 1;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds", this.name, managedLedgerException.getMessage(), retryDelayMs / 1000.0d);
                }
            } else if (managedLedgerException instanceof ManagedLedgerException.TooManyRequestsException) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}-{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds", this.name, readConsumer, this.cursor.getReadPosition(), managedLedgerException.getMessage(), retryDelayMs / 1000.0d);
                }
            } else {
                log.error("[{}-{}] Error reading entries at {} : {} - Retrying to read in {} seconds", this.name, readConsumer, this.cursor.getReadPosition(), managedLedgerException.getMessage(), retryDelayMs / 1000.0d);
            }
        }

        Objects.requireNonNull(readConsumer);
        // Retry with the smallest configured batch.
        this.readBatchSize = this.serviceConfig.getDispatcherMinReadBatchSize();
        scheduleReadEntriesWithDelay(readConsumer, retryDelayMs);
    }

    /**
     * Schedules a read retry after the given delay. When the delay elapses the work is moved to
     * the topic executor, where the read is retried for the then-active consumer provided one
     * exists and no read is pending.
     *
     * @param consumer consumer the failed read was issued for
     * @param j        delay in milliseconds
     */
    @VisibleForTesting
    void scheduleReadEntriesWithDelay(Consumer consumer, long j) {
        this.topic.getBrokerService().executor().schedule(() -> {
            this.topicExecutor.execute(() -> {
                synchronized (PersistentDispatcherSingleActiveConsumer.this) {
                    final Consumer active = ACTIVE_CONSUMER_UPDATER.get(this);
                    final boolean canRetry = active != null && !this.havePendingRead;
                    if (!canRetry) {
                        log.info("[{}-{}] Skipping read retry: Current Consumer {}, havePendingRead {}", this.name, consumer, active, this.havePendingRead);
                        return;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("[{}-{}] Retrying read operation", this.name, consumer);
                    }
                    // The active consumer may have changed while the retry was waiting.
                    if (active != consumer) {
                        notifyActiveConsumerChanged(active);
                    }
                    readMoreEntries(active);
                }
            });
        }, j, TimeUnit.MILLISECONDS);
    }

    /**
     * No-op in this dispatcher: the supplied count is ignored.
     *
     * @param i number of unacknowledged messages reported by the caller (unused)
     */
    @Override // org.apache.pulsar.broker.service.Dispatcher
    public void addUnAckedMessages(int i) {
    }

    /**
     * @return the redelivery tracker assigned in the constructor
     */
    @Override
    public RedeliveryTracker getRedeliveryTracker() {
        final RedeliveryTracker tracker = this.redeliveryTracker;
        return tracker;
    }

    /**
     * @return the subscription-level dispatch rate limiter, or an empty Optional when none has
     *         been created
     */
    @Override
    public Optional<DispatchRateLimiter> getRateLimiter() {
        final Optional<DispatchRateLimiter> limiter = this.dispatchRateLimiter;
        return limiter;
    }

    /**
     * Creates the subscription-level rate limiter if it is needed and missing; when no limiter
     * was created by this call, an existing limiter (if any) is asked to update its dispatch rate.
     */
    @Override
    public void updateRateLimiter() {
        final boolean createdNow = initializeDispatchRateLimiterIfNeeded();
        if (!createdNow) {
            this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
        }
    }

    /**
     * Creates the subscription-level dispatch rate limiter when none exists yet and a dispatch
     * rate is enabled for this subscription on the topic.
     *
     * @return true when a limiter was created by this call, false otherwise
     */
    @Override
    public boolean initializeDispatchRateLimiterIfNeeded() {
        if (this.dispatchRateLimiter.isPresent()) {
            return false;
        }
        final String subscription = getSubscriptionName();
        final boolean rateEnabled = DispatchRateLimiter.isDispatchRateEnabled(this.topic.getSubscriptionDispatchRate(subscription));
        if (!rateEnabled) {
            return false;
        }
        final DispatchRateLimiter created = new DispatchRateLimiter(this.topic, subscription, DispatchRateLimiter.Type.SUBSCRIPTION);
        this.dispatchRateLimiter = Optional.of(created);
        return true;
    }

    /**
     * Marks the dispatcher as closed, closes the subscription-level rate limiter if one exists,
     * and disconnects all consumers.
     *
     * @return the future returned by {@code disconnectAllConsumers()}
     */
    @Override
    public CompletableFuture<Void> close() {
        IS_CLOSED_UPDATER.set(this, 1);
        this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
        final CompletableFuture<Void> disconnected = disconnectAllConsumers();
        return disconnected;
    }

    /**
     * Detects a stalled dispatcher and issues a read to get it moving again.
     *
     * <p>The dispatcher is treated as stuck when there is an active consumer with available
     * permits, the cursor reports that its read position has not changed, no read is pending,
     * and the cursor still has backlog.
     *
     * @return true when a read was issued to unblock the dispatcher, false otherwise
     */
    @Override
    public boolean checkAndUnblockIfStuck() {
        final Consumer active = ACTIVE_CONSUMER_UPDATER.get(this);
        if (active == null) {
            return false;
        }
        if (this.cursor.checkAndUpdateReadPositionChanged()) {
            return false;
        }
        final boolean stuck = active.getAvailablePermits() > 0
                && !this.havePendingRead
                && this.cursor.getNumberOfEntriesInBacklog(false) > 0;
        if (!stuck) {
            return false;
        }
        log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", this.topic.getName(), this.name);
        readMoreEntries(active);
        return true;
    }
}
