package org.apache.kafka.queue;

import io.confluent.kafka.util.OpenTelemetryManager;
import io.opentelemetry.context.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.queue.EventQueue;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/queue/KafkaEventQueue.class */
public final class KafkaEventQueue implements EventQueue {
    public static final String EVENT_HANDLER_THREAD_SUFFIX = "event-handler";
    private final Time time;
    private final EventQueue.Event cleanupEvent;
    private final ReentrantLock lock;
    private final Logger log;
    private final EventHandler eventHandler;
    private final Thread eventHandlerThread;
    private boolean shuttingDown;
    private boolean interrupted;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/queue/KafkaEventQueue$EventContext.class */
    /**
     * A queued event together with its bookkeeping, doubling as a node of a
     * circular doubly-linked list. A freshly created node links to itself in
     * both directions.
     */
    public static class EventContext {
        /** The event to execute. */
        private final EventQueue.Event event;
        /** How the event was asked to be inserted into the queue. */
        private final EventQueue.EventInsertionType insertionType;
        /** Previous node in the circular list; {@code this} while unlinked. */
        private EventContext prev = this;
        /** Next node in the circular list; {@code this} while unlinked. */
        private EventContext next = this;
        /** Key of this context in the handler's deadline map, if it has one. */
        private OptionalLong deadlineNs = OptionalLong.empty();
        /** Optional tag under which the handler indexes this context; may be null. */
        private String tag;
        /** Context object captured by the caller when the event was enqueued. */
        private Context context;

        EventContext(EventQueue.Event event, EventQueue.EventInsertionType insertionType, String tag, Context context) {
            this.event = event;
            this.insertionType = insertionType;
            this.tag = tag;
            this.context = context;
        }

        /**
         * Links {@code eventContext} into the list immediately after this node.
         */
        void insertAfter(EventContext eventContext) {
            EventContext successor = this.next;
            successor.prev = eventContext;
            eventContext.next = successor;
            eventContext.prev = this;
            this.next = eventContext;
        }

        /**
         * Links {@code eventContext} into the list immediately before this node.
         */
        void insertBefore(EventContext eventContext) {
            EventContext predecessor = this.prev;
            predecessor.next = eventContext;
            eventContext.prev = predecessor;
            eventContext.next = this;
            this.prev = eventContext;
        }

        /**
         * Unlinks this node from its neighbours and restores its self-links.
         */
        void remove() {
            EventContext before = this.prev;
            EventContext after = this.next;
            before.next = after;
            after.prev = before;
            this.prev = this;
            this.next = this;
        }

        /**
         * @return true if this node is linked only to itself.
         */
        boolean isSingleton() {
            if (this.prev != this) {
                return false;
            }
            return this.next == this;
        }

        /**
         * Runs the event, or delivers {@code th} to it instead of running it.
         *
         * @param logger logger used for diagnostics
         * @param th     exception to hand to the event's handleException; when
         *               null the event itself is run
         * @return true if the event run was cut short by an
         *         InterruptedException; otherwise the current thread's
         *         interrupt status
         */
        boolean run(Logger logger, Throwable th) {
            Throwable failure = th;
            if (failure == null) {
                try {
                    OpenTelemetryManager.trace(
                        OpenTelemetryManager.createSpanIfParentsExist(this.event.name(), this.context),
                        () -> {
                            this.event.run();
                            return null;
                        });
                } catch (InterruptedException interruption) {
                    logger.warn("Interrupted while running event. Shutting down event queue");
                    return true;
                } catch (Throwable caught) {
                    logger.debug("Got exception while running {}. Invoking handleException.", this.event, caught);
                    failure = caught;
                }
            }
            if (failure != null) {
                completeWithException(logger, failure);
            }
            return Thread.currentThread().isInterrupted();
        }

        /**
         * Passes {@code th} to the event's handleException. An Exception thrown
         * by that callback is logged and not propagated.
         */
        void completeWithException(Logger logger, Throwable th) {
            try {
                this.event.handleException(th);
            } catch (Exception handlerFailure) {
                Throwable cause = handlerFailure;
                logger.error("Unexpected exception in handleException", cause);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/queue/KafkaEventQueue$EventHandler.class */
    public class EventHandler implements Runnable {
        private int size;
        private final Map<String, EventContext> tagToEventContext;
        private final EventContext head;
        private final TreeMap<Long, EventContext> deadlineMap;
        private final Condition cond;

        /**
         * Creates an empty handler: no tagged or deadline-indexed events, a
         * self-linked sentinel node as the list head, and a condition bound to
         * the enclosing queue's lock.
         */
        private EventHandler() {
            this.cond = KafkaEventQueue.this.lock.newCondition();
            this.head = new EventContext(null, null, null, null);
            this.tagToEventContext = new HashMap<>();
            this.deadlineMap = new TreeMap<>();
            this.size = 0;
        }

        /**
         * Entry point of the handler runnable: runs the event loop and then
         * the cleanup event. A Throwable from either step is logged at warn
         * level and not rethrown, so the cleanup event is attempted even if
         * the event loop failed.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                handleEvents();
            } catch (Throwable th) {
                KafkaEventQueue.this.log.warn("event handler thread exiting with exception", th);
            }
            try {
                KafkaEventQueue.this.cleanupEvent.run();
            } catch (Throwable th2) {
                KafkaEventQueue.this.log.warn("cleanup event threw exception", th2);
            }
        }

        /**
         * Detaches {@code eventContext} from every structure that may hold it:
         * the linked list, the deadline map (when it has a deadline) and the
         * tag index (when it has a tag). The context's deadline and tag are
         * cleared afterwards. The size counter is not touched here.
         */
        private void remove(EventContext eventContext) {
            eventContext.remove();

            OptionalLong deadline = eventContext.deadlineNs;
            if (deadline.isPresent()) {
                this.deadlineMap.remove(deadline.getAsLong());
                eventContext.deadlineNs = OptionalLong.empty();
            }

            String tag = eventContext.tag;
            if (tag != null) {
                // Two-argument remove: only drops the mapping if it still points at this context.
                this.tagToEventContext.remove(tag, eventContext);
                eventContext.tag = null;
            }
        }

        /**
         * Main loop of the event handler thread.
         *
         * <p>Each iteration first runs the event selected by the previous
         * iteration (without holding the lock), then takes the lock and picks
         * the next thing to do, in this priority order:
         * <ol>
         *   <li>the entry with the earliest deadline, if that deadline has
         *       passed: a DEFERRED event is run normally, any other event is
         *       completed with a {@link TimeoutException};</li>
         *   <li>the entry with the earliest deadline, if the queue is
         *       interrupted or shutting down: it is completed with an
         *       InterruptedException or RejectedExecutionException;</li>
         *   <li>the first event in the linked list (completed with an
         *       InterruptedException instead of being run if the queue is
         *       interrupted);</li>
         *   <li>return, if nothing is left and the queue is shutting down or
         *       interrupted;</li>
         *   <li>otherwise wait on the condition, bounded by the time remaining
         *       until the earliest deadline when there is one.</li>
         * </ol>
         *
         * <p>The lock is released exactly once per iteration through the
         * {@code finally} block, including on {@code continue}, {@code return}
         * and exceptions.
         */
        private void handleEvents() {
            Throwable toDeliver = null;
            EventContext toRun = null;
            boolean wasInterrupted = false;
            while (true) {
                // Events always run outside the lock so producers are never blocked by event code.
                if (toRun != null) {
                    wasInterrupted = toRun.run(KafkaEventQueue.this.log, toDeliver);
                }
                KafkaEventQueue.this.lock.lock();
                try {
                    if (toRun != null) {
                        // Account for the event that just finished.
                        this.size--;
                        if (wasInterrupted) {
                            KafkaEventQueue.this.interrupted = true;
                        }
                        toDeliver = null;
                        toRun = null;
                        wasInterrupted = false;
                    }

                    long awaitNs = Long.MAX_VALUE;
                    Map.Entry<Long, EventContext> firstEntry = this.deadlineMap.firstEntry();
                    if (firstEntry != null) {
                        long nowNs = KafkaEventQueue.this.time.nanoseconds();
                        long deadlineNs = firstEntry.getKey();
                        EventContext candidate = firstEntry.getValue();
                        if (deadlineNs <= nowNs) {
                            // For DEFERRED events the key is a schedule time, so the event is
                            // simply due; for any other event it is a deadline that has expired.
                            if (candidate.insertionType == EventQueue.EventInsertionType.DEFERRED) {
                                toDeliver = null;
                            } else {
                                toDeliver = new TimeoutException();
                            }
                            remove(candidate);
                            toRun = candidate;
                            continue;
                        }
                        if (KafkaEventQueue.this.interrupted) {
                            remove(candidate);
                            toDeliver = new InterruptedException("The event handler thread is interrupted");
                            toRun = candidate;
                            continue;
                        }
                        if (KafkaEventQueue.this.shuttingDown) {
                            remove(candidate);
                            toDeliver = new RejectedExecutionException("The event queue is shutting down");
                            toRun = candidate;
                            continue;
                        }
                        awaitNs = deadlineNs - nowNs;
                    }

                    if (this.head.next != this.head) {
                        // Take the first event from the list; once interrupted, events are
                        // failed rather than run.
                        toDeliver = KafkaEventQueue.this.interrupted
                            ? new InterruptedException("The event handler thread is interrupted")
                            : null;
                        toRun = this.head.next;
                        remove(toRun);
                        continue;
                    }

                    if (this.deadlineMap.isEmpty()
                            && (KafkaEventQueue.this.shuttingDown || KafkaEventQueue.this.interrupted)) {
                        // Nothing left to process and the queue is closing: let the thread exit.
                        return;
                    }

                    if (awaitNs == Long.MAX_VALUE) {
                        try {
                            this.cond.await();
                        } catch (InterruptedException e) {
                            KafkaEventQueue.this.log.warn("Interrupted while waiting for a new event. Shutting down event queue");
                            KafkaEventQueue.this.interrupted = true;
                        }
                    } else {
                        try {
                            this.cond.awaitNanos(awaitNs);
                        } catch (InterruptedException e) {
                            KafkaEventQueue.this.log.warn("Interrupted while waiting for a deferred event. Shutting down event queue");
                            KafkaEventQueue.this.interrupted = true;
                        }
                    }
                } finally {
                    KafkaEventQueue.this.lock.unlock();
                }
            }
        }

        /**
         * Adds {@code eventContext} to the queue under the queue lock.
         *
         * <p>If the context carries a tag that is already in use, the
         * previously tagged context is removed and its deadline is passed to
         * {@code function}; otherwise {@code function} receives an empty
         * value. The value {@code function} returns becomes the deadline of
         * the new context. If that key is already taken in the deadline map it
         * is incremented until a free key is found.
         *
         * <p>The handler thread is signalled when the new context was inserted
         * into an empty list, or when its deadline is not later than the
         * earliest deadline previously registered.
         *
         * @param eventContext the context to enqueue
         * @param function     computes the deadline of the new context from
         *                     the deadline of the context it replaces, if any
         * @return null on success; otherwise the exception the caller should
         *         complete the event with: a RejectedExecutionException when
         *         the queue is shutting down, an InterruptedException when the
         *         handler thread was interrupted, or a RuntimeException when a
         *         DEFERRED event has no deadline
         */
        Exception enqueue(EventContext eventContext, Function<OptionalLong, OptionalLong> function) {
            KafkaEventQueue.this.lock.lock();
            try {
                // Every return below relies on the finally block to release the lock; unlocking
                // here as well would make the second unlock throw IllegalMonitorStateException.
                if (KafkaEventQueue.this.shuttingDown) {
                    return new RejectedExecutionException("The event queue is shutting down");
                }
                if (KafkaEventQueue.this.interrupted) {
                    return new InterruptedException("The event handler thread is interrupted");
                }

                OptionalLong existingDeadlineNs = OptionalLong.empty();
                if (eventContext.tag != null) {
                    EventContext replaced = this.tagToEventContext.put(eventContext.tag, eventContext);
                    if (replaced != null) {
                        existingDeadlineNs = replaced.deadlineNs;
                        // remove() uses the two-argument Map.remove, so the mapping that now
                        // points at the new context is left in place.
                        remove(replaced);
                        this.size--;
                    }
                }

                OptionalLong deadlineNs = function.apply(existingDeadlineNs);
                boolean queueWasEmpty = this.head.isSingleton();
                boolean shouldSignal = false;
                switch (eventContext.insertionType) {
                    case APPEND:
                        // Inserting before the sentinel puts the event at the tail.
                        this.head.insertBefore(eventContext);
                        shouldSignal = queueWasEmpty;
                        break;
                    case PREPEND:
                        // Inserting after the sentinel puts the event at the front.
                        this.head.insertAfter(eventContext);
                        shouldSignal = queueWasEmpty;
                        break;
                    case DEFERRED:
                        if (!deadlineNs.isPresent()) {
                            return new RuntimeException("You must specify a deadline for deferred events.");
                        }
                        break;
                }

                if (deadlineNs.isPresent()) {
                    long insertNs = deadlineNs.getAsLong();
                    long prevStartNs = this.deadlineMap.isEmpty() ? Long.MAX_VALUE : this.deadlineMap.firstKey();
                    // Keys must be unique: nudge the deadline forward until a free slot is found.
                    while (this.deadlineMap.putIfAbsent(insertNs, eventContext) != null) {
                        insertNs++;
                    }
                    eventContext.deadlineNs = OptionalLong.of(insertNs);
                    // The handler may be sleeping until a later deadline; wake it to re-evaluate.
                    if (insertNs <= prevStartNs) {
                        shouldSignal = true;
                    }
                }

                this.size++;
                if (shouldSignal) {
                    this.cond.signal();
                }
                return null;
            } finally {
                KafkaEventQueue.this.lock.unlock();
            }
        }

        /**
         * Removes the event currently registered under tag {@code str}, if
         * there is one, and decrements the size counter. Does nothing when no
         * event carries that tag.
         */
        void cancelDeferred(String str) {
            final ReentrantLock queueLock = KafkaEventQueue.this.lock;
            queueLock.lock();
            try {
                EventContext tagged = this.tagToEventContext.get(str);
                if (tagged == null) {
                    return;
                }
                remove(tagged);
                this.size--;
            } finally {
                queueLock.unlock();
            }
        }

        /**
         * Signals the condition of the enclosing queue's event handler while
         * holding the queue lock, waking the handler thread if it is waiting.
         */
        void wakeUp() {
            final ReentrantLock queueLock = KafkaEventQueue.this.lock;
            queueLock.lock();
            try {
                Condition handlerCondition = KafkaEventQueue.this.eventHandler.cond;
                handlerCondition.signal();
            } finally {
                queueLock.unlock();
            }
        }

        /**
         * @return the current value of the size counter, read under the queue lock.
         */
        int size() {
            final ReentrantLock queueLock = KafkaEventQueue.this.lock;
            queueLock.lock();
            try {
                int current = this.size;
                return current;
            } finally {
                queueLock.unlock();
            }
        }
    }

    /**
     * Creates an event queue without a caller-supplied cleanup event.
     * Delegates to the four-argument constructor, passing the
     * {@code EventQueue.VoidEvent} constructor reference as the cleanup event.
     * NOTE(review): VoidEvent is declared elsewhere; presumably this makes the
     * cleanup step a no-op -- confirm against EventQueue.
     *
     * @param time       time source used by the queue
     * @param logContext source of the queue's logger
     * @param str        prefix for the event handler thread's name
     */
    public KafkaEventQueue(Time time, LogContext logContext, String str) {
        this(time, logContext, str, EventQueue.VoidEvent::new);
    }

    /**
     * Creates an event queue and immediately starts its handler thread.
     *
     * @param time       time source; the handler compares deadlines against
     *                   its nanoseconds() value
     * @param logContext source of the queue's logger
     * @param str        prefix for the handler thread's name; the thread is
     *                   named {@code str + EVENT_HANDLER_THREAD_SUFFIX}
     * @param event      cleanup event run by the handler thread after its
     *                   event loop ends; must not be null
     * @throws NullPointerException if {@code event} is null
     */
    public KafkaEventQueue(Time time, LogContext logContext, String str, EventQueue.Event event) {
        this.time = time;
        this.cleanupEvent = (EventQueue.Event) Objects.requireNonNull(event);
        // The lock must exist before the handler is built: the handler derives its condition from it.
        this.lock = new ReentrantLock();
        this.log = logContext.logger(KafkaEventQueue.class);
        this.eventHandler = new EventHandler();
        // NOTE(review): the third KafkaThread argument is presumably the daemon flag -- confirm.
        this.eventHandlerThread = new KafkaThread(str + EVENT_HANDLER_THREAD_SUFFIX, this.eventHandler, false);
        this.shuttingDown = false;
        this.interrupted = false;
        // Start the thread last, once every field it reads has been assigned.
        this.eventHandlerThread.start();
    }

    /**
     * @return the time source this queue was constructed with.
     */
    public Time time() {
        return this.time;
    }

    /**
     * Wraps {@code event} in a context that captures {@code Context.current()}
     * and hands it to the event handler. If the handler refuses the event, the
     * returned exception is delivered to the event's exception handler on the
     * calling thread.
     *
     * @param eventInsertionType how the event should be inserted
     * @param str                optional tag for the event; may be null
     * @param function           computes the event's deadline from the
     *                           deadline of a replaced event with the same tag
     * @param event              the event to enqueue
     */
    @Override // org.apache.kafka.queue.EventQueue
    public void enqueue(EventQueue.EventInsertionType eventInsertionType, String str, Function<OptionalLong, OptionalLong> function, EventQueue.Event event) {
        Context callerContext = Context.current();
        EventContext wrapped = new EventContext(event, eventInsertionType, str, callerContext);
        Exception rejection = this.eventHandler.enqueue(wrapped, function);
        if (rejection == null) {
            return;
        }
        wrapped.completeWithException(this.log, rejection);
    }

    /**
     * Cancels the event registered under tag {@code str}, if any, by
     * delegating to the event handler.
     *
     * @param str tag of the event to cancel
     */
    @Override // org.apache.kafka.queue.EventQueue
    public void cancelDeferred(String str) {
        this.eventHandler.cancelDeferred(str);
    }

    /**
     * Marks the queue as shutting down and signals the handler thread. A
     * second call only logs a debug message. This method does not wait for
     * the handler thread to finish.
     *
     * @param str label identifying the caller, used as a prefix in log messages
     */
    @Override // org.apache.kafka.queue.EventQueue
    public void beginShutdown(String str) {
        this.lock.lock();
        try {
            if (!this.shuttingDown) {
                this.log.info("{}: shutting down event queue.", str);
                this.shuttingDown = true;
                this.eventHandler.cond.signal();
            } else {
                this.log.debug("{}: Event queue is already shutting down.", str);
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * @return the event handler's current size counter.
     */
    @Override // org.apache.kafka.queue.EventQueue
    public int size() {
        return this.eventHandler.size();
    }

    /**
     * Wakes the event handler thread by delegating to the handler's wakeUp().
     */
    @Override // org.apache.kafka.queue.EventQueue
    public void wakeup() {
        this.eventHandler.wakeUp();
    }

    /**
     * Begins shutdown (a no-op apart from logging if already begun), then
     * blocks until the event handler thread has terminated.
     *
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the handler thread to finish
     */
    @Override // org.apache.kafka.queue.EventQueue, java.lang.AutoCloseable
    public void close() throws InterruptedException {
        beginShutdown("KafkaEventQueue#close");
        this.eventHandlerThread.join();
        this.log.info("closed event queue.");
    }

    /**
     * Returns the event with the earliest deadline, provided the linked list
     * of events is empty and that event was inserted as DEFERRED.
     *
     * @return the earliest deferred event, or an empty Optional when the list
     *         is not empty, no deadlines are registered, or the earliest entry
     *         is not a DEFERRED event
     */
    public Optional<EventQueue.Event> firstDeferredIfIdling() {
        this.lock.lock();
        try {
            EventContext sentinel = this.eventHandler.head;
            if (sentinel.next != sentinel) {
                return Optional.empty();
            }
            Map.Entry<Long, EventContext> earliest = this.eventHandler.deadlineMap.firstEntry();
            if (earliest == null) {
                return Optional.empty();
            }
            EventContext candidate = earliest.getValue();
            if (candidate.insertionType != EventQueue.EventInsertionType.DEFERRED) {
                return Optional.empty();
            }
            return Optional.of(candidate.event);
        } finally {
            this.lock.unlock();
        }
    }
}
