package io.github.panghy.javaflow.scheduler;

import io.github.panghy.javaflow.core.FlowFuture;
import io.github.panghy.javaflow.core.FlowPromise;
import io.github.panghy.javaflow.scheduler.Task;
import io.github.panghy.javaflow.util.LoggingUtil;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;

/* loaded from: input_file:io/github/panghy/javaflow/scheduler/SingleThreadedScheduler.class */
public class SingleThreadedScheduler implements AutoCloseable {
    /** Logger used for all scheduler diagnostics. */
    private static final Logger LOGGER = Logger.getLogger(SingleThreadedScheduler.class.getName());
    /** Source of task ids; incremented once per scheduled task. */
    private final AtomicLong taskIdCounter;
    /** Source of timer ids; incremented once per registered timer. */
    private final AtomicLong timerIdCounter;
    /** Orders tasks by effective priority, then creation time, then sequence. */
    private final Comparator<Task> effectivePriorityComparator;
    /** Tasks waiting to be started or resumed, sorted by {@link #effectivePriorityComparator}. */
    private final TreeSet<Task> readyTasks;
    /** Pending timers grouped by their scheduled time in milliseconds (earliest first). */
    private final NavigableMap<Long, List<TimerTask>> timerTasks;
    /** Pending timers indexed by timer id, used for cancellation. */
    private final Map<Long, TimerTask> timerIdToTask;
    /** Lock taken around accesses to the ready queue and the timer map. */
    private final ReentrantLock taskLock;
    /** Condition on {@link #taskLock}; signalled when tasks or timers are added, awaited by the scheduler loop. */
    private final Condition tasksAvailableCondition;
    /** Continuation of each started task, keyed by task id; removed once the continuation is done. */
    private final Map<Long, Continuation> taskToContinuation;
    /** Every scheduled task that has not finished yet, keyed by task id. */
    private final Map<Long, Task> idToTask;
    /** Thread running the scheduler loop; only assigned by start() when the carrier thread is enabled. */
    private Thread schedulerThread;
    /** Set by start(), cleared by close(). */
    private final AtomicBoolean running;
    /** Set by drain(); cleared again by start(). */
    private final AtomicBoolean draining;
    /** Latch counted down when the scheduler loop exits; replaced with a fresh latch on each start. */
    private final AtomicReference<CountDownLatch> schedulerExitLatch;
    /** Continuation scope of each started task, keyed by task id; used by await() to suspend the task. */
    private final Map<Long, ContinuationScope> taskToScope;
    /** When true start() spawns the scheduler thread; when false tasks only run via pump(). */
    private final boolean enableCarrierThread;
    /** Promises registered by yield(Integer), keyed by the yielding task's id. */
    private final Map<Long, FlowPromise<Void>> yieldPromises;
    /** Time source for timers and priority aging. */
    private final FlowClock clock;
    /** Waiting time, in milliseconds, that earns a queued task one aging step. */
    private final long priorityAgingIntervalMs;
    /** Amount subtracted from a task's original priority per aging step. */
    private final int priorityAgingBoost;
    /** Clock reading taken the last time the scheduler loop refreshed effective priorities. */
    private long lastPriorityUpdateTime;
    /** Minimum gap between priority refreshes in the scheduler loop; also used as the aging interval. */
    private static final long PRIORITY_UPDATE_INTERVAL_MS = 100;

    /**
     * Exposes the scheduler's ready queue. The returned set is the live internal instance, not a
     * copy.
     *
     * @return the set of tasks currently queued to run
     */
    TreeSet<Task> getReadyTasks() {
        return readyTasks;
    }

    /** Creates a scheduler with the carrier thread enabled and a real clock. */
    public SingleThreadedScheduler() {
        this(true, FlowClock.createRealClock());
    }

    /**
     * Creates a scheduler that uses the clock returned by {@code FlowClock.createRealClock()}.
     *
     * @param z carrier-thread flag, forwarded unchanged to
     *     {@link #SingleThreadedScheduler(boolean, FlowClock)}
     */
    public SingleThreadedScheduler(boolean z) {
        this(z, FlowClock.createRealClock());
    }

    /**
     * Creates a scheduler with an explicit carrier-thread setting and clock.
     *
     * <p>The scheduler is created in the stopped state; {@link #start()} must be called before
     * work can be scheduled.
     *
     * @param z whether {@link #start()} spawns the dedicated scheduler thread; when {@code false},
     *     tasks only execute through {@link #pump()}
     * @param flowClock time source used for timers and priority aging
     */
    public SingleThreadedScheduler(boolean z, FlowClock flowClock) {
        this.taskIdCounter = new AtomicLong(0L);
        this.timerIdCounter = new AtomicLong(0L);
        // Method references give the comparator chain an explicit Task element type; untyped
        // lambdas at the head of the chain would be inferred as Comparator<Object>.
        this.effectivePriorityComparator =
                Comparator.comparingInt(Task::getEffectivePriority)
                        .thenComparingLong(Task::getCreationTime)
                        .thenComparingLong(Task::getSequence);
        this.readyTasks = new TreeSet<>(this.effectivePriorityComparator);
        this.timerTasks = new TreeMap<>();
        this.timerIdToTask = new ConcurrentHashMap<>();
        this.taskLock = new ReentrantLock();
        this.tasksAvailableCondition = this.taskLock.newCondition();
        this.taskToContinuation = new ConcurrentHashMap<>();
        this.idToTask = new ConcurrentHashMap<>();
        this.running = new AtomicBoolean(false);
        this.draining = new AtomicBoolean(false);
        this.schedulerExitLatch = new AtomicReference<>(new CountDownLatch(1));
        this.taskToScope = new HashMap<>();
        this.yieldPromises = new HashMap<>();
        this.lastPriorityUpdateTime = 0L;
        this.enableCarrierThread = z;
        this.clock = flowClock;
        this.priorityAgingIntervalMs = PRIORITY_UPDATE_INTERVAL_MS;
        this.priorityAgingBoost = 1;
    }

    /**
     * Returns the clock this scheduler was constructed with.
     *
     * @return the scheduler's time source
     */
    public FlowClock getClock() {
        return clock;
    }

    /**
     * Applies priority aging to every task in the ready queue.
     *
     * <p>For each full aging interval a task has waited since its last boost time, its effective
     * priority is lowered numerically by {@code priorityAgingBoost} relative to its original
     * priority, never going below {@code -1}. Tasks whose effective priority changes are removed
     * and re-inserted so the sorted set stays consistent with its comparator.
     */
    private void updateEffectivePriorities() {
        long now = this.clock.currentTimeMillis();
        this.taskLock.lock();
        try {
            // Iterate over a snapshot because the loop re-inserts into the ready queue
            for (Task task : new ArrayList<>(this.readyTasks)) {
                long waitedMs = now - task.getLastPriorityBoostTime();
                if (waitedMs < this.priorityAgingIntervalMs) {
                    continue;
                }
                // Stay in long arithmetic until after clamping: narrowing the boost to int first
                // wraps for very long waits and could push the priority the wrong way.
                long boost = (waitedMs / this.priorityAgingIntervalMs) * this.priorityAgingBoost;
                int agedPriority = (int) Math.max(-1L, task.getOriginalPriority() - boost);
                if (agedPriority != task.getEffectivePriority()) {
                    // The effective priority is the sort key, so remove before changing it
                    this.readyTasks.remove(task);
                    task.setEffectivePriority(agedPriority);
                    this.readyTasks.add(task);
                }
            }
        } finally {
            this.taskLock.unlock();
        }
    }

    /** Installs a fresh single-count exit latch for the next run of the scheduler loop. */
    private void resetSchedulerExitLatch() {
        CountDownLatch freshLatch = new CountDownLatch(1);
        this.schedulerExitLatch.set(freshLatch);
        LoggingUtil.debug(LOGGER, "Reset scheduler exit latch");
    }

    /**
     * Starts the scheduler if it is not already running.
     *
     * <p>Clears drain mode, resets the exit latch and, when the carrier thread is enabled, spawns
     * a daemon platform thread named {@code flow-scheduler} that runs the scheduler loop. Calling
     * this on a running scheduler does nothing.
     */
    public synchronized void start() {
        if (!this.running.compareAndSet(false, true)) {
            return;
        }
        LoggingUtil.info(LOGGER, "Starting flow scheduler");
        this.draining.set(false);
        resetSchedulerExitLatch();
        if (!this.enableCarrierThread) {
            LoggingUtil.debug(LOGGER, "Carrier thread disabled, tasks will only execute via pump()");
            return;
        }
        this.schedulerThread =
                Thread.ofPlatform().name("flow-scheduler").daemon(true).start(this::schedulerLoop);
    }

    /**
     * Schedules a task with priority {@code 30}.
     *
     * @param callable the work to run
     * @param <T> result type of the task
     * @return a future completed with the task's result or failure
     */
    public <T> FlowFuture<T> schedule(Callable<T> callable) {
        final int defaultPriority = 30;
        return schedule(callable, defaultPriority);
    }

    /**
     * Schedules a task at the given priority and queues it for execution.
     *
     * <p>If the scheduler is draining or not running, nothing is queued and the returned future
     * is already failed with an {@link IllegalStateException}. Otherwise the new task becomes a
     * child of the task currently executing (if any), and cancelling the returned future cancels
     * the task.
     *
     * @param callable the work to run
     * @param i priority for the task; also used as its initial effective priority
     * @param <T> result type of the task
     * @return a future completed with the task's result or failure
     */
    public <T> FlowFuture<T> schedule(Callable<T> callable, int i) {
        if (this.draining.get() || !this.running.get()) {
            FlowFuture<T> rejected = new FlowFuture<>();
            rejected.getPromise().completeExceptionally(new IllegalStateException("Cannot schedule new tasks while scheduler is shutting down"));
            return rejected;
        }
        final long taskId = this.taskIdCounter.incrementAndGet();
        FlowFuture<T> future = new FlowFuture<>();
        FlowPromise<T> promise = future.getPromise();
        // Typed wrapper: forwards the outcome to the promise and still propagates it to the runner
        Callable<T> wrapped = () -> {
            try {
                T result = callable.call();
                promise.complete(result);
                return result;
            } catch (Throwable th) {
                promise.completeExceptionally(th);
                throw th;
            }
        };
        Task parent = FlowScheduler.CURRENT_TASK.get();
        Task task = new Task(taskId, i, wrapped, parent);
        task.setEffectivePriority(i);
        // On cancellation, drop any pending yield and cancel every timer the task registered
        task.setCancellationCallback(timerIds -> {
            cancelTask(taskId);
            timerIds.forEach(timerId -> cancelTimer(timerId));
        });
        if (parent != null) {
            parent.addChild(task);
        }
        // Cancelling the future from outside must cancel the underlying task as well
        promise.whenComplete((value, failure) -> {
            if (failure instanceof CancellationException) {
                task.cancel();
            }
        });
        this.idToTask.put(Long.valueOf(taskId), task);
        LoggingUtil.debug(LOGGER, "Scheduling task " + taskId);
        this.taskLock.lock();
        try {
            this.readyTasks.add(task);
            this.tasksAvailableCondition.signalAll();
        } finally {
            this.taskLock.unlock();
        }
        return future;
    }

    /**
     * Schedules a delay with priority {@code 40}.
     *
     * @param d delay in seconds
     * @return a future completed once the delay has elapsed
     */
    public FlowFuture<Void> scheduleDelay(double d) {
        final int defaultPriority = 40;
        return scheduleDelay(d, defaultPriority);
    }

    /**
     * Registers a timer owned by the current flow task.
     *
     * <p>If the scheduler is draining or not running, the returned future is already failed with
     * a {@link CancellationException}.
     *
     * @param d delay in seconds; converted to whole milliseconds by truncation
     * @param i priority passed to the timer
     * @return a future tied to the timer's promise
     * @throws IllegalArgumentException if {@code d} is negative
     * @throws IllegalStateException if called outside a flow task or no current task is set
     */
    public FlowFuture<Void> scheduleDelay(double d, int i) {
        boolean acceptingWork = !this.draining.get() && this.running.get();
        if (!acceptingWork) {
            FlowFuture<Void> rejected = new FlowFuture<>();
            rejected.getPromise().completeExceptionally(new CancellationException("Cannot schedule timers while scheduler is shutting down"));
            return rejected;
        }
        if (d < 0.0d) {
            throw new IllegalArgumentException("Delay cannot be negative");
        }
        if (!FlowScheduler.isInFlowContext()) {
            throw new IllegalStateException("scheduleDelay called outside of a flow task");
        }
        Task owner = FlowScheduler.CURRENT_TASK.get();
        if (owner == null) {
            throw new IllegalStateException("scheduleDelay called for unknown task");
        }
        long delayMillis = (long) (d * 1000.0d);
        FlowFuture<Void> timerFuture = new FlowFuture<>();
        scheduleTimerTask(delayMillis, timerFuture.getPromise(), i, owner);
        return timerFuture;
    }

    /**
     * Registers a timer that is due {@code j} milliseconds from now.
     *
     * <p>If the owning task is already cancelled the promise is failed with a
     * {@link CancellationException} and no timer is registered.
     *
     * @param j delay in milliseconds
     * @param flowPromise promise attached to the timer
     * @param i priority passed to the timer
     * @param task owning task, or {@code null} for a timer without an owner
     */
    private void scheduleTimerTask(long j, FlowPromise<Void> flowPromise, int i, Task task) {
        long timerId = this.timerIdCounter.incrementAndGet();
        long now = this.clock.currentTimeMillis();
        long dueTimeMillis = now + j;
        if (j > 0 && dueTimeMillis < now) {
            // The addition overflowed; saturate so a huge delay is never treated as already due
            dueTimeMillis = Long.MAX_VALUE;
        }
        TimerTask timerTask = new TimerTask(timerId, dueTimeMillis, () -> {
        }, i, flowPromise, task);
        this.taskLock.lock();
        try {
            if (task != null && task.isCancelled()) {
                flowPromise.completeExceptionally(new CancellationException("Parent task is already cancelled"));
                return;
            }
            // Timers sharing a due time share one bucket
            this.timerTasks.computeIfAbsent(Long.valueOf(dueTimeMillis), key -> new ArrayList<>()).add(timerTask);
            this.timerIdToTask.put(Long.valueOf(timerId), timerTask);
            if (task != null) {
                task.registerTimerTask(timerId);
            }
            // Wake the scheduler loop so it can recompute how long to sleep
            this.tasksAvailableCondition.signalAll();
        } finally {
            // Release on every path, including failures while registering the timer
            this.taskLock.unlock();
        }
    }

    /**
     * Cancels a pending timer. Unknown ids are ignored.
     *
     * <p>The timer is removed from both indexes, unregistered from its parent task (if any), and
     * its promise is failed with a {@link CancellationException}.
     *
     * @param j id of the timer to cancel
     */
    public void cancelTimer(long j) {
        this.taskLock.lock();
        try {
            TimerTask cancelled = this.timerIdToTask.remove(Long.valueOf(j));
            if (cancelled == null) {
                return;
            }
            Long dueTime = Long.valueOf(cancelled.getScheduledTimeMillis());
            List<TimerTask> bucket = this.timerTasks.get(dueTime);
            if (bucket != null) {
                bucket.remove(cancelled);
                // Drop empty buckets so they do not show up as the next due time
                if (bucket.isEmpty()) {
                    this.timerTasks.remove(dueTime);
                }
            }
            Task owner = cancelled.getParentTask();
            if (owner != null) {
                owner.unregisterTimerTask(j);
            }
            cancelled.getPromise().completeExceptionally(new CancellationException("Timer cancelled"));
        } finally {
            this.taskLock.unlock();
        }
    }

    /**
     * Executes every timer whose scheduled time is at or before the current clock time.
     *
     * <p>Buckets are processed in ascending due-time order. Each timer is removed from the id
     * index and unregistered from its parent task (when it has one) before it is executed.
     *
     * @return the number of timers executed
     */
    private int processTimerTasks() {
        int fired = 0;
        long now = this.clock.currentTimeMillis();
        this.taskLock.lock();
        try {
            Map.Entry<Long, List<TimerTask>> due;
            // firstEntry() is null for an empty map, which also ends the loop
            while ((due = this.timerTasks.firstEntry()) != null && due.getKey().longValue() <= now) {
                // Snapshot the bucket before running its timers
                for (TimerTask timerTask : new ArrayList<>(due.getValue())) {
                    this.timerIdToTask.remove(Long.valueOf(timerTask.getId()));
                    Task parent = timerTask.getParentTask();
                    if (parent != null) {
                        parent.unregisterTimerTask(timerTask.getId());
                    }
                    timerTask.execute();
                    fired++;
                }
                this.timerTasks.remove(due.getKey());
            }
        } finally {
            // Exactly one unlock for the single lock() above, regardless of iteration count
            this.taskLock.unlock();
        }
        return fired;
    }

    /**
     * Reports when the earliest pending timer is due.
     *
     * @return the earliest scheduled time in milliseconds, or {@link Long#MAX_VALUE} when no
     *     timers are pending
     */
    public long getNextTimerTime() {
        this.taskLock.lock();
        try {
            return this.timerTasks.isEmpty()
                    ? Long.MAX_VALUE
                    : this.timerTasks.firstKey().longValue();
        } finally {
            this.taskLock.unlock();
        }
    }

    public FlowFuture<Void> yield() {
        return yield(null);
    }

    /**
     * Yields the current task so other queued work can run.
     *
     * <p>A promise is recorded for the current task and a small follow-up task is scheduled that
     * completes it. If the scheduler is draining or not running, the returned future is already
     * failed with a {@link CancellationException}.
     *
     * @param num priority for the follow-up task, or {@code null} to use the current task's
     *     original priority
     * @return a future completed when the task is allowed to continue
     * @throws IllegalStateException if called outside a flow task or no current task is set
     */
    public FlowFuture<Void> yield(Integer num) {
        boolean acceptingWork = !this.draining.get() && this.running.get();
        if (!acceptingWork) {
            FlowFuture<Void> rejected = new FlowFuture<>();
            rejected.getPromise().completeExceptionally(new CancellationException("Cannot yield while scheduler is shutting down"));
            return rejected;
        }
        if (!FlowScheduler.isInFlowContext()) {
            throw new IllegalStateException("yield called outside of a flow task");
        }
        Task current = FlowScheduler.CURRENT_TASK.get();
        if (current == null) {
            throw new IllegalStateException("yield called for unknown task");
        }
        FlowFuture<Void> yieldFuture = new FlowFuture<>();
        FlowPromise<Void> yieldPromise = yieldFuture.getPromise();
        this.yieldPromises.put(Long.valueOf(current.getId()), yieldPromise);
        int priority = (num != null) ? num.intValue() : current.getOriginalPriority();
        schedule(() -> {
            yieldPromise.complete(null);
            return null;
        }, priority);
        return yieldFuture;
    }

    /**
     * Resumes a previously suspended task by running its continuation.
     *
     * <p>Does nothing when the task or its continuation is no longer registered. Before running,
     * the task is marked RUNNING, its aging state is reset and any pending yield promise is
     * completed. When the continuation finishes, the task is marked COMPLETED and removed from
     * all bookkeeping maps.
     *
     * @param j id of the task to resume
     */
    private void resumeTask(long j) {
        Long key = Long.valueOf(j);
        Continuation continuation = this.taskToContinuation.get(key);
        Task task = this.idToTask.get(key);
        if (continuation == null || task == null) {
            return;
        }
        LoggingUtil.debug(LOGGER, "Resuming task " + j);
        task.setState(Task.TaskState.RUNNING);
        task.resetPriorityBoostTime(this.clock.currentTimeMillis());
        task.setEffectivePriority(task.getOriginalPriority());
        FlowPromise<Void> pendingYield = this.yieldPromises.remove(key);
        if (pendingYield != null) {
            pendingYield.complete(null);
        }
        try {
            FlowScheduler.CURRENT_TASK.set(task);
            continuation.run();
        } finally {
            // Always clear the current-task marker, whether the continuation returned or threw
            FlowScheduler.CURRENT_TASK.remove();
        }
        if (!continuation.isDone()) {
            return;
        }
        LoggingUtil.debug(LOGGER, "Task " + j + " completed");
        task.setState(Task.TaskState.COMPLETED);
        this.taskToContinuation.remove(key);
        this.taskToScope.remove(key);
        this.idToTask.remove(key);
    }

    /**
     * Main loop of the carrier thread.
     *
     * <p>Each iteration refreshes aged priorities (at most once per
     * {@code PRIORITY_UPDATE_INTERVAL_MS}), fires due timers, waits until a task is ready, then
     * starts or resumes the first task in the ready queue outside the lock. The loop ends when
     * {@code running} becomes false, or when the scheduler is draining and no tasks remain. The
     * exit latch is counted down on every exit path.
     */
    private void schedulerLoop() {
        try {
            while (this.running.get()) {
                // Tracks whether this iteration still owns taskLock, so it is released exactly once
                boolean lockHeld = false;
                try {
                    this.taskLock.lock();
                    lockHeld = true;

                    long now = this.clock.currentTimeMillis();
                    if (now - this.lastPriorityUpdateTime > PRIORITY_UPDATE_INTERVAL_MS) {
                        updateEffectivePriorities();
                        this.lastPriorityUpdateTime = now;
                    }
                    processTimerTasks();

                    if (this.draining.get() && this.readyTasks.isEmpty() && this.idToTask.isEmpty()) {
                        LoggingUtil.info(LOGGER, "All tasks processed, exiting drain mode");
                        return;
                    }

                    // Wait, with the lock held, until a task is ready to run
                    while (this.readyTasks.isEmpty()) {
                        if (this.draining.get() && this.idToTask.isEmpty()) {
                            LoggingUtil.info(LOGGER, "All tasks processed, exiting drain mode");
                            return;
                        }
                        long nextTimerTime = getNextTimerTime();
                        long waitStart = this.clock.currentTimeMillis();
                        if (nextTimerTime != Long.MAX_VALUE && nextTimerTime <= waitStart) {
                            // A timer is already due: fire it instead of sleeping
                            processTimerTasks();
                            if (!this.readyTasks.isEmpty()) {
                                break;
                            }
                            nextTimerTime = getNextTimerTime();
                        }
                        if (nextTimerTime != Long.MAX_VALUE) {
                            long delay = nextTimerTime - waitStart;
                            if (delay > 0) {
                                this.tasksAvailableCondition.await(delay, TimeUnit.MILLISECONDS);
                            }
                        } else {
                            this.tasksAvailableCondition.await();
                        }
                        if (!this.running.get()) {
                            LoggingUtil.warn(LOGGER, "Scheduler loop stopping");
                            return;
                        }
                        processTimerTasks();
                    }

                    Task task = this.readyTasks.removeFirst();
                    // Run the task without the lock so it can schedule more work
                    this.taskLock.unlock();
                    lockHeld = false;

                    if (task != null) {
                        LoggingUtil.debug(LOGGER, "Picking up task " + task.getId());
                        if (this.taskToContinuation.containsKey(Long.valueOf(task.getId()))) {
                            resumeTask(task.getId());
                        } else {
                            startTask(task);
                        }
                    }
                } catch (InterruptedException e) {
                    // NOTE(review): the loop keeps going with the interrupt flag set, so the next
                    // await() throws again immediately - confirm this is intended
                    Thread.currentThread().interrupt();
                    LoggingUtil.warn(LOGGER, "Scheduler thread interrupted", e);
                } catch (Exception e) {
                    LoggingUtil.error(LOGGER, "Error in scheduler", e);
                } finally {
                    if (lockHeld) {
                        this.taskLock.unlock();
                    }
                }
            }
        } finally {
            LoggingUtil.info(LOGGER, "Scheduler loop ending");
            CountDownLatch exitLatch = this.schedulerExitLatch.get();
            if (exitLatch != null) {
                exitLatch.countDown();
            }
        }
    }

    /**
     * Runs a task for the first time inside a new continuation.
     *
     * <p>The task is marked RUNNING and its aging state is reset. An exception thrown by the
     * task's callable marks the task FAILED and is logged, unless the task was cancelled, in
     * which case it is silently dropped. If the continuation finishes without suspending, the
     * task is marked COMPLETED and removed from all bookkeeping maps.
     *
     * @param task the task to start
     */
    private void startTask(Task task) {
        final long taskId = task.getId();
        LoggingUtil.debug(LOGGER, "Task " + taskId + " starting");
        task.setState(Task.TaskState.RUNNING);
        task.resetPriorityBoostTime(this.clock.currentTimeMillis());
        task.setEffectivePriority(task.getOriginalPriority());

        ContinuationScope scope = new ContinuationScope("flow-task-" + taskId);
        this.taskToScope.put(Long.valueOf(taskId), scope);
        Continuation continuation = new Continuation(scope, () -> {
            try {
                FlowScheduler.CURRENT_TASK.set(task);
                task.getCallable().call();
            } catch (Exception e) {
                // A cancelled task is expected to fail; only report genuine failures
                if (!task.isCancelled()) {
                    task.setState(Task.TaskState.FAILED);
                    LoggingUtil.warn(LOGGER, "Task " + task.getId() + " failed: ", e);
                }
            } finally {
                FlowScheduler.CURRENT_TASK.remove();
            }
        });
        this.taskToContinuation.put(Long.valueOf(taskId), continuation);

        continuation.run();
        if (!continuation.isDone()) {
            // Suspended: a later resume will finish it
            return;
        }
        LoggingUtil.debug(LOGGER, "Task " + taskId + " completed immediately");
        task.setState(Task.TaskState.COMPLETED);
        this.taskToContinuation.remove(Long.valueOf(taskId));
        this.taskToScope.remove(Long.valueOf(taskId));
        this.idToTask.remove(Long.valueOf(taskId));
    }

    /**
     * Suspends the current flow task until the given future completes.
     *
     * <p>The task is marked SUSPENDED, a completion callback re-queues it, and the continuation
     * yields back to the scheduler. After resumption the outcome of the future is returned or
     * rethrown.
     *
     * @param flowFuture the future to wait for
     * @param <T> value type of the future
     * @return the future's value
     * @throws CancellationException if the scheduler is draining, or the task was cancelled while
     *     suspended
     * @throws IllegalStateException if called outside a flow task, or the current task has no
     *     continuation scope or is not RUNNING
     * @throws ExecutionException wrapping the future's exception if it completed exceptionally
     */
    public <T> T await(FlowFuture<T> flowFuture) throws Exception {
        if (this.draining.get()) {
            throw new CancellationException("Cannot await futures while scheduler is shutting down");
        }
        if (!FlowScheduler.isInFlowContext()) {
            throw new IllegalStateException("await called outside of a flow task");
        }
        Task task = FlowScheduler.CURRENT_TASK.get();
        if (task == null) {
            throw new IllegalStateException("missing task");
        }
        ContinuationScope scope = this.taskToScope.get(Long.valueOf(task.getId()));
        if (scope == null) {
            throw new IllegalStateException("missing task scope");
        }
        if (task.getState() != Task.TaskState.RUNNING) {
            throw new IllegalStateException("task is not running");
        }
        LoggingUtil.debug(LOGGER, "Task " + task.getId() + " suspending");
        task.setState(Task.TaskState.SUSPENDED);
        // Re-queue this task once the awaited future completes, successfully or not
        flowFuture.getPromise().whenComplete((value, failure) -> {
            this.taskLock.lock();
            try {
                this.readyTasks.add(task);
                this.tasksAvailableCondition.signalAll();
            } finally {
                this.taskLock.unlock();
            }
        });
        // Hand control back to the scheduler; execution continues here when the task is resumed
        Continuation.yield(scope);
        if (task.isCancelled()) {
            throw new CancellationException("task cancelled");
        }
        if (flowFuture.isCompletedExceptionally()) {
            throw new ExecutionException(flowFuture.getException());
        }
        return flowFuture.getNow();
    }

    /**
     * Manually drives the scheduler: fires due timers, then runs every task that was in the ready
     * queue at the time of the call. Tasks queued while the batch runs are left for the next call.
     *
     * @return the number of timers fired plus the number of tasks run
     * @throws IllegalStateException if the carrier thread is enabled
     */
    public int pump() {
        if (this.enableCarrierThread) {
            throw new IllegalStateException("pump() can only be called when carrier thread is disabled");
        }
        int timersFired = processTimerTasks();

        // Take the whole ready queue under the lock, then run the batch without holding it
        List<Task> batch;
        this.taskLock.lock();
        try {
            if (this.readyTasks.isEmpty()) {
                return timersFired;
            }
            batch = new ArrayList<>(this.readyTasks);
            this.readyTasks.clear();
        } finally {
            this.taskLock.unlock();
        }

        int tasksRun = 0;
        for (Task task : batch) {
            long taskId = task.getId();
            if (this.taskToContinuation.containsKey(Long.valueOf(taskId))) {
                resumeTask(taskId);
            } else {
                startTask(task);
            }
            tasksRun++;
        }
        return tasksRun + timersFired;
    }

    /**
     * Handles cancellation of a task: if the task is still registered, any yield promise recorded
     * for it is failed with a {@link CancellationException}. Unknown task ids are ignored.
     *
     * @param j id of the cancelled task
     */
    private void cancelTask(long j) {
        LoggingUtil.debug(LOGGER, "Cancelling task " + j);
        this.taskLock.lock();
        try {
            if (this.idToTask.get(Long.valueOf(j)) == null) {
                return;
            }
            FlowPromise<Void> pendingYield = this.yieldPromises.remove(Long.valueOf(j));
            if (pendingYield != null) {
                pendingYield.completeExceptionally(new CancellationException());
            }
        } finally {
            // Single release point; a second unlock would throw IllegalMonitorStateException
            this.taskLock.unlock();
        }
    }

    /**
     * Advances the simulated clock and processes the work that becomes due.
     *
     * <p>Timers are processed before and after the clock moves, followed by up to three
     * pump-and-process-timers rounds that stop early once a pump does no work.
     *
     * @param j amount passed to the simulated clock's {@code advanceTime}
     * @return the total count reported by the timer and pump passes
     * @throws IllegalStateException if the scheduler's clock is not simulated
     */
    public int advanceTime(long j) {
        if (!this.clock.isSimulated()) {
            throw new IllegalStateException("advanceTime can only be called with a simulated clock");
        }
        SimulatedClock simulated = (SimulatedClock) this.clock;
        int executed = processTimerTasks();
        simulated.advanceTime(j);
        executed += processTimerTasks();
        int round = 0;
        while (round < 3) {
            int pumped = pump();
            executed += pumped;
            if (pumped == 0) {
                break;
            }
            executed += processTimerTasks();
            round++;
        }
        return executed;
    }

    /**
     * Collects the tasks the scheduler currently knows about: everything in the ready queue plus
     * every registered task.
     *
     * @return a new set holding those tasks
     */
    public Set<Task> getActiveTasks() {
        Set<Task> active = new HashSet<>();
        this.taskLock.lock();
        try {
            active.addAll(this.readyTasks);
            active.addAll(this.idToTask.values());
        } finally {
            this.taskLock.unlock();
        }
        return active;
    }

    /**
     * Shuts the scheduler down.
     *
     * <p>Drains outstanding work, waits up to five seconds for the scheduler loop to signal its
     * exit (then up to two more seconds joining the thread if it is still alive), clears all
     * bookkeeping and marks the scheduler as not running.
     */
    @Override
    public void close() {
        drain();
        LoggingUtil.info(LOGGER, "Shutting down scheduler");
        if (this.schedulerThread != null) {
            try {
                LoggingUtil.debug(LOGGER, "Waiting for scheduler loop to exit...");
                CountDownLatch exitLatch = this.schedulerExitLatch.get();
                boolean exitedInTime = exitLatch != null && exitLatch.await(5L, TimeUnit.SECONDS);
                if (exitedInTime) {
                    LoggingUtil.debug(LOGGER, "Scheduler loop exited successfully");
                } else {
                    LoggingUtil.warn(LOGGER, "Scheduler loop did not exit within timeout period");
                    if (this.schedulerThread.isAlive()) {
                        LoggingUtil.warn(LOGGER, "Scheduler thread is still alive, attempting to join it...");
                        try {
                            this.schedulerThread.join(2000L);
                        } catch (InterruptedException joinInterrupted) {
                            Thread.currentThread().interrupt();
                            LoggingUtil.warn(LOGGER, "Interrupted while joining scheduler thread", joinInterrupted);
                        }
                        if (!this.schedulerThread.isAlive()) {
                            LoggingUtil.debug(LOGGER, "Scheduler thread terminated after join.");
                        } else {
                            LoggingUtil.warn(LOGGER, "Scheduler thread is still alive after join attempt.");
                        }
                    }
                }
            } catch (InterruptedException waitInterrupted) {
                Thread.currentThread().interrupt();
                LoggingUtil.warn(LOGGER, "Interrupted while waiting for scheduler loop to exit", waitInterrupted);
            }
        }
        // Release every reference the scheduler still holds
        this.taskToContinuation.clear();
        this.taskToScope.clear();
        this.idToTask.clear();
        this.yieldPromises.clear();
        this.timerTasks.clear();
        this.timerIdToTask.clear();
        this.running.set(false);
    }

    /**
     * Puts the scheduler into drain mode. Only the first call has any effect.
     *
     * <p>Under the task lock, every pending yield promise and timer promise is failed with a
     * {@link CancellationException}, suspended tasks are re-queued so they can unwind, and every
     * registered task is cancelled. When the carrier thread is disabled, the ready queue is then
     * pumped (at most 100 rounds) on the calling thread.
     */
    void drain() {
        if (!this.draining.compareAndSet(false, true)) {
            return;
        }
        LoggingUtil.info(LOGGER, "Putting scheduler in drain mode");
        this.taskLock.lock();
        try {
            for (Map.Entry<Long, FlowPromise<Void>> entry : this.yieldPromises.entrySet()) {
                LoggingUtil.debug(LOGGER, "Cancelling yield promise for task " + String.valueOf(entry.getKey()) + " due to scheduler drain");
                entry.getValue().completeExceptionally(new CancellationException("Scheduler is draining while waiting for yield"));
            }
            for (Map.Entry<Long, TimerTask> entry : this.timerIdToTask.entrySet()) {
                LoggingUtil.debug(LOGGER, "Cancelling timer task " + String.valueOf(entry.getKey()) + " due to scheduler drain");
                entry.getValue().getPromise().completeExceptionally(new CancellationException("Scheduler is draining while waiting for timer"));
            }
            for (Task task : this.idToTask.values()) {
                if (task.getState() == Task.TaskState.SUSPENDED) {
                    LoggingUtil.debug(LOGGER, "Resuming suspended task " + task.getId() + " for cleanup");
                    this.readyTasks.add(task);
                }
                task.cancel();
            }
            this.tasksAvailableCondition.signalAll();
        } finally {
            // The lock covers only the cancellation pass, so a failure while pumping below can
            // never trigger a second unlock
            this.taskLock.unlock();
        }
        if (this.enableCarrierThread) {
            // The scheduler thread finishes the drain on its own
            return;
        }
        int rounds = 0;
        while (rounds < 100 && pump() != 0) {
            rounds++;
        }
        if (!this.idToTask.isEmpty()) {
            LoggingUtil.warn(LOGGER, "Some tasks still remain after draining: " + this.idToTask.size());
        }
    }
}
