package org.apache.bookkeeper.mledger.impl.cache;

import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.Gauge;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import lombok.Generated;
import org.jctools.queues.SpscArrayQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter.class */
public class InflightReadsLimiter {
    /** Total byte budget; {@code remainingBytes} starts at this value. */
    private final long maxReadsInFlightSize;

    /** Bytes currently available for new acquisitions. */
    private long remainingBytes;

    /** Maximum time a queued request may wait; timeout checks are only scheduled when this is positive. */
    private final long acquireTimeoutMillis;

    /** Executor on which the deferred timeout check is scheduled. */
    private final ScheduledExecutorService timeOutExecutor;

    /** {@code true} when the limiter was built with a positive byte budget. */
    private final boolean enabled;

    /** Requests waiting for permits; {@code null} when the limiter is disabled. */
    private final Queue<QueuedHandle> queuedHandles;

    /** {@code true} while a timeout check is scheduled and has not started running yet. */
    private boolean timeoutCheckRunning;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(InflightReadsLimiter.class);

    private static final Gauge PULSAR_ML_READS_BUFFER_SIZE = Gauge.build()
            .name("pulsar_ml_reads_inflight_bytes")
            .help("Estimated number of bytes retained by data read from storage or cache")
            .register();

    private static final Gauge PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE = Gauge.build()
            .name("pulsar_ml_reads_available_inflight_bytes")
            .help("Available space for inflight data read from storage or cache")
            .register();

    /** Sentinel handle returned by {@code acquire} and ignored by {@code release} when the limiter is disabled. */
    private static final Handle DISABLED = new Handle(0L, 0L, true);

    /** Pre-built {@link Optional} wrapping {@link #DISABLED}. */
    private static final Optional<Handle> DISABLED_OPTIONAL = Optional.of(DISABLED);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter$Handle.class */
    /**
     * Result of an acquire attempt.
     *
     * <p>Declared as a real record: a class cannot extend {@code java.lang.Record} directly, and the
     * decompiled {@code ObjectMethods.bootstrap(...)} bodies are what the compiler generates for a
     * record anyway, so {@code equals}, {@code hashCode} and {@code toString} keep the same
     * component-wise semantics.
     *
     * @param permits      number of bytes this handle accounts for
     * @param creationTime wall-clock time in milliseconds at which the request was created
     * @param success      {@code true} if the permits were granted, {@code false} if the request
     *                     was rejected or timed out
     */
    public record Handle(long permits, long creationTime, boolean success) {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/bookkeeper/mledger/impl/cache/InflightReadsLimiter$QueuedHandle.class */
    /**
     * A pending acquire request: the handle describing what was asked for, paired with the
     * callback to invoke once the request is granted or times out.
     *
     * <p>Declared as a real record: a class cannot extend {@code java.lang.Record} directly, and the
     * decompiled {@code ObjectMethods.bootstrap(...)} bodies are what the compiler generates for a
     * record anyway.
     *
     * @param handle   the originally requested handle (permits and creation time)
     * @param callback consumer that receives the final handle for this request
     */
    public record QueuedHandle(Handle handle, Consumer<Handle> callback) {
    }

    /**
     * Creates a limiter for the number of bytes held by in-flight reads.
     *
     * @param maxReadsInFlightSize             total byte budget; a value {@code <= 0} disables the
     *                                         limiter and sets both gauges to {@code -1}
     * @param maxReadsInFlightAcquireQueueSize capacity of the queue holding requests that wait for
     *                                         permits (only used when the limiter is enabled)
     * @param acquireTimeoutMillis             maximum time a queued request may wait
     * @param timeOutExecutor                  executor used to schedule the timeout check
     */
    public InflightReadsLimiter(long maxReadsInFlightSize, int maxReadsInFlightAcquireQueueSize,
                                long acquireTimeoutMillis, ScheduledExecutorService timeOutExecutor) {
        this.maxReadsInFlightSize = maxReadsInFlightSize;
        this.remainingBytes = maxReadsInFlightSize;
        this.acquireTimeoutMillis = acquireTimeoutMillis;
        this.timeOutExecutor = timeOutExecutor;
        if (maxReadsInFlightSize > 0) {
            this.enabled = true;
            // Diamond instead of the raw type so the queue is type-checked as Queue<QueuedHandle>.
            this.queuedHandles = new SpscArrayQueue<>(maxReadsInFlightAcquireQueueSize);
        } else {
            this.enabled = false;
            this.queuedHandles = null;
            // -1 marks the metrics as "limiter disabled".
            PULSAR_ML_READS_BUFFER_SIZE.set(-1.0d);
            PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.set(-1.0d);
        }
    }

    /**
     * Returns the number of bytes currently available for acquisition.
     *
     * @return the current value of {@code remainingBytes}, read under this instance's monitor
     */
    @VisibleForTesting
    public synchronized long getRemainingBytes() {
        return remainingBytes;
    }

    /**
     * Requests {@code j} bytes of in-flight read budget.
     *
     * @param j        number of bytes requested
     * @param consumer callback that receives the resulting handle if the request had to be queued
     * @return the shared disabled handle when the limiter is disabled; otherwise the result of the
     *         internal acquire: a handle if the request was decided immediately (check
     *         {@link Handle#success()}), or an empty optional if the request was queued
     */
    public Optional<Handle> acquire(long j, Consumer<Handle> consumer) {
        if (isDisabled()) {
            return DISABLED_OPTIONAL;
        }
        return internalAcquire(j, consumer);
    }

    /**
     * Tries to grant {@code permits} bytes immediately and otherwise queues the request.
     *
     * @param permits  number of bytes requested
     * @param callback invoked later with the final handle if the request is queued
     * @return a successful handle when the permits were granted right away, an empty optional
     *         when the request was queued, or a handle with {@code success == false} when the
     *         queue rejected the request
     */
    private synchronized Optional<Handle> internalAcquire(long permits, Consumer<Handle> callback) {
        Handle requested = new Handle(permits, System.currentTimeMillis(), true);

        // Fast path: enough budget is available.
        if (remainingBytes >= permits) {
            remainingBytes -= permits;
            if (log.isDebugEnabled()) {
                log.debug("acquired permits: {}, creationTime: {}, remainingBytes:{}",
                        permits, requested.creationTime(), remainingBytes);
            }
            updateMetrics();
            return Optional.of(requested);
        }

        // A request larger than the whole budget can never fit; let it through with the full
        // budget, but only while nothing else is in flight.
        boolean oversizedWhileIdle = permits > maxReadsInFlightSize && remainingBytes == maxReadsInFlightSize;
        if (oversizedWhileIdle) {
            remainingBytes = 0L;
            if (log.isInfoEnabled()) {
                log.info("Requested permits {} exceeded maxReadsInFlightSize {}, creationTime: {}, remainingBytes:{}. Allowing request with permits set to maxReadsInFlightSize.",
                        permits, maxReadsInFlightSize, requested.creationTime(), remainingBytes);
            }
            updateMetrics();
            return Optional.of(new Handle(maxReadsInFlightSize, requested.creationTime(), true));
        }

        // Otherwise park the request until permits are released or it times out.
        boolean queued = queuedHandles.offer(new QueuedHandle(requested, callback));
        if (!queued) {
            log.warn("Failed to queue handle for acquiring permits: {}, creationTime: {}, remainingBytes:{}",
                    permits, requested.creationTime(), remainingBytes);
            return Optional.of(new Handle(0L, requested.creationTime(), false));
        }
        scheduleTimeOutCheck(acquireTimeoutMillis);
        return Optional.empty();
    }

    /**
     * Schedules a single run of {@link #timeoutCheck()} after the given delay.
     *
     * <p>Does nothing when the acquire timeout is not positive or when a check is already pending.
     *
     * @param delayMillis delay before the check runs, in milliseconds
     */
    private synchronized void scheduleTimeOutCheck(long delayMillis) {
        if (acquireTimeoutMillis <= 0 || timeoutCheckRunning) {
            return;
        }
        timeoutCheckRunning = true;
        timeOutExecutor.schedule(this::timeoutCheck, delayMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Expires queued requests from the head of the queue whose age has reached the acquire
     * timeout, then reschedules itself for the first request that has not expired yet.
     */
    private synchronized void timeoutCheck() {
        timeoutCheckRunning = false;
        long nextDelayMillis = 0L;

        QueuedHandle head;
        while ((head = queuedHandles.peek()) != null) {
            long ageMillis = System.currentTimeMillis() - head.handle().creationTime();
            if (ageMillis < acquireTimeoutMillis) {
                // Head is still within its deadline; wake up again when it would expire.
                nextDelayMillis = acquireTimeoutMillis - ageMillis;
                break;
            }
            // Remove the peeked entry before notifying its callback.
            queuedHandles.poll();
            handleTimeout(head);
        }

        if (nextDelayMillis > 0) {
            scheduleTimeOutCheck(nextDelayMillis);
        }
    }

    /**
     * Notifies the callback of a queued request that it timed out, passing a handle with zero
     * permits and {@code success == false}. Exceptions thrown by the callback are logged and
     * not propagated.
     *
     * @param queuedHandle the request that timed out (already removed from the queue by the caller)
     */
    private void handleTimeout(QueuedHandle queuedHandle) {
        if (log.isDebugEnabled()) {
            log.debug("timed out queued permits: {}, creationTime: {}, remainingBytes:{}",
                    queuedHandle.handle().permits(), queuedHandle.handle().creationTime(), remainingBytes);
        }
        try {
            Handle timedOut = new Handle(0L, queuedHandle.handle().creationTime(), false);
            queuedHandle.callback().accept(timedOut);
        } catch (Exception e) {
            log.error("Error in callback of timed out queued permits: {}, creationTime: {}, remainingBytes:{}",
                    queuedHandle.handle().permits(), queuedHandle.handle().creationTime(), remainingBytes, e);
        }
    }

    /**
     * Returns the permits held by {@code handle} to the limiter.
     *
     * <p>The shared disabled handle (compared by identity) is ignored.
     *
     * @param handle the handle previously obtained from {@code acquire}
     */
    public void release(Handle handle) {
        if (handle != DISABLED) {
            internalRelease(handle);
        }
    }

    /**
     * Adds the handle's permits back to the budget and then drains the queue head-first:
     * timed-out requests are failed, requests that can now be served are granted, and draining
     * stops at the first request that still has to wait. Metrics are refreshed afterwards.
     *
     * @param handle the handle whose permits are being returned
     */
    private synchronized void internalRelease(Handle handle) {
        if (log.isDebugEnabled()) {
            log.debug("release permits: {}, creationTime: {}, remainingBytes:{}",
                    handle.permits(), handle.creationTime(), getRemainingBytes());
        }
        remainingBytes += handle.permits();

        for (QueuedHandle head = queuedHandles.peek(); head != null; head = queuedHandles.peek()) {
            long wanted = head.handle().permits();

            boolean timedOut = acquireTimeoutMillis > 0
                    && System.currentTimeMillis() - head.handle().creationTime() > acquireTimeoutMillis;
            if (timedOut) {
                // Remove the peeked entry, then report the timeout.
                queuedHandles.poll();
                handleTimeout(head);
                continue;
            }

            boolean fits = remainingBytes >= wanted;
            boolean oversizedWhileIdle = wanted > maxReadsInFlightSize && remainingBytes == maxReadsInFlightSize;
            if (!fits && !oversizedWhileIdle) {
                // Head must keep waiting; the queue is served strictly in order.
                break;
            }

            // Remove the peeked entry, then grant it.
            queuedHandles.poll();
            handleQueuedHandle(head);
        }
        updateMetrics();
    }

    /**
     * Grants a previously queued request and invokes its callback with the resulting handle.
     *
     * <p>If the request is larger than the whole budget and the budget is completely free, the
     * budget is set to zero and the callback receives a handle capped at
     * {@code maxReadsInFlightSize}; otherwise the requested permits are subtracted and the
     * original handle is passed on. Exceptions thrown by the callback are logged and not
     * propagated.
     *
     * @param queuedHandle the request to grant (already removed from the queue by the caller)
     */
    private void handleQueuedHandle(QueuedHandle queuedHandle) {
        Handle original = queuedHandle.handle();
        long wanted = original.permits();
        Handle granted;

        boolean oversizedWhileIdle = wanted > maxReadsInFlightSize && remainingBytes == maxReadsInFlightSize;
        if (oversizedWhileIdle) {
            remainingBytes = 0L;
            if (log.isInfoEnabled()) {
                log.info("Requested permits {} exceeded maxReadsInFlightSize {}, creationTime: {}, remainingBytes:{}. Allowing request with permits set to maxReadsInFlightSize.",
                        wanted, maxReadsInFlightSize, original.creationTime(), remainingBytes);
            }
            granted = new Handle(maxReadsInFlightSize, original.creationTime(), true);
        } else {
            remainingBytes -= wanted;
            if (log.isDebugEnabled()) {
                log.debug("acquired queued permits: {}, creationTime: {}, remainingBytes:{}",
                        wanted, original.creationTime(), remainingBytes);
            }
            granted = original;
        }

        try {
            queuedHandle.callback().accept(granted);
        } catch (Exception e) {
            log.error("Error in callback of acquired queued permits: {}, creationTime: {}, remainingBytes:{}",
                    granted.permits(), granted.creationTime(), remainingBytes, e);
        }
    }

    /**
     * Publishes the current usage to the two gauges: bytes in use
     * ({@code maxReadsInFlightSize - remainingBytes}) and bytes still available.
     */
    private synchronized void updateMetrics() {
        long available = remainingBytes;
        long inUse = maxReadsInFlightSize - available;
        PULSAR_ML_READS_BUFFER_SIZE.set(inUse);
        PULSAR_ML_READS_AVAILABLE_BUFFER_SIZE.set(available);
    }

    /**
     * Tells whether the limiter is disabled.
     *
     * @return {@code true} when the limiter was constructed with a non-positive byte budget
     */
    public boolean isDisabled() {
        return !enabled;
    }
}
