package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.helpers.MessageFormatter;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractFetch.class */
public abstract class AbstractFetch implements Closeable {
    private final Logger log;
    private final Logger completedFetchLog;
    protected final LogContext logContext;
    protected final ConsumerMetadata metadata;
    protected final SubscriptionState subscriptions;
    protected final FetchConfig fetchConfig;
    protected final Time time;
    protected final FetchMetricsManager metricsManager;
    protected final FetchBuffer fetchBuffer;
    private final ApiVersions apiVersions;
    private final IdempotentCloser idempotentCloser = new IdempotentCloser();
    protected final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
    private final Map<Integer, FetchSessionHandler> sessionHandlers = new HashMap();
    protected final Set<Integer> nodesWithPendingFetchRequests = new HashSet();

    /* JADX INFO: Access modifiers changed from: protected */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/AbstractFetch$ResponseHandler.class */
    public interface ResponseHandler<T> {
        void handle(Node node, FetchSessionHandler.FetchRequestData fetchRequestData, T t);
    }

    public AbstractFetch(LogContext logContext, ConsumerMetadata consumerMetadata, SubscriptionState subscriptionState, FetchConfig fetchConfig, FetchBuffer fetchBuffer, FetchMetricsManager fetchMetricsManager, Time time, ApiVersions apiVersions) {
        this.log = logContext.logger(AbstractFetch.class);
        this.completedFetchLog = logContext.logger(CompletedFetch.class);
        this.logContext = logContext;
        this.metadata = consumerMetadata;
        this.subscriptions = subscriptionState;
        this.fetchConfig = fetchConfig;
        this.fetchBuffer = fetchBuffer;
        this.metricsManager = fetchMetricsManager;
        this.time = time;
        this.apiVersions = apiVersions;
    }

    protected abstract boolean isUnavailable(Node node);

    protected abstract void maybeThrowAuthFailure(Node node);

    boolean hasCompletedFetches() {
        return !this.fetchBuffer.isEmpty();
    }

    public boolean hasAvailableFetches() {
        return this.fetchBuffer.hasCompletedFetches(completedFetch -> {
            return this.subscriptions.isFetchable(completedFetch.partition);
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleFetchSuccess(Node node, FetchSessionHandler.FetchRequestData fetchRequestData, ClientResponse clientResponse) {
        try {
            FetchResponse fetchResponse = (FetchResponse) clientResponse.responseBody();
            FetchSessionHandler sessionHandler = sessionHandler(node.id());
            if (sessionHandler == null) {
                this.log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.", Integer.valueOf(node.id()));
                removePendingFetchRequest(node, fetchRequestData.metadata().sessionId());
                return;
            }
            short apiVersion = clientResponse.requestHeader().apiVersion();
            if (!sessionHandler.handleResponse(fetchResponse, apiVersion)) {
                if (fetchResponse.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
                    this.metadata.requestUpdate(false);
                }
                return;
            }
            LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = fetchResponse.responseData(sessionHandler.sessionTopicNames(), apiVersion);
            FetchMetricsAggregator fetchMetricsAggregator = new FetchMetricsAggregator(this.metricsManager, new HashSet(responseData.keySet()));
            HashMap hashMap = new HashMap();
            for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry : responseData.entrySet()) {
                TopicPartition key = entry.getKey();
                FetchRequest.PartitionData partitionData = fetchRequestData.sessionPartitions().get(key);
                if (partitionData == null) {
                    throw new IllegalStateException(fetchRequestData.metadata().isFull() ? MessageFormatter.arrayFormat("Response for missing full request partition: partition={}; metadata={}", new Object[]{key, fetchRequestData.metadata()}).getMessage() : MessageFormatter.arrayFormat("Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}; toReplace={}", new Object[]{key, fetchRequestData.metadata(), fetchRequestData.toSend(), fetchRequestData.toForget(), fetchRequestData.toReplace()}).getMessage());
                }
                long j = partitionData.fetchOffset;
                FetchResponseData.PartitionData value = entry.getValue();
                this.log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", this.fetchConfig.isolationLevel, Long.valueOf(j), key, value);
                Errors forCode = Errors.forCode(value.errorCode());
                if (forCode == Errors.NOT_LEADER_OR_FOLLOWER || forCode == Errors.FENCED_LEADER_EPOCH) {
                    this.log.debug("For {}, received error {}, with leaderIdAndEpoch {}", key, forCode, value.currentLeader());
                    if (value.currentLeader().leaderId() != -1 && value.currentLeader().leaderEpoch() != -1) {
                        hashMap.put(key, new Metadata.LeaderIdAndEpoch(Optional.of(Integer.valueOf(value.currentLeader().leaderId())), Optional.of(Integer.valueOf(value.currentLeader().leaderEpoch()))));
                    }
                }
                this.fetchBuffer.add(new CompletedFetch(this.completedFetchLog, this.subscriptions, this.decompressionBufferSupplier, key, value, fetchMetricsAggregator, Long.valueOf(j), apiVersion));
            }
            if (!hashMap.isEmpty()) {
                this.metadata.updatePartitionLeadership(hashMap, (List) fetchResponse.data().nodeEndpoints().stream().map(nodeEndpoint -> {
                    return new Node(nodeEndpoint.nodeId(), nodeEndpoint.host(), nodeEndpoint.port(), nodeEndpoint.rack());
                }).filter(node2 -> {
                    return !node2.equals(Node.noNode());
                }).collect(Collectors.toList())).forEach(topicPartition -> {
                    this.log.debug("For {}, as the leader was updated, position will be validated.", topicPartition);
                    this.subscriptions.maybeValidatePositionForCurrentLeader(this.apiVersions, topicPartition, this.metadata.currentLeader(topicPartition));
                });
            }
            this.metricsManager.recordLatency(clientResponse.requestLatencyMs());
            removePendingFetchRequest(node, fetchRequestData.metadata().sessionId());
        } finally {
            removePendingFetchRequest(node, fetchRequestData.metadata().sessionId());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleFetchFailure(Node node, FetchSessionHandler.FetchRequestData fetchRequestData, Throwable th) {
        try {
            FetchSessionHandler sessionHandler = sessionHandler(node.id());
            if (sessionHandler != null) {
                sessionHandler.handleError(th);
                Set<TopicPartition> sessionTopicPartitions = sessionHandler.sessionTopicPartitions();
                SubscriptionState subscriptionState = this.subscriptions;
                subscriptionState.getClass();
                sessionTopicPartitions.forEach(subscriptionState::clearPreferredReadReplica);
            }
        } finally {
            removePendingFetchRequest(node, fetchRequestData.metadata().sessionId());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleCloseFetchSessionSuccess(Node node, FetchSessionHandler.FetchRequestData fetchRequestData, ClientResponse clientResponse) {
        int sessionId = fetchRequestData.metadata().sessionId();
        removePendingFetchRequest(node, sessionId);
        this.log.debug("Successfully sent a close message for fetch session: {} to node: {}", Integer.valueOf(sessionId), node);
    }

    public void handleCloseFetchSessionFailure(Node node, FetchSessionHandler.FetchRequestData fetchRequestData, Throwable th) {
        int sessionId = fetchRequestData.metadata().sessionId();
        removePendingFetchRequest(node, sessionId);
        this.log.debug("Unable to send a close message for fetch session: {} to node: {}. This may result in unnecessary fetch sessions at the broker.", Integer.valueOf(sessionId), node, th);
    }

    private void removePendingFetchRequest(Node node, int i) {
        this.log.debug("Removing pending request for fetch session: {} for node: {}", Integer.valueOf(i), node);
        this.nodesWithPendingFetchRequests.remove(Integer.valueOf(node.id()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public FetchRequest.Builder createFetchRequest(Node node, FetchSessionHandler.FetchRequestData fetchRequestData) {
        FetchRequest.Builder rackId = FetchRequest.Builder.forConsumer(fetchRequestData.canUseTopicIds() ? ApiKeys.FETCH.latestVersion() : (short) 12, this.fetchConfig.maxWaitMs, this.fetchConfig.minBytes, fetchRequestData.toSend()).isolationLevel(this.fetchConfig.isolationLevel).setMaxBytes(this.fetchConfig.maxBytes).metadata(fetchRequestData.metadata()).removed(fetchRequestData.toForget()).replaced(fetchRequestData.toReplace()).rackId(this.fetchConfig.clientRackId);
        this.log.debug("Sending {} {} to broker {}", this.fetchConfig.isolationLevel, fetchRequestData, node);
        this.log.debug("Adding pending request for node {}", node);
        this.nodesWithPendingFetchRequests.add(Integer.valueOf(node.id()));
        return rackId;
    }

    private Set<TopicPartition> fetchablePartitions() {
        Set<TopicPartition> bufferedPartitions = this.fetchBuffer.bufferedPartitions();
        return new HashSet(this.subscriptions.fetchablePartitions(topicPartition -> {
            return !bufferedPartitions.contains(topicPartition);
        }));
    }

    Node selectReadReplica(TopicPartition topicPartition, Node node, long j) {
        Optional<Integer> preferredReadReplica = this.subscriptions.preferredReadReplica(topicPartition, j);
        if (!preferredReadReplica.isPresent()) {
            return node;
        }
        Optional<U> flatMap = preferredReadReplica.flatMap(num -> {
            return this.metadata.fetch().nodeIfOnline(topicPartition, num.intValue());
        });
        if (flatMap.isPresent()) {
            return (Node) flatMap.get();
        }
        this.log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata, using the leader instead.", preferredReadReplica, topicPartition);
        FetchUtils.requestMetadataUpdate(this.metadata, this.subscriptions, topicPartition);
        return node;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Node, FetchSessionHandler.FetchRequestData> prepareCloseFetchSessionRequests() {
        Cluster fetch = this.metadata.fetch();
        HashMap hashMap = new HashMap();
        this.sessionHandlers.forEach((num, fetchSessionHandler) -> {
            fetchSessionHandler.notifyClose();
            Node nodeById = fetch.nodeById(num.intValue());
            if (nodeById == null || isUnavailable(nodeById)) {
                this.log.debug("Skip sending close session request to broker {} since it is not reachable", nodeById);
            } else {
                hashMap.put(nodeById, fetchSessionHandler.newBuilder());
            }
        });
        return (Map) hashMap.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return ((FetchSessionHandler.Builder) entry.getValue()).build();
        }));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
        this.metricsManager.maybeUpdateAssignment(this.subscriptions);
        HashMap hashMap = new HashMap();
        long milliseconds = this.time.milliseconds();
        Map<String, Uuid> map = this.metadata.topicIds();
        for (TopicPartition topicPartition : fetchablePartitions()) {
            SubscriptionState.FetchPosition position = this.subscriptions.position(topicPartition);
            if (position == null) {
                throw new IllegalStateException("Missing position for fetchable partition " + topicPartition);
            }
            Optional<Node> optional = position.currentLeader.leader;
            if (optional.isPresent()) {
                Node selectReadReplica = selectReadReplica(topicPartition, optional.get(), milliseconds);
                if (isUnavailable(selectReadReplica)) {
                    maybeThrowAuthFailure(selectReadReplica);
                    this.log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", topicPartition, selectReadReplica);
                } else if (this.nodesWithPendingFetchRequests.contains(Integer.valueOf(selectReadReplica.id()))) {
                    this.log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", topicPartition, selectReadReplica);
                } else {
                    ((FetchSessionHandler.Builder) hashMap.computeIfAbsent(selectReadReplica, node -> {
                        return this.sessionHandlers.computeIfAbsent(Integer.valueOf(selectReadReplica.id()), num -> {
                            return new FetchSessionHandler(this.logContext, num.intValue());
                        }).newBuilder();
                    })).add(topicPartition, new FetchRequest.PartitionData(map.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID), position.offset, -1L, this.fetchConfig.fetchSize, position.currentLeader.epoch, Optional.empty()));
                    this.log.debug("Added {} fetch request for partition {} at position {} to node {}", this.fetchConfig.isolationLevel, topicPartition, position, selectReadReplica);
                }
            } else {
                this.log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", topicPartition, position);
                this.metadata.requestUpdate(false);
            }
        }
        return (Map) hashMap.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return ((FetchSessionHandler.Builder) entry.getValue()).build();
        }));
    }

    protected FetchSessionHandler sessionHandler(int i) {
        return this.sessionHandlers.get(Integer.valueOf(i));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void closeInternal(Timer timer) {
        org.apache.kafka.common.utils.Utils.closeQuietly(this.fetchBuffer, "fetchBuffer");
        org.apache.kafka.common.utils.Utils.closeQuietly(this.decompressionBufferSupplier, "decompressionBufferSupplier");
    }

    public void close(Timer timer) {
        this.idempotentCloser.close(() -> {
            closeInternal(timer);
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        close(this.time.timer(Duration.ZERO));
    }
}
