package io.confluent.kafka.concurrent;

import java.util.OptionalInt;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/confluent/kafka/concurrent/DefaultEventExecutor.class */
/**
 * {@link EventExecutor} backed by a {@link ScheduledThreadPoolExecutor} with a core pool size of
 * one.
 *
 * <p>Every accepted task is wrapped in a {@link Task} whose {@link CompletableFuture} is handed
 * back to the caller. The executor counts tasks that were accepted but have not started yet and
 * rejects new work once that count reaches the configured capacity. Cancelling a returned future
 * also cancels the underlying scheduler entry.
 */
public final class DefaultEventExecutor implements EventExecutor {
    private final ScheduledThreadPoolExecutor scheduler;
    private final int pendingTasksCapacity;
    /** Number of accepted tasks that have neither started running nor been cancelled. */
    private final AtomicInteger pendingTasksCounter = new AtomicInteger(0);
    /** Queue size seen by the previous shutdown probe; only touched by tasks run on the scheduler. */
    private OptionalInt prevQueueSize = OptionalInt.empty();
    /** {@code null} until {@link #shutdown()} is first called; afterwards the shared shutdown future. */
    private final AtomicReference<CompletableFuture<Void>> shutdownFuture = new AtomicReference<>(null);

    /**
     * Adapter that runs a {@link Callable} on the scheduler and publishes its outcome through a
     * {@link CompletableFuture}.
     *
     * @param <T> result type of the wrapped callable
     */
    public final class Task<T> implements Runnable {
        private final Callable<T> task;
        private final CompletableFuture<T> future = new CompletableFuture<>();
        /** {@code true} while this task still occupies a slot in the pending-task counter. */
        private final AtomicBoolean pending = new AtomicBoolean(true);

        Task(Callable<T> callable) {
            this.task = callable;
        }

        CompletableFuture<T> future() {
            return this.future;
        }

        /**
         * Gives back this task's pending slot. Idempotent: both {@link #run()} and the
         * cancellation hook may call it, but the counter is decremented only once per task.
         */
        void releasePendingSlot() {
            if (this.pending.compareAndSet(true, false)) {
                DefaultEventExecutor.this.pendingTasksCounter.decrementAndGet();
            }
        }

        /**
         * Releases the pending slot, then invokes the callable unless the future was already
         * completed (for example cancelled by the caller). Anything thrown by the callable is
         * reported through the future rather than propagated to the scheduler.
         */
        @Override // java.lang.Runnable
        public void run() {
            releasePendingSlot();
            try {
                if (!this.future.isDone()) {
                    this.future.complete(this.task.call());
                }
            } catch (Throwable th) {
                this.future.completeExceptionally(th);
            }
        }
    }

    /**
     * Creates an executor.
     *
     * @param threadFactory factory used by the underlying scheduler to create its worker thread
     * @param pendingTasksCapacity number of not-yet-started tasks at which new submissions are
     *     rejected
     */
    public DefaultEventExecutor(ThreadFactory threadFactory, int pendingTasksCapacity) {
        this.scheduler = new ScheduledThreadPoolExecutor(1, threadFactory);
        this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        // Cancelled entries are removed from the queue immediately, so they never reach Task.run().
        this.scheduler.setRemoveOnCancelPolicy(true);
        this.pendingTasksCapacity = pendingTasksCapacity;
    }

    /**
     * Submits a runnable for execution as soon as the worker is free.
     *
     * @param runnable work to run
     * @return future completed when the runnable finishes
     * @throws RejectedExecutionException if the executor was shut down or is at capacity
     */
    @Override // io.confluent.kafka.concurrent.EventExecutor
    public CompletableFuture<Void> submit(Runnable runnable) {
        return submit(new VoidCallable(runnable));
    }

    /**
     * Submits a callable for execution as soon as the worker is free.
     *
     * @param callable work to run
     * @param <T> result type
     * @return future completed with the callable's result or failure
     * @throws RejectedExecutionException if the executor was shut down or is at capacity
     */
    @Override // io.confluent.kafka.concurrent.EventExecutor
    public <T> CompletableFuture<T> submit(Callable<T> callable) {
        ensureAccepting();
        Task<T> task = new Task<>(callable);
        // Reserve the slot before hand-off so the worker can never release it before it is taken.
        this.pendingTasksCounter.incrementAndGet();
        Future<?> handle;
        try {
            handle = this.scheduler.submit(task);
        } catch (RuntimeException e) {
            // The scheduler refused the task; it will never run, so give the slot back.
            task.releasePendingSlot();
            throw e;
        }
        hookupCancellation(handle, task);
        return task.future();
    }

    /**
     * Schedules a runnable to run once after the given delay.
     *
     * @param runnable work to run
     * @param delay delay before execution
     * @param timeUnit unit of {@code delay}
     * @return future completed when the runnable finishes
     * @throws RejectedExecutionException if the executor was shut down or is at capacity
     */
    @Override // io.confluent.kafka.concurrent.EventExecutor
    public CompletableFuture<Void> schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
        return schedule(new VoidCallable(runnable), delay, timeUnit);
    }

    /**
     * Schedules a callable to run once after the given delay.
     *
     * @param callable work to run
     * @param delay delay before execution
     * @param timeUnit unit of {@code delay}
     * @param <T> result type
     * @return future completed with the callable's result or failure
     * @throws RejectedExecutionException if the executor was shut down or is at capacity
     */
    @Override // io.confluent.kafka.concurrent.EventExecutor
    public <T> CompletableFuture<T> schedule(Callable<T> callable, long delay, TimeUnit timeUnit) {
        ensureAccepting();
        Task<T> task = new Task<>(callable);
        // Reserve the slot before hand-off so the worker can never release it before it is taken.
        this.pendingTasksCounter.incrementAndGet();
        ScheduledFuture<?> handle;
        try {
            handle = this.scheduler.schedule(task, delay, timeUnit);
        } catch (RuntimeException e) {
            // The scheduler refused the task; it will never run, so give the slot back.
            task.releasePendingSlot();
            throw e;
        }
        hookupCancellation(handle, task);
        return task.future();
    }

    /**
     * Starts shutting the executor down. Only the first call begins the shutdown sequence; every
     * call returns the same future, which completes once the scheduler has been shut down.
     * After the first call new submissions are rejected.
     *
     * @return future completed when shutdown has been performed
     */
    @Override // io.confluent.kafka.concurrent.EventExecutor
    public CompletableFuture<Void> shutdown() {
        if (this.shutdownFuture.compareAndSet(null, new CompletableFuture<>())) {
            scheduleShutdown();
        }
        return this.shutdownFuture.get();
    }

    /**
     * Enqueues a probe task that samples the scheduler's queue size. When two consecutive probes
     * see the same size the scheduler is shut down and the shutdown future completed; otherwise
     * the size is remembered and another probe is enqueued behind the remaining work.
     *
     * <p>NOTE(review): delayed tasks still queued at that point are dropped by the scheduler
     * (the execute-after-shutdown policy is disabled in the constructor); nothing here completes
     * their futures - confirm that callers do not wait on them.
     */
    private void scheduleShutdown() {
        this.scheduler.submit(() -> {
            int size = this.scheduler.getQueue().size();
            if (this.prevQueueSize.isPresent() && size == this.prevQueueSize.getAsInt()) {
                this.scheduler.shutdown();
                this.shutdownFuture.get().complete(null);
            } else {
                this.prevQueueSize = OptionalInt.of(size);
                scheduleShutdown();
            }
        });
    }

    /**
     * Propagates cancellation of the caller-visible future to the scheduler entry.
     *
     * @param future scheduler handle for {@code task}
     * @param task task whose future is observed
     */
    private void hookupCancellation(Future<?> future, Task<?> task) {
        task.future().whenComplete((obj, th) -> {
            if ((th instanceof CancellationException) && future.cancel(false)) {
                // cancel(false) also reports success for a task that is mid-run and has already
                // released its slot; releasePendingSlot() is idempotent so the count stays exact.
                task.releasePendingSlot();
            }
        });
    }

    /**
     * Rejects new work after shutdown has been requested or when the pending-task count has
     * reached capacity. The capacity check is not atomic with the later increment, so concurrent
     * submitters may briefly overshoot the limit.
     *
     * @throws RejectedExecutionException if the executor is shut down or at capacity
     */
    private void ensureAccepting() {
        if (this.shutdownFuture.get() != null) {
            throw new RejectedExecutionException("event executor was shutdown");
        }
        if (this.pendingTasksCounter.get() >= this.pendingTasksCapacity) {
            throw new RejectedExecutionException(String.format("pending task capacity reached %s", Integer.valueOf(this.pendingTasksCapacity)));
        }
    }
}
