package org.apache.kafka.clients.consumer.internals;

import com.networknt.rule.RuleConstants;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.events.AllTopicsMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.EventProcessor;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
import org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryUtils;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.event.Level;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.class */
public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
    private static final long NO_CURRENT_THREAD = -1;
    private final ApplicationEventHandler applicationEventHandler;
    private final Time time;
    private final AtomicReference<Optional<ConsumerGroupMetadata>> groupMetadata;
    private final KafkaConsumerMetrics kafkaConsumerMetrics;
    private Logger log;
    private final String clientId;
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private final AsyncKafkaConsumer<K, V>.BackgroundEventProcessor backgroundEventProcessor;
    private final CompletableEventReaper backgroundEventReaper;
    private final Deserializers<K, V> deserializers;
    private final FetchBuffer fetchBuffer;
    private final FetchCollector<K, V> fetchCollector;
    private final ConsumerInterceptors<K, V> interceptors;
    private final IsolationLevel isolationLevel;
    private final SubscriptionState subscriptions;
    private final ConsumerMetadata metadata;
    private int metadataVersionSnapshot;
    private final Metrics metrics;
    private final long retryBackoffMs;
    private final int defaultApiTimeoutMs;
    private final boolean autoCommitEnabled;
    private volatile boolean closed;
    private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
    private boolean cachedSubscriptionHasAllFetchPositions;
    private final WakeupTrigger wakeupTrigger;
    private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
    private final AtomicBoolean asyncCommitFenced;
    private CompletableFuture<Void> lastPendingAsyncCommit;
    private final AtomicLong currentThread;
    private final AtomicInteger refCount;
    private FetchCommittedOffsetsEvent pendingOffsetFetchEvent;

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer$ApplicationEventHandlerFactory.class */
    interface ApplicationEventHandlerFactory {
        ApplicationEventHandler build(LogContext logContext, Time time, BlockingQueue<ApplicationEvent> blockingQueue, CompletableEventReaper completableEventReaper, Supplier<ApplicationEventProcessor> supplier, Supplier<NetworkClientDelegate> supplier2, Supplier<RequestManagers> supplier3);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer$BackgroundEventProcessor.class */
    public class BackgroundEventProcessor implements EventProcessor<BackgroundEvent> {
        private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;

        public BackgroundEventProcessor(ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker) {
            this.rebalanceListenerInvoker = consumerRebalanceListenerInvoker;
        }

        @Override // org.apache.kafka.clients.consumer.internals.events.EventProcessor
        public void process(BackgroundEvent backgroundEvent) {
            switch (backgroundEvent.type()) {
                case ERROR:
                    process((ErrorEvent) backgroundEvent);
                    return;
                case CONSUMER_REBALANCE_LISTENER_CALLBACK_NEEDED:
                    process((ConsumerRebalanceListenerCallbackNeededEvent) backgroundEvent);
                    return;
                default:
                    throw new IllegalArgumentException("Background event type " + backgroundEvent.type() + " was not expected");
            }
        }

        private void process(ErrorEvent errorEvent) {
            throw errorEvent.error();
        }

        private void process(ConsumerRebalanceListenerCallbackNeededEvent consumerRebalanceListenerCallbackNeededEvent) {
            ConsumerRebalanceListenerCallbackCompletedEvent invokeRebalanceCallbacks = AsyncKafkaConsumer.invokeRebalanceCallbacks(this.rebalanceListenerInvoker, consumerRebalanceListenerCallbackNeededEvent.methodName(), consumerRebalanceListenerCallbackNeededEvent.partitions(), consumerRebalanceListenerCallbackNeededEvent.future());
            AsyncKafkaConsumer.this.applicationEventHandler.add(invokeRebalanceCallbacks);
            if (invokeRebalanceCallbacks.error().isPresent()) {
                throw invokeRebalanceCallbacks.error().get();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer$CompletableEventReaperFactory.class */
    public interface CompletableEventReaperFactory {
        CompletableEventReaper build(LogContext logContext);
    }

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer$ConsumerMetadataFactory.class */
    interface ConsumerMetadataFactory {
        ConsumerMetadata build(ConsumerConfig consumerConfig, SubscriptionState subscriptionState, LogContext logContext, ClusterResourceListeners clusterResourceListeners);
    }

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer$FetchCollectorFactory.class */
    interface FetchCollectorFactory<K, V> {
        FetchCollector<K, V> build(LogContext logContext, ConsumerMetadata consumerMetadata, SubscriptionState subscriptionState, FetchConfig fetchConfig, Deserializers<K, V> deserializers, FetchMetricsManager fetchMetricsManager, Time time);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AsyncKafkaConsumer(ConsumerConfig consumerConfig, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this(consumerConfig, deserializer, deserializer2, Time.SYSTEM, ApplicationEventHandler::new, CompletableEventReaper::new, FetchCollector::new, ConsumerMetadata::new, new LinkedBlockingQueue());
    }

    AsyncKafkaConsumer(ConsumerConfig consumerConfig, Deserializer<K> deserializer, Deserializer<V> deserializer2, Time time, ApplicationEventHandlerFactory applicationEventHandlerFactory, CompletableEventReaperFactory completableEventReaperFactory, FetchCollectorFactory<K, V> fetchCollectorFactory, ConsumerMetadataFactory consumerMetadataFactory, LinkedBlockingQueue<BackgroundEvent> linkedBlockingQueue) {
        this.groupMetadata = new AtomicReference<>(Optional.empty());
        this.closed = false;
        this.wakeupTrigger = new WakeupTrigger();
        this.lastPendingAsyncCommit = null;
        this.currentThread = new AtomicLong(-1L);
        this.refCount = new AtomicInteger(0);
        try {
            GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(consumerConfig, GroupRebalanceConfig.ProtocolType.CONSUMER);
            this.clientId = consumerConfig.getString("client.id");
            this.autoCommitEnabled = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).booleanValue();
            LogContext createLogContext = ConsumerUtils.createLogContext(consumerConfig, groupRebalanceConfig);
            this.backgroundEventQueue = linkedBlockingQueue;
            this.log = createLogContext.logger(getClass());
            this.log.debug("Initializing the Kafka consumer");
            this.defaultApiTimeoutMs = consumerConfig.getInt("default.api.timeout.ms").intValue();
            this.time = time;
            List<MetricsReporter> metricsReporters = CommonClientConfigs.metricsReporters(this.clientId, consumerConfig);
            this.clientTelemetryReporter = CommonClientConfigs.telemetryReporter(this.clientId, consumerConfig);
            Optional<ClientTelemetryReporter> optional = this.clientTelemetryReporter;
            metricsReporters.getClass();
            optional.ifPresent((v1) -> {
                r1.add(v1);
            });
            this.metrics = ConsumerUtils.createMetrics(consumerConfig, time, metricsReporters);
            this.retryBackoffMs = consumerConfig.getLong("retry.backoff.ms").longValue();
            List configuredConsumerInterceptors = ConsumerUtils.configuredConsumerInterceptors(consumerConfig);
            this.interceptors = new ConsumerInterceptors<>(configuredConsumerInterceptors);
            this.deserializers = new Deserializers<>(consumerConfig, deserializer, deserializer2);
            this.subscriptions = ConsumerUtils.createSubscriptionState(consumerConfig, createLogContext);
            this.metadata = consumerMetadataFactory.build(consumerConfig, this.subscriptions, createLogContext, ClientUtils.configureClusterResourceListeners(this.metrics.reporters(), configuredConsumerInterceptors, Arrays.asList(this.deserializers.keyDeserializer, this.deserializers.valueDeserializer)));
            this.metadata.bootstrap(ClientUtils.parseAndValidateAddresses(consumerConfig));
            this.metadataVersionSnapshot = this.metadata.updateVersion();
            FetchMetricsManager createFetchMetricsManager = ConsumerUtils.createFetchMetricsManager(this.metrics);
            FetchConfig fetchConfig = new FetchConfig(consumerConfig);
            this.isolationLevel = fetchConfig.isolationLevel;
            ApiVersions apiVersions = new ApiVersions();
            LinkedBlockingQueue linkedBlockingQueue2 = new LinkedBlockingQueue();
            BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(linkedBlockingQueue);
            this.fetchBuffer = new FetchBuffer(createLogContext);
            Supplier<NetworkClientDelegate> supplier = NetworkClientDelegate.supplier(time, createLogContext, this.metadata, consumerConfig, apiVersions, this.metrics, createFetchMetricsManager.throttleTimeSensor(), (ClientTelemetrySender) this.clientTelemetryReporter.map((v0) -> {
                return v0.telemetrySender();
            }).orElse(null), backgroundEventHandler);
            this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(this.interceptors);
            this.asyncCommitFenced = new AtomicBoolean(false);
            this.groupMetadata.set(initializeGroupMetadata(consumerConfig, groupRebalanceConfig));
            Supplier<RequestManagers> supplier2 = RequestManagers.supplier(time, createLogContext, backgroundEventHandler, this.metadata, this.subscriptions, this.fetchBuffer, consumerConfig, groupRebalanceConfig, apiVersions, createFetchMetricsManager, supplier, this.clientTelemetryReporter, this.metrics, this.offsetCommitCallbackInvoker, this::updateGroupMetadata);
            this.applicationEventHandler = applicationEventHandlerFactory.build(createLogContext, time, linkedBlockingQueue2, new CompletableEventReaper(createLogContext), ApplicationEventProcessor.supplier(createLogContext, this.metadata, this.subscriptions, supplier2), supplier, supplier2);
            this.backgroundEventProcessor = new BackgroundEventProcessor(new ConsumerRebalanceListenerInvoker(createLogContext, this.subscriptions, time, new RebalanceCallbackMetricsManager(this.metrics)));
            this.backgroundEventReaper = completableEventReaperFactory.build(createLogContext);
            this.fetchCollector = fetchCollectorFactory.build(createLogContext, this.metadata, this.subscriptions, fetchConfig, this.deserializers, createFetchMetricsManager, time);
            this.kafkaConsumerMetrics = new KafkaConsumerMetrics(this.metrics, "consumer");
            if (this.groupMetadata.get().isPresent() && GroupProtocol.of(consumerConfig.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)) == GroupProtocol.CONSUMER) {
                consumerConfig.ignore(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG);
            }
            consumerConfig.logUnused();
            AppInfoParser.registerAppInfo(ConsumerUtils.CONSUMER_JMX_PREFIX, this.clientId, this.metrics, time.milliseconds());
            this.log.debug("Kafka consumer initialized");
        } catch (Throwable th) {
            if (this.log != null) {
                close(Duration.ZERO, true);
            }
            throw new KafkaException("Failed to construct kafka consumer", th);
        }
    }

    AsyncKafkaConsumer(LogContext logContext, String str, Deserializers<K, V> deserializers, FetchBuffer fetchBuffer, FetchCollector<K, V> fetchCollector, ConsumerInterceptors<K, V> consumerInterceptors, Time time, ApplicationEventHandler applicationEventHandler, BlockingQueue<BackgroundEvent> blockingQueue, CompletableEventReaper completableEventReaper, ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker, Metrics metrics, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata, long j, int i, String str2, boolean z) {
        this.groupMetadata = new AtomicReference<>(Optional.empty());
        this.closed = false;
        this.wakeupTrigger = new WakeupTrigger();
        this.lastPendingAsyncCommit = null;
        this.currentThread = new AtomicLong(-1L);
        this.refCount = new AtomicInteger(0);
        this.log = logContext.logger(getClass());
        this.subscriptions = subscriptionState;
        this.clientId = str;
        this.fetchBuffer = fetchBuffer;
        this.fetchCollector = fetchCollector;
        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
        this.interceptors = (ConsumerInterceptors) Objects.requireNonNull(consumerInterceptors);
        this.time = time;
        this.backgroundEventQueue = blockingQueue;
        this.backgroundEventProcessor = new BackgroundEventProcessor(consumerRebalanceListenerInvoker);
        this.backgroundEventReaper = completableEventReaper;
        this.metrics = metrics;
        this.groupMetadata.set(initializeGroupMetadata(str2, Optional.empty()));
        this.metadata = consumerMetadata;
        this.metadataVersionSnapshot = consumerMetadata.updateVersion();
        this.retryBackoffMs = j;
        this.defaultApiTimeoutMs = i;
        this.deserializers = deserializers;
        this.applicationEventHandler = applicationEventHandler;
        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
        this.clientTelemetryReporter = Optional.empty();
        this.autoCommitEnabled = z;
        this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(consumerInterceptors);
        this.asyncCommitFenced = new AtomicBoolean(false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AsyncKafkaConsumer(LogContext logContext, Time time, ConsumerConfig consumerConfig, Deserializer<K> deserializer, Deserializer<V> deserializer2, KafkaClient kafkaClient, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata) {
        this.groupMetadata = new AtomicReference<>(Optional.empty());
        this.closed = false;
        this.wakeupTrigger = new WakeupTrigger();
        this.lastPendingAsyncCommit = null;
        this.currentThread = new AtomicLong(-1L);
        this.refCount = new AtomicInteger(0);
        this.log = logContext.logger(getClass());
        this.subscriptions = subscriptionState;
        this.clientId = consumerConfig.getString("client.id");
        this.autoCommitEnabled = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).booleanValue();
        this.fetchBuffer = new FetchBuffer(logContext);
        this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
        this.interceptors = new ConsumerInterceptors<>(Collections.emptyList());
        this.time = time;
        this.metrics = new Metrics(time);
        this.metadata = consumerMetadata;
        this.metadataVersionSnapshot = consumerMetadata.updateVersion();
        this.retryBackoffMs = consumerConfig.getLong("retry.backoff.ms").longValue();
        this.defaultApiTimeoutMs = consumerConfig.getInt("default.api.timeout.ms").intValue();
        this.deserializers = new Deserializers<>(deserializer, deserializer2);
        this.clientTelemetryReporter = Optional.empty();
        FetchMetricsManager fetchMetricsManager = new FetchMetricsManager(this.metrics, new ConsumerMetrics("consumer").fetcherMetrics);
        this.fetchCollector = new FetchCollector<>(logContext, consumerMetadata, subscriptionState, new FetchConfig(consumerConfig), this.deserializers, fetchMetricsManager, time);
        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(this.metrics, "consumer");
        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(consumerConfig, GroupRebalanceConfig.ProtocolType.CONSUMER);
        this.groupMetadata.set(initializeGroupMetadata(consumerConfig, groupRebalanceConfig));
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        this.backgroundEventQueue = new LinkedBlockingQueue();
        BackgroundEventHandler backgroundEventHandler = new BackgroundEventHandler(this.backgroundEventQueue);
        ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(logContext, subscriptionState, time, new RebalanceCallbackMetricsManager(this.metrics));
        ApiVersions apiVersions = new ApiVersions();
        Supplier supplier = () -> {
            return new NetworkClientDelegate(time, consumerConfig, logContext, kafkaClient, consumerMetadata, backgroundEventHandler);
        };
        this.offsetCommitCallbackInvoker = new OffsetCommitCallbackInvoker(this.interceptors);
        this.asyncCommitFenced = new AtomicBoolean(false);
        Supplier<RequestManagers> supplier2 = RequestManagers.supplier(time, logContext, backgroundEventHandler, consumerMetadata, subscriptionState, this.fetchBuffer, consumerConfig, groupRebalanceConfig, apiVersions, fetchMetricsManager, supplier, this.clientTelemetryReporter, this.metrics, this.offsetCommitCallbackInvoker, this::updateGroupMetadata);
        this.applicationEventHandler = new ApplicationEventHandler(logContext, time, linkedBlockingQueue, new CompletableEventReaper(logContext), ApplicationEventProcessor.supplier(logContext, consumerMetadata, subscriptionState, supplier2), supplier, supplier2);
        this.backgroundEventProcessor = new BackgroundEventProcessor(consumerRebalanceListenerInvoker);
        this.backgroundEventReaper = new CompletableEventReaper(logContext);
    }

    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(ConsumerConfig consumerConfig, GroupRebalanceConfig groupRebalanceConfig) {
        Optional<ConsumerGroupMetadata> initializeGroupMetadata = initializeGroupMetadata(groupRebalanceConfig.groupId, groupRebalanceConfig.groupInstanceId);
        if (!initializeGroupMetadata.isPresent()) {
            consumerConfig.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
            consumerConfig.ignore("internal.throw.on.fetch.stable.offset.unsupported");
        }
        return initializeGroupMetadata;
    }

    private Optional<ConsumerGroupMetadata> initializeGroupMetadata(String str, Optional<String> optional) {
        if (str == null) {
            return Optional.empty();
        }
        if (str.isEmpty()) {
            throw new InvalidGroupIdException("The configured group.id should not be an empty string or whitespace.");
        }
        return Optional.of(initializeConsumerGroupMetadata(str, optional));
    }

    private ConsumerGroupMetadata initializeConsumerGroupMetadata(String str, Optional<String> optional) {
        return new ConsumerGroupMetadata(str, -1, "", optional);
    }

    private void updateGroupMetadata(Optional<Integer> optional, Optional<String> optional2) {
        this.groupMetadata.updateAndGet(optional3 -> {
            return optional3.map(consumerGroupMetadata -> {
                return new ConsumerGroupMetadata(consumerGroupMetadata.groupId(), ((Integer) optional.orElse(Integer.valueOf(consumerGroupMetadata.generationId()))).intValue(), (String) optional2.orElse(consumerGroupMetadata.memberId()), consumerGroupMetadata.groupInstanceId());
            });
        });
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public ConsumerRecords<K, V> poll(Duration duration) {
        Timer timer = this.time.timer(duration);
        acquireAndEnsureOpen();
        try {
            this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
            if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
            }
            do {
                this.applicationEventHandler.add(new PollEvent(timer.currentTimeMs()));
                this.wakeupTrigger.maybeTriggerWakeup();
                updateAssignmentMetadataIfNeeded(timer);
                Fetch<K, V> pollForFetches = pollForFetches(timer);
                if (!pollForFetches.isEmpty()) {
                    if (pollForFetches.records().isEmpty()) {
                        this.log.trace("Returning empty records from `poll()` since the consumer's position has advanced for at least one topic partition");
                    }
                    ConsumerRecords<K, V> onConsume = this.interceptors.onConsume(new ConsumerRecords<>(pollForFetches.records()));
                    this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
                    release();
                    return onConsume;
                }
            } while (timer.notExpired());
            ConsumerRecords<K, V> empty = ConsumerRecords.empty();
            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
            release();
            return empty;
        } catch (Throwable th) {
            this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitSync() {
        commitSync(Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitAsync() {
        commitAsync(null);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitAsync(OffsetCommitCallback offsetCommitCallback) {
        commitAsync(this.subscriptions.allConsumed(), offsetCommitCallback);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> map, OffsetCommitCallback offsetCommitCallback) {
        acquireAndEnsureOpen();
        try {
            this.lastPendingAsyncCommit = commit(new AsyncCommitEvent(map)).whenComplete((r8, th) -> {
                if (th == null) {
                    this.offsetCommitCallbackInvoker.enqueueInterceptorInvocation(map);
                }
                if (th instanceof FencedInstanceIdException) {
                    this.asyncCommitFenced.set(true);
                }
                if (offsetCommitCallback != null) {
                    this.offsetCommitCallbackInvoker.enqueueUserCallbackInvocation(offsetCommitCallback, map, (Exception) th);
                } else if (th != null) {
                    this.log.error("Offset commit with offsets {} failed", map, th);
                }
            });
            release();
        } catch (Throwable th2) {
            release();
            throw th2;
        }
    }

    private CompletableFuture<Void> commit(CommitEvent commitEvent) {
        maybeThrowInvalidGroupIdException();
        maybeThrowFencedInstanceException();
        this.offsetCommitCallbackInvoker.executeCallbacks();
        Map<TopicPartition, OffsetAndMetadata> offsets = commitEvent.offsets();
        this.log.debug("Committing offsets: {}", offsets);
        offsets.forEach(this::updateLastSeenEpochIfNewer);
        if (offsets.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        this.applicationEventHandler.add(commitEvent);
        return commitEvent.future();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void seek(TopicPartition topicPartition, long j) {
        if (j < 0) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        acquireAndEnsureOpen();
        try {
            this.log.info("Seeking to offset {} for partition {}", Long.valueOf(j), topicPartition);
            this.subscriptions.seekUnvalidated(topicPartition, new SubscriptionState.FetchPosition(j, Optional.empty(), this.metadata.currentLeader(topicPartition)));
            release();
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void seek(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        long offset = offsetAndMetadata.offset();
        if (offset < 0) {
            throw new IllegalArgumentException("seek offset must not be a negative number");
        }
        acquireAndEnsureOpen();
        try {
            if (offsetAndMetadata.leaderEpoch().isPresent()) {
                this.log.info("Seeking to offset {} for partition {} with epoch {}", Long.valueOf(offset), topicPartition, offsetAndMetadata.leaderEpoch().get());
            } else {
                this.log.info("Seeking to offset {} for partition {}", Long.valueOf(offset), topicPartition);
            }
            SubscriptionState.FetchPosition fetchPosition = new SubscriptionState.FetchPosition(offsetAndMetadata.offset(), offsetAndMetadata.leaderEpoch(), this.metadata.currentLeader(topicPartition));
            updateLastSeenEpochIfNewer(topicPartition, offsetAndMetadata);
            this.subscriptions.seekUnvalidated(topicPartition, fetchPosition);
            release();
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void seekToBeginning(Collection<TopicPartition> collection) {
        if (collection == null) {
            throw new IllegalArgumentException("Partitions collection cannot be null");
        }
        acquireAndEnsureOpen();
        try {
            this.subscriptions.requestOffsetReset(collection.isEmpty() ? this.subscriptions.assignedPartitions() : collection, OffsetResetStrategy.EARLIEST);
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void seekToEnd(Collection<TopicPartition> collection) {
        if (collection == null) {
            throw new IllegalArgumentException("Partitions collection cannot be null");
        }
        acquireAndEnsureOpen();
        try {
            this.subscriptions.requestOffsetReset(collection.isEmpty() ? this.subscriptions.assignedPartitions() : collection, OffsetResetStrategy.LATEST);
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public long position(TopicPartition topicPartition) {
        return position(topicPartition, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public long position(TopicPartition topicPartition, Duration duration) {
        acquireAndEnsureOpen();
        try {
            if (!this.subscriptions.isAssigned(topicPartition)) {
                throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
            }
            Timer timer = this.time.timer(duration);
            do {
                SubscriptionState.FetchPosition validPosition = this.subscriptions.validPosition(topicPartition);
                if (validPosition != null) {
                    long j = validPosition.offset;
                    release();
                    return j;
                }
                updateFetchPositions(timer);
                timer.update();
                this.wakeupTrigger.maybeTriggerWakeup();
            } while (timer.notExpired());
            throw new TimeoutException("Timeout of " + duration.toMillis() + "ms expired before the position for partition " + topicPartition + " could be determined");
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition topicPartition) {
        return committed(topicPartition, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition topicPartition, Duration duration) {
        return committed(Collections.singleton(topicPartition), duration).get(topicPartition);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set) {
        return committed(set, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> set, Duration duration) {
        acquireAndEnsureOpen();
        long nanoseconds = this.time.nanoseconds();
        try {
            maybeThrowInvalidGroupIdException();
            if (set.isEmpty()) {
                Map<TopicPartition, OffsetAndMetadata> emptyMap = Collections.emptyMap();
                this.kafkaConsumerMetrics.recordCommitted(this.time.nanoseconds() - nanoseconds);
                release();
                return emptyMap;
            }
            FetchCommittedOffsetsEvent fetchCommittedOffsetsEvent = new FetchCommittedOffsetsEvent(set, CompletableEvent.calculateDeadlineMs(this.time, duration));
            this.wakeupTrigger.setActiveTask(fetchCommittedOffsetsEvent.future());
            try {
                try {
                    Map<TopicPartition, OffsetAndMetadata> map = (Map) this.applicationEventHandler.addAndGet(fetchCommittedOffsetsEvent);
                    map.forEach(this::updateLastSeenEpochIfNewer);
                    this.wakeupTrigger.clearTask();
                    this.kafkaConsumerMetrics.recordCommitted(this.time.nanoseconds() - nanoseconds);
                    release();
                    return map;
                } catch (Throwable th) {
                    this.wakeupTrigger.clearTask();
                    throw th;
                }
            } catch (TimeoutException e) {
                throw new TimeoutException("Timeout of " + duration.toMillis() + "ms expired before the last committed offset for partitions " + set + " could be determined. Try tuning default.api.timeout.ms larger to relax the threshold.");
            }
        } catch (Throwable th2) {
            this.kafkaConsumerMetrics.recordCommitted(this.time.nanoseconds() - nanoseconds);
            release();
            throw th2;
        }
    }

    private void maybeThrowInvalidGroupIdException() {
        if (!this.groupMetadata.get().isPresent()) {
            throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.");
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<MetricName, ? extends Metric> metrics() {
        return Collections.unmodifiableMap(this.metrics.metrics());
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public List<PartitionInfo> partitionsFor(String str) {
        return partitionsFor(str, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public List<PartitionInfo> partitionsFor(String str, Duration duration) {
        acquireAndEnsureOpen();
        try {
            List<PartitionInfo> partitionsForTopic = this.metadata.fetch().partitionsForTopic(str);
            if (!partitionsForTopic.isEmpty()) {
                return partitionsForTopic;
            }
            if (duration.toMillis() == 0) {
                throw new TimeoutException();
            }
            TopicMetadataEvent topicMetadataEvent = new TopicMetadataEvent(str, CompletableEvent.calculateDeadlineMs(this.time, duration));
            this.wakeupTrigger.setActiveTask(topicMetadataEvent.future());
            try {
                List<PartitionInfo> list = (List) ((Map) this.applicationEventHandler.addAndGet(topicMetadataEvent)).getOrDefault(str, Collections.emptyList());
                this.wakeupTrigger.clearTask();
                release();
                return list;
            } catch (Throwable th) {
                this.wakeupTrigger.clearTask();
                throw th;
            }
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<String, List<PartitionInfo>> listTopics() {
        return listTopics(Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<String, List<PartitionInfo>> listTopics(Duration duration) {
        acquireAndEnsureOpen();
        try {
            if (duration.toMillis() == 0) {
                throw new TimeoutException();
            }
            AllTopicsMetadataEvent allTopicsMetadataEvent = new AllTopicsMetadataEvent(CompletableEvent.calculateDeadlineMs(this.time, duration));
            this.wakeupTrigger.setActiveTask(allTopicsMetadataEvent.future());
            try {
                Map<String, List<PartitionInfo>> map = (Map) this.applicationEventHandler.addAndGet(allTopicsMetadataEvent);
                this.wakeupTrigger.clearTask();
                release();
                return map;
            } catch (Throwable th) {
                this.wakeupTrigger.clearTask();
                throw th;
            }
        } catch (Throwable th2) {
            release();
            throw th2;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Set<TopicPartition> paused() {
        acquireAndEnsureOpen();
        try {
            return Collections.unmodifiableSet(this.subscriptions.pausedPartitions());
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void pause(Collection<TopicPartition> collection) {
        acquireAndEnsureOpen();
        try {
            this.log.debug("Pausing partitions {}", collection);
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                this.subscriptions.pause(it.next());
            }
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void resume(Collection<TopicPartition> collection) {
        acquireAndEnsureOpen();
        try {
            this.log.debug("Resuming partitions {}", collection);
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                this.subscriptions.resume(it.next());
            }
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map) {
        return offsetsForTimes(map, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Duration duration) {
        acquireAndEnsureOpen();
        try {
            Objects.requireNonNull(map, "Timestamps to search cannot be null");
            for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
                if (entry.getValue().longValue() < 0) {
                    throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " + entry.getValue() + ". The target time cannot be negative.");
                }
            }
            if (map.isEmpty()) {
                Map<TopicPartition, OffsetAndTimestamp> emptyMap = Collections.emptyMap();
                release();
                return emptyMap;
            }
            ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(map, CompletableEvent.calculateDeadlineMs(this.time, duration), true);
            if (duration.toMillis() == 0) {
                this.applicationEventHandler.add(listOffsetsEvent);
                Map<TopicPartition, OffsetAndTimestamp> emptyResults = listOffsetsEvent.emptyResults();
                release();
                return emptyResults;
            }
            try {
                Map<TopicPartition, OffsetAndTimestamp> map2 = (Map) ((Map) this.applicationEventHandler.addAndGet(listOffsetsEvent)).entrySet().stream().collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, entry2 -> {
                    return ((OffsetAndTimestampInternal) entry2.getValue()).buildOffsetAndTimestamp();
                }));
                release();
                return map2;
            } catch (TimeoutException e) {
                throw new TimeoutException("Failed to get offsets by times in " + duration.toMillis() + "ms");
            }
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection) {
        return beginningOffsets(collection, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Duration duration) {
        return beginningOrEndOffset(collection, -2L, duration);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection) {
        return endOffsets(collection, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Duration duration) {
        return beginningOrEndOffset(collection, -1L, duration);
    }

    private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> collection, long j, Duration duration) {
        acquireAndEnsureOpen();
        try {
            Objects.requireNonNull(collection, "Partitions cannot be null");
            if (collection.isEmpty()) {
                Map<TopicPartition, Long> emptyMap = Collections.emptyMap();
                release();
                return emptyMap;
            }
            ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent((Map) collection.stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
                return Long.valueOf(j);
            })), CompletableEvent.calculateDeadlineMs(this.time, duration), false);
            if (duration.isZero()) {
                this.applicationEventHandler.add(listOffsetsEvent);
                Map<TopicPartition, Long> emptyResults = listOffsetsEvent.emptyResults();
                release();
                return emptyResults;
            }
            try {
                Map<TopicPartition, Long> map = (Map) ((Map) this.applicationEventHandler.addAndGet(listOffsetsEvent)).entrySet().stream().collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, entry -> {
                    return Long.valueOf(((OffsetAndTimestampInternal) entry.getValue()).offset());
                }));
                release();
                return map;
            } catch (TimeoutException e) {
                throw new TimeoutException("Failed to get offsets by times in " + duration.toMillis() + "ms");
            }
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public OptionalLong currentLag(TopicPartition topicPartition) {
        acquireAndEnsureOpen();
        try {
            Long partitionLag = this.subscriptions.partitionLag(topicPartition, this.isolationLevel);
            if (partitionLag != null) {
                OptionalLong of = OptionalLong.of(partitionLag.longValue());
                release();
                return of;
            }
            if (this.subscriptions.partitionEndOffset(topicPartition, this.isolationLevel) == null && !this.subscriptions.partitionEndOffsetRequested(topicPartition)) {
                this.log.info("Requesting the log end offset for {} in order to compute lag", topicPartition);
                this.subscriptions.requestPartitionEndOffset(topicPartition);
                endOffsets(Collections.singleton(topicPartition), Duration.ofMillis(0L));
            }
            OptionalLong empty = OptionalLong.empty();
            release();
            return empty;
        } catch (Throwable th) {
            release();
            throw th;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public ConsumerGroupMetadata groupMetadata() {
        acquireAndEnsureOpen();
        try {
            maybeThrowInvalidGroupIdException();
            return this.groupMetadata.get().get();
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void enforceRebalance() {
        this.log.warn("Operation not supported in new consumer group protocol");
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void enforceRebalance(String str) {
        this.log.warn("Operation not supported in new consumer group protocol");
    }

    @Override // org.apache.kafka.clients.consumer.Consumer, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        close(Duration.ofMillis(30000L));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void close(Duration duration) {
        if (duration.toMillis() < 0) {
            throw new IllegalArgumentException("The timeout cannot be negative.");
        }
        acquire();
        try {
            if (!this.closed) {
                close(duration, false);
            }
        } finally {
            this.closed = true;
            release();
        }
    }

    private void close(Duration duration, boolean z) {
        this.log.trace("Closing the Kafka consumer");
        AtomicReference atomicReference = new AtomicReference();
        this.wakeupTrigger.disableWakeups();
        Timer timer = this.time.timer(duration);
        this.clientTelemetryReporter.ifPresent((v0) -> {
            v0.initiateClose();
        });
        timer.update();
        org.apache.kafka.common.utils.Utils.swallow(this.log, Level.ERROR, "Failed to release assignment before closing consumer", () -> {
            releaseAssignmentAndLeaveGroup(timer);
        }, atomicReference);
        org.apache.kafka.common.utils.Utils.swallow(this.log, Level.ERROR, "Failed invoking asynchronous commit callback.", () -> {
            awaitPendingAsyncCommitsAndExecuteCommitCallbacks(timer, false);
        }, atomicReference);
        if (this.applicationEventHandler != null) {
            org.apache.kafka.common.utils.Utils.closeQuietly(() -> {
                this.applicationEventHandler.close(Duration.ofMillis(timer.remainingMs()));
            }, "Failed shutting down network thread", (AtomicReference<Throwable>) atomicReference);
        }
        timer.update();
        if (this.backgroundEventReaper != null && this.backgroundEventQueue != null) {
            this.backgroundEventReaper.reap(this.backgroundEventQueue);
        }
        org.apache.kafka.common.utils.Utils.closeQuietly(this.interceptors, "consumer interceptors", (AtomicReference<Throwable>) atomicReference);
        org.apache.kafka.common.utils.Utils.closeQuietly(this.kafkaConsumerMetrics, "kafka consumer metrics", (AtomicReference<Throwable>) atomicReference);
        org.apache.kafka.common.utils.Utils.closeQuietly(this.metrics, "consumer metrics", (AtomicReference<Throwable>) atomicReference);
        org.apache.kafka.common.utils.Utils.closeQuietly(this.deserializers, "consumer deserializers", (AtomicReference<Throwable>) atomicReference);
        this.clientTelemetryReporter.ifPresent(clientTelemetryReporter -> {
            org.apache.kafka.common.utils.Utils.closeQuietly(clientTelemetryReporter, "async consumer telemetry reporter", (AtomicReference<Throwable>) atomicReference);
        });
        AppInfoParser.unregisterAppInfo(ConsumerUtils.CONSUMER_JMX_PREFIX, this.clientId, this.metrics);
        this.log.debug("Kafka consumer has been closed");
        Throwable th = (Throwable) atomicReference.get();
        if (th == null || z) {
            return;
        }
        if (!(th instanceof InterruptException)) {
            throw new KafkaException("Failed to close kafka consumer", th);
        }
        throw ((InterruptException) th);
    }

    private void releaseAssignmentAndLeaveGroup(Timer timer) {
        if (this.groupMetadata.get().isPresent()) {
            if (this.autoCommitEnabled) {
                commitSyncAllConsumed(timer);
            }
            this.applicationEventHandler.add(new CommitOnCloseEvent());
            this.log.info("Releasing assignment and leaving group before closing consumer");
            UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(CompletableEvent.calculateDeadlineMs(timer));
            this.applicationEventHandler.add(unsubscribeEvent);
            try {
                try {
                    processBackgroundEvents(unsubscribeEvent.future(), timer);
                    this.log.info("Completed releasing assignment and sending leave group to close consumer");
                    timer.update();
                } catch (TimeoutException e) {
                    this.log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't complete it within {} ms. It will proceed to close.", Long.valueOf(timer.timeoutMs()));
                    timer.update();
                }
            } catch (Throwable th) {
                timer.update();
                throw th;
            }
        }
    }

    void commitSyncAllConsumed(Timer timer) {
        Map<TopicPartition, OffsetAndMetadata> allConsumed = this.subscriptions.allConsumed();
        this.log.debug("Sending synchronous auto-commit of offsets {} on closing", allConsumed);
        try {
            commitSync(allConsumed, Duration.ofMillis(timer.remainingMs()));
        } catch (Exception e) {
            this.log.warn("Synchronous auto-commit of offsets {} failed: {}", allConsumed, e.getMessage());
        }
        timer.update();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void wakeup() {
        this.wakeupTrigger.wakeup();
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitSync(Duration duration) {
        commitSync(this.subscriptions.allConsumed(), duration);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map) {
        commitSync(map, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> map, Duration duration) {
        acquireAndEnsureOpen();
        long nanoseconds = this.time.nanoseconds();
        try {
            CompletableFuture<Void> commit = commit(new SyncCommitEvent(map, CompletableEvent.calculateDeadlineMs(this.time, duration)));
            Timer timer = this.time.timer(duration.toMillis());
            awaitPendingAsyncCommitsAndExecuteCommitCallbacks(timer, true);
            this.wakeupTrigger.setActiveTask(commit);
            ConsumerUtils.getResult(commit, timer);
            this.interceptors.onCommit(map);
            this.wakeupTrigger.clearTask();
            this.kafkaConsumerMetrics.recordCommitSync(this.time.nanoseconds() - nanoseconds);
            release();
        } catch (Throwable th) {
            this.wakeupTrigger.clearTask();
            this.kafkaConsumerMetrics.recordCommitSync(this.time.nanoseconds() - nanoseconds);
            release();
            throw th;
        }
    }

    private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean z) {
        if (this.lastPendingAsyncCommit == null) {
            return;
        }
        try {
            CompletableFuture completableFuture = new CompletableFuture();
            this.lastPendingAsyncCommit.whenComplete((r4, th) -> {
                completableFuture.complete(null);
            });
            if (z) {
                this.wakeupTrigger.setActiveTask(completableFuture);
            }
            ConsumerUtils.getResult(completableFuture, timer);
            this.lastPendingAsyncCommit = null;
            if (z) {
                this.wakeupTrigger.clearTask();
            }
            timer.update();
            this.offsetCommitCallbackInvoker.executeCallbacks();
        } catch (Throwable th2) {
            if (z) {
                this.wakeupTrigger.clearTask();
            }
            timer.update();
            throw th2;
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Uuid clientInstanceId(Duration duration) {
        if (this.clientTelemetryReporter.isPresent()) {
            return ClientTelemetryUtils.fetchClientInstanceId(this.clientTelemetryReporter.get(), duration);
        }
        throw new IllegalStateException("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.");
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Set<TopicPartition> assignment() {
        acquireAndEnsureOpen();
        try {
            return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public Set<String> subscription() {
        acquireAndEnsureOpen();
        try {
            return Collections.unmodifiableSet(this.subscriptions.subscription());
        } finally {
            release();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void assign(Collection<TopicPartition> collection) {
        acquireAndEnsureOpen();
        try {
            if (collection == null) {
                throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
            }
            if (collection.isEmpty()) {
                unsubscribe();
                release();
                return;
            }
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                TopicPartition next = it.next();
                if (org.apache.kafka.common.utils.Utils.isBlank(next != null ? next.topic() : null)) {
                    throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
                }
            }
            HashSet hashSet = new HashSet();
            for (TopicPartition topicPartition : this.subscriptions.assignedPartitions()) {
                if (collection.contains(topicPartition)) {
                    hashSet.add(topicPartition);
                }
            }
            this.fetchBuffer.retainAll(hashSet);
            this.applicationEventHandler.add(new AssignmentChangeEvent(this.subscriptions.allConsumed(), this.time.milliseconds()));
            this.log.info("Assigned to partition(s): {}", collection.stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining(", ")));
            if (this.subscriptions.assignFromUser(new HashSet(collection))) {
                this.applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
            }
        } finally {
            release();
        }
    }

    private void updatePatternSubscription(Cluster cluster) {
        Stream<String> stream = cluster.topics().stream();
        SubscriptionState subscriptionState = this.subscriptions;
        subscriptionState.getClass();
        if (this.subscriptions.subscribeFromPattern((Set) stream.filter(subscriptionState::matchesSubscribedPattern).collect(Collectors.toSet()))) {
            this.applicationEventHandler.add(new SubscriptionChangeEvent());
            this.metadataVersionSnapshot = this.metadata.requestUpdateForNewTopics();
        }
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void unsubscribe() {
        acquireAndEnsureOpen();
        try {
            try {
                this.fetchBuffer.retainAll(Collections.emptySet());
                Timer timer = this.time.timer(Long.MAX_VALUE);
                UnsubscribeEvent unsubscribeEvent = new UnsubscribeEvent(CompletableEvent.calculateDeadlineMs(timer));
                this.applicationEventHandler.add(unsubscribeEvent);
                this.log.info("Unsubscribing all topics or patterns and assigned partitions {}", this.subscriptions.assignedPartitions());
                try {
                    processBackgroundEvents(unsubscribeEvent.future(), timer);
                    this.log.info("Unsubscribed all topics or patterns and assigned partitions");
                } catch (TimeoutException e) {
                    this.log.error("Failed while waiting for the unsubscribe event to complete");
                }
                resetGroupMetadata();
                release();
            } catch (Throwable th) {
                release();
                throw th;
            }
        } catch (Exception e2) {
            this.log.error("Unsubscribe failed", (Throwable) e2);
            throw e2;
        }
    }

    private void resetGroupMetadata() {
        this.groupMetadata.updateAndGet(optional -> {
            return optional.map(consumerGroupMetadata -> {
                return initializeConsumerGroupMetadata(consumerGroupMetadata.groupId(), consumerGroupMetadata.groupInstanceId());
            });
        });
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    @Deprecated
    public ConsumerRecords<K, V> poll(long j) {
        throw new UnsupportedOperationException("Consumer.poll(long) is not supported when \"group.protocol\" is \"consumer\". This method is deprecated and will be removed in the next major release.");
    }

    WakeupTrigger wakeupTrigger() {
        return this.wakeupTrigger;
    }

    private Fetch<K, V> pollForFetches(Timer timer) {
        long min = isCommittedOffsetsManagementEnabled() ? Math.min(this.applicationEventHandler.maximumTimeToWait(), timer.remainingMs()) : timer.remainingMs();
        Fetch<K, V> collectFetch = collectFetch();
        if (!collectFetch.isEmpty()) {
            return collectFetch;
        }
        if (!this.cachedSubscriptionHasAllFetchPositions && min > this.retryBackoffMs) {
            min = this.retryBackoffMs;
        }
        this.log.trace("Polling for fetches with timeout {}", Long.valueOf(min));
        Timer timer2 = this.time.timer(min);
        this.wakeupTrigger.setFetchAction(this.fetchBuffer);
        try {
            try {
                this.fetchBuffer.awaitNotEmpty(timer2);
                timer.update(timer2.currentTimeMs());
                this.wakeupTrigger.clearTask();
                return collectFetch();
            } catch (InterruptException e) {
                this.log.trace("Interrupt during fetch", (Throwable) e);
                throw e;
            }
        } catch (Throwable th) {
            timer.update(timer2.currentTimeMs());
            this.wakeupTrigger.clearTask();
            throw th;
        }
    }

    private Fetch<K, V> collectFetch() {
        Fetch<K, V> collectFetch = this.fetchCollector.collectFetch(this.fetchBuffer);
        this.applicationEventHandler.wakeupNetworkThread();
        return collectFetch;
    }

    private boolean updateFetchPositions(Timer timer) {
        try {
            this.applicationEventHandler.addAndGet(new ValidatePositionsEvent(CompletableEvent.calculateDeadlineMs(timer)));
            this.cachedSubscriptionHasAllFetchPositions = this.subscriptions.hasAllFetchPositions();
            if (this.cachedSubscriptionHasAllFetchPositions) {
                return true;
            }
            if (isCommittedOffsetsManagementEnabled() && !initWithCommittedOffsetsIfNeeded(timer)) {
                return false;
            }
            this.subscriptions.resetInitializingPositions();
            this.applicationEventHandler.addAndGet(new ResetPositionsEvent(CompletableEvent.calculateDeadlineMs(timer)));
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    private boolean isCommittedOffsetsManagementEnabled() {
        return this.groupMetadata.get().isPresent();
    }

    private boolean initWithCommittedOffsetsIfNeeded(Timer timer) {
        Set<TopicPartition> initializingPartitions = this.subscriptions.initializingPartitions();
        if (initializingPartitions.isEmpty()) {
            return true;
        }
        this.log.debug("Refreshing committed offsets for partitions {}", initializingPartitions);
        if (!canReusePendingOffsetFetchEvent(initializingPartitions)) {
            this.pendingOffsetFetchEvent = new FetchCommittedOffsetsEvent(initializingPartitions, CompletableEvent.calculateDeadlineMs(this.time, Math.max(this.defaultApiTimeoutMs, timer.remainingMs())));
            this.applicationEventHandler.add(this.pendingOffsetFetchEvent);
        }
        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future = this.pendingOffsetFetchEvent.future();
        try {
            try {
                try {
                    try {
                        this.wakeupTrigger.setActiveTask(future);
                        Map map = (Map) ConsumerUtils.getResult(future, timer);
                        this.pendingOffsetFetchEvent = null;
                        ConsumerUtils.refreshCommittedOffsets(map, this.metadata, this.subscriptions);
                        this.wakeupTrigger.clearTask();
                        return true;
                    } catch (InterruptException e) {
                        throw e;
                    }
                } catch (Throwable th) {
                    this.pendingOffsetFetchEvent = null;
                    throw ConsumerUtils.maybeWrapAsKafkaException(th);
                }
            } catch (TimeoutException e2) {
                this.log.debug("The committed offsets for the following partition(s) could not be refreshed within the timeout: {} ", initializingPartitions);
                this.wakeupTrigger.clearTask();
                return false;
            }
        } catch (Throwable th2) {
            this.wakeupTrigger.clearTask();
            throw th2;
        }
    }

    private boolean canReusePendingOffsetFetchEvent(Set<TopicPartition> set) {
        return this.pendingOffsetFetchEvent != null && this.pendingOffsetFetchEvent.partitions().equals(set) && this.pendingOffsetFetchEvent.deadlineMs() > this.time.milliseconds();
    }

    private void updateLastSeenEpochIfNewer(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
        if (offsetAndMetadata != null) {
            offsetAndMetadata.leaderEpoch().ifPresent(num -> {
                this.metadata.updateLastSeenEpochIfNewer(topicPartition, num.intValue());
            });
        }
    }

    @Override // org.apache.kafka.clients.consumer.internals.ConsumerDelegate
    public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
        maybeThrowFencedInstanceException();
        this.offsetCommitCallbackInvoker.executeCallbacks();
        maybeUpdateSubscriptionMetadata();
        processBackgroundEvents();
        return updateFetchPositions(timer);
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void subscribe(Collection<String> collection) {
        subscribeInternal(collection, Optional.empty());
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void subscribe(Collection<String> collection, ConsumerRebalanceListener consumerRebalanceListener) {
        if (consumerRebalanceListener == null) {
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        }
        subscribeInternal(collection, Optional.of(consumerRebalanceListener));
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void subscribe(Pattern pattern) {
        subscribeInternal(pattern, Optional.empty());
    }

    @Override // org.apache.kafka.clients.consumer.Consumer
    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
        if (consumerRebalanceListener == null) {
            throw new IllegalArgumentException("RebalanceListener cannot be null");
        }
        subscribeInternal(pattern, Optional.of(consumerRebalanceListener));
    }

    private void acquireAndEnsureOpen() {
        acquire();
        if (this.closed) {
            release();
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    private void acquire() {
        Thread currentThread = Thread.currentThread();
        long id = currentThread.getId();
        if (id != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, id)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. currentThread(name: " + currentThread.getName() + ", id: " + id + ") otherThread(id: " + this.currentThread.get() + RuleConstants.RIGHT_PARENTHESIS);
        }
        this.refCount.incrementAndGet();
    }

    private void release() {
        if (this.refCount.decrementAndGet() == 0) {
            this.currentThread.set(-1L);
        }
    }

    private void subscribeInternal(Pattern pattern, Optional<ConsumerRebalanceListener> optional) {
        acquireAndEnsureOpen();
        try {
            maybeThrowInvalidGroupIdException();
            if (pattern == null || pattern.toString().isEmpty()) {
                throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? "null" : "empty"));
            }
            this.log.info("Subscribed to pattern: '{}'", pattern);
            this.subscriptions.subscribe(pattern, optional);
            this.metadata.requestUpdateForNewTopics();
            updatePatternSubscription(this.metadata.fetch());
        } finally {
            release();
        }
    }

    private void subscribeInternal(Collection<String> collection, Optional<ConsumerRebalanceListener> optional) {
        acquireAndEnsureOpen();
        try {
            maybeThrowInvalidGroupIdException();
            if (collection == null) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            }
            if (collection.isEmpty()) {
                unsubscribe();
            } else {
                Iterator<String> it = collection.iterator();
                while (it.hasNext()) {
                    if (org.apache.kafka.common.utils.Utils.isBlank(it.next())) {
                        throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                    }
                }
                HashSet hashSet = new HashSet();
                for (TopicPartition topicPartition : this.subscriptions.assignedPartitions()) {
                    if (collection.contains(topicPartition.topic())) {
                        hashSet.add(topicPartition);
                    }
                }
                this.fetchBuffer.retainAll(hashSet);
                this.log.info("Subscribed to topic(s): {}", String.join(", ", collection));
                if (this.subscriptions.subscribe(new HashSet(collection), optional)) {
                    this.metadataVersionSnapshot = this.metadata.requestUpdateForNewTopics();
                }
                this.applicationEventHandler.add(new SubscriptionChangeEvent());
            }
        } finally {
            release();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private boolean processBackgroundEvents() {
        AtomicReference atomicReference = new AtomicReference();
        LinkedList linkedList = new LinkedList();
        this.backgroundEventQueue.drainTo(linkedList);
        Iterator it = linkedList.iterator();
        while (it.hasNext()) {
            BackgroundEvent backgroundEvent = (BackgroundEvent) it.next();
            try {
                if (backgroundEvent instanceof CompletableEvent) {
                    this.backgroundEventReaper.add((CompletableEvent) backgroundEvent);
                }
                this.backgroundEventProcessor.process(backgroundEvent);
            } catch (Throwable th) {
                KafkaException maybeWrapAsKafkaException = ConsumerUtils.maybeWrapAsKafkaException(th);
                if (!atomicReference.compareAndSet(null, maybeWrapAsKafkaException)) {
                    this.log.warn("An error occurred when processing the background event: {}", maybeWrapAsKafkaException.getMessage(), maybeWrapAsKafkaException);
                }
            }
        }
        this.backgroundEventReaper.reap(this.time.milliseconds());
        if (atomicReference.get() != null) {
            throw ((KafkaException) atomicReference.get());
        }
        return !linkedList.isEmpty();
    }

    <T> T processBackgroundEvents(Future<T> future, Timer timer) {
        do {
            boolean processBackgroundEvents = processBackgroundEvents();
            try {
            } catch (TimeoutException e) {
                timer.update();
            } catch (Throwable th) {
                timer.update();
                throw th;
            }
            if (future.isDone()) {
                T t = (T) ConsumerUtils.getResult(future);
                timer.update();
                return t;
            }
            if (!processBackgroundEvents) {
                T t2 = (T) ConsumerUtils.getResult(future, this.time.timer(100L));
                timer.update();
                return t2;
            }
            timer.update();
        } while (timer.notExpired());
        throw new TimeoutException("Operation timed out before completion");
    }

    static ConsumerRebalanceListenerCallbackCompletedEvent invokeRebalanceCallbacks(ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker, ConsumerRebalanceListenerMethodName consumerRebalanceListenerMethodName, SortedSet<TopicPartition> sortedSet, CompletableFuture<Void> completableFuture) {
        Exception invokePartitionsLost;
        switch (consumerRebalanceListenerMethodName) {
            case ON_PARTITIONS_REVOKED:
                invokePartitionsLost = consumerRebalanceListenerInvoker.invokePartitionsRevoked(sortedSet);
                break;
            case ON_PARTITIONS_ASSIGNED:
                invokePartitionsLost = consumerRebalanceListenerInvoker.invokePartitionsAssigned(sortedSet);
                break;
            case ON_PARTITIONS_LOST:
                invokePartitionsLost = consumerRebalanceListenerInvoker.invokePartitionsLost(sortedSet);
                break;
            default:
                throw new IllegalArgumentException("The method " + consumerRebalanceListenerMethodName.fullyQualifiedMethodName() + " to invoke was not expected");
        }
        return new ConsumerRebalanceListenerCallbackCompletedEvent(consumerRebalanceListenerMethodName, completableFuture, invokePartitionsLost != null ? Optional.of(ConsumerUtils.maybeWrapAsKafkaException(invokePartitionsLost, "User rebalance callback throws an error")) : Optional.empty());
    }

    @Override // org.apache.kafka.clients.consumer.internals.ConsumerDelegate
    public String clientId() {
        return this.clientId;
    }

    @Override // org.apache.kafka.clients.consumer.internals.ConsumerDelegate
    public Metrics metricsRegistry() {
        return this.metrics;
    }

    @Override // org.apache.kafka.clients.consumer.internals.ConsumerDelegate
    public KafkaConsumerMetrics kafkaConsumerMetrics() {
        return this.kafkaConsumerMetrics;
    }

    private void maybeThrowFencedInstanceException() {
        if (this.asyncCommitFenced.get()) {
            String str = "unknown";
            if (!this.groupMetadata.get().isPresent()) {
                this.log.error("No group metadata found although a group ID was provided. This is a bug!");
            } else if (this.groupMetadata.get().get().groupInstanceId().isPresent()) {
                str = this.groupMetadata.get().get().groupInstanceId().get();
            } else {
                this.log.error("No group instance ID found although the consumer is fenced. This is a bug!");
            }
            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + str);
        }
    }

    SubscriptionState subscriptions() {
        return this.subscriptions;
    }

    boolean hasPendingOffsetFetchEvent() {
        return this.pendingOffsetFetchEvent != null;
    }

    private void maybeUpdateSubscriptionMetadata() {
        if (this.metadataVersionSnapshot < this.metadata.updateVersion()) {
            this.metadataVersionSnapshot = this.metadata.updateVersion();
            if (this.subscriptions.hasPatternSubscription()) {
                updatePatternSubscription(this.metadata.fetch());
            }
        }
    }
}
