package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.slf4j.Logger;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.8.1.jar:org/apache/kafka/streams/processor/internals/RepartitionTopics.class */
/**
 * Computes the configuration of the repartition topics declared by the topologies in a
 * {@link TopologyMetadata}, has them co-partitioned and made ready, and records a
 * {@link PartitionInfo} for each of their partitions.
 *
 * <p>Subtopologies whose external source topics are absent from the supplied {@link Cluster}
 * metadata are remembered and can be queried through {@link #topologiesWithMissingInputTopics()},
 * {@link #missingSourceTopics()} and {@link #missingSourceTopicExceptions()}. The repartition
 * topics of any topology containing such a subtopology are left out of the setup.
 */
public class RepartitionTopics {
    private final InternalTopicManager internalTopicManager;
    private final TopologyMetadata topologyMetadata;
    private final Cluster clusterMetadata;
    private final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
    private final Logger log;
    private final Map<TopicPartition, PartitionInfo> topicPartitionInfos = new HashMap<>();
    private final Map<TopologyMetadata.Subtopology, Set<String>> missingInputTopicsBySubtopology = new HashMap<>();

    /**
     * @param topologyMetadata            source of the per-topology topic information and co-partition groups
     * @param internalTopicManager        used to make the computed repartition topics ready
     * @param copartitionedTopicsEnforcer applied to every co-partition group during {@link #setup()}
     * @param clusterMetadata             cluster view used to look up existing topics and their partition counts
     * @param logPrefix                   prefix handed to the {@link LogContext} that creates this class's logger
     */
    public RepartitionTopics(final TopologyMetadata topologyMetadata,
                             final InternalTopicManager internalTopicManager,
                             final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer,
                             final Cluster clusterMetadata,
                             final String logPrefix) {
        this.topologyMetadata = topologyMetadata;
        this.internalTopicManager = internalTopicManager;
        this.clusterMetadata = clusterMetadata;
        this.copartitionedTopicsEnforcer = copartitionedTopicsEnforcer;
        this.log = new LogContext(logPrefix).logger(getClass());
    }

    /**
     * Computes the repartition topic configs, enforces co-partitioning on them, asks the
     * {@link InternalTopicManager} to make them ready, and finally records one
     * {@link PartitionInfo} per partition of every repartition topic.
     *
     * <p>When no repartition topic config is computed, nothing but an info log line is produced.
     *
     * @throws TaskAssignmentException if the partition count of some repartition topic cannot be determined
     */
    public void setup() {
        final Map<String, InternalTopicConfig> repartitionTopicMetadata =
            computeRepartitionTopicConfig(this.clusterMetadata);

        if (repartitionTopicMetadata.isEmpty()) {
            if (this.missingInputTopicsBySubtopology.isEmpty()) {
                this.log.info("Skipping the repartition topic validation since there are no repartition topics.");
            } else {
                this.log.info("Skipping the repartition topic validation since all topologies containing repartition "
                    + "topics are missing external user source topics and cannot be processed.");
            }
            return;
        }

        ensureCopartitioning(this.topologyMetadata.copartitionGroups(), repartitionTopicMetadata, this.clusterMetadata);
        this.internalTopicManager.makeReady(repartitionTopicMetadata);

        for (final Map.Entry<String, InternalTopicConfig> entry : repartitionTopicMetadata.entrySet()) {
            final String topic = entry.getKey();
            // A config without a partition count yields -1, so the loop below adds nothing for it.
            final int numPartitions = entry.getValue().numberOfPartitions().orElse(-1);

            for (int partition = 0; partition < numPartitions; partition++) {
                // Placeholder info: null leader and empty node arrays.
                this.topicPartitionInfos.put(
                    new TopicPartition(topic, partition),
                    new PartitionInfo(topic, partition, null, new Node[0], new Node[0])
                );
            }
        }
    }

    /**
     * @return the names (as resolved by {@link TopologyMetadata#getTopologyNameOrElseUnnamed})
     *         of all topologies that own a subtopology with missing source topics
     */
    public Set<String> topologiesWithMissingInputTopics() {
        return this.missingInputTopicsBySubtopology.keySet()
            .stream()
            .map(subtopology -> TopologyMetadata.getTopologyNameOrElseUnnamed(subtopology.namedTopology))
            .collect(Collectors.toSet());
    }

    /**
     * @return the union of the missing source topics recorded for every subtopology
     */
    public Set<String> missingSourceTopics() {
        return this.missingInputTopicsBySubtopology.values()
            .stream()
            .flatMap(Set::stream)
            .collect(Collectors.toSet());
    }

    /**
     * Builds one exception per subtopology with missing source topics.
     *
     * @return a queue of {@link StreamsException}s, each wrapping a {@link MissingSourceTopicException}
     *         and carrying a {@link TaskId} built from the subtopology id, partition 0 and the topology name
     */
    public Queue<StreamsException> missingSourceTopicExceptions() {
        return this.missingInputTopicsBySubtopology.entrySet().stream().map(entry -> {
            final Set<String> missingTopics = entry.getValue();
            final int subtopologyId = entry.getKey().nodeGroupId;
            final String topologyName = entry.getKey().namedTopology;

            return new StreamsException(
                new MissingSourceTopicException(String.format(
                    "Missing source topics %s for subtopology %d of topology %s",
                    missingTopics, subtopologyId, topologyName)),
                new TaskId(subtopologyId, 0, topologyName));
        }).collect(Collectors.toCollection(LinkedList::new));
    }

    /**
     * @return a read-only view of the partition infos recorded by {@link #setup()}
     */
    public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
        return Collections.unmodifiableMap(this.topicPartitionInfos);
    }

    /**
     * Collects the repartition topic configs of every topology that has no missing external
     * source topics, then resolves their partition counts.
     *
     * <p>Side effect: every subtopology with missing source topics is recorded in
     * {@code missingInputTopicsBySubtopology}.
     *
     * @param clusterMetadata cluster view used to detect missing topics and look up partition counts
     * @return repartition topic configs keyed by topic name
     */
    private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Cluster clusterMetadata) {
        final Set<InternalTopologyBuilder.TopicsInfo> allTopicsInfo = new HashSet<>();
        final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = new HashMap<>();

        for (final Map.Entry<String, Map<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo>> topologyEntry
                : this.topologyMetadata.topologyToSubtopologyTopicsInfoMap().entrySet()) {
            final String topologyName = this.topologyMetadata.hasNamedTopologies() ? topologyEntry.getKey() : null;

            final Set<InternalTopologyBuilder.TopicsInfo> topicsInfoForTopology = new HashSet<>();
            final Set<String> missingSourceTopicsForTopology = new HashSet<>();
            final Map<String, InternalTopicConfig> repartitionTopicConfigsForTopology = new HashMap<>();

            for (final Map.Entry<TopologyMetadata.Subtopology, InternalTopologyBuilder.TopicsInfo> subtopologyEntry
                    : topologyEntry.getValue().entrySet()) {
                final InternalTopologyBuilder.TopicsInfo topicsInfo = subtopologyEntry.getValue();

                topicsInfoForTopology.add(topicsInfo);
                repartitionTopicConfigsForTopology.putAll(
                    topicsInfo.repartitionSourceTopics
                        .values()
                        .stream()
                        .collect(Collectors.toMap(InternalTopicConfig::name, topicConfig -> topicConfig)));

                final Set<String> missingSourceTopicsForSubtopology =
                    computeMissingExternalSourceTopics(topicsInfo, clusterMetadata);
                missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology);

                if (!missingSourceTopicsForSubtopology.isEmpty()) {
                    final TopologyMetadata.Subtopology subtopology = subtopologyEntry.getKey();
                    this.missingInputTopicsBySubtopology.put(subtopology, missingSourceTopicsForSubtopology);
                    this.log.error("Subtopology {} has missing source topics {} and will be excluded from the current assignment, this can be due to the consumer client's metadata being stale or because they have not been created yet. Please verify that you have created all input topics; if they do exist, you just need to wait for the metadata to be updated, at which time a new rebalance will be kicked off automatically and the topology will be retried at that time.",
                        subtopology.nodeGroupId, missingSourceTopicsForSubtopology);
                }
            }

            // A topology is only included as a whole: one missing source topic excludes all of it.
            if (missingSourceTopicsForTopology.isEmpty()) {
                allRepartitionTopicConfigs.putAll(repartitionTopicConfigsForTopology);
                allTopicsInfo.addAll(topicsInfoForTopology);
            } else {
                this.log.debug("Skipping repartition topic validation for entire topology {} due to missing source topics {}",
                    topologyName, missingSourceTopicsForTopology);
            }
        }

        setRepartitionSourceTopicPartitionCount(allRepartitionTopicConfigs, allTopicsInfo, clusterMetadata);
        return allRepartitionTopicConfigs;
    }

    /**
     * Runs the {@link CopartitionedTopicsEnforcer} over every co-partition group.
     *
     * @param copartitionGroups        groups of topic names handed to the enforcer one at a time
     * @param repartitionTopicMetadata repartition topic configs keyed by topic name
     * @param clusterMetadata          cluster view passed through to the enforcer
     */
    private void ensureCopartitioning(final Collection<Set<String>> copartitionGroups,
                                      final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                      final Cluster clusterMetadata) {
        for (final Set<String> copartitionGroup : copartitionGroups) {
            this.copartitionedTopicsEnforcer.enforce(copartitionGroup, repartitionTopicMetadata, clusterMetadata);
        }
    }

    /**
     * @param topicsInfo      topic information of one subtopology
     * @param clusterMetadata cluster view listing the known topics
     * @return the subtopology's source topics that are neither repartition source topics
     *         nor present in {@code clusterMetadata.topics()}; a fresh, mutable set
     */
    private Set<String> computeMissingExternalSourceTopics(final InternalTopologyBuilder.TopicsInfo topicsInfo,
                                                           final Cluster clusterMetadata) {
        final Set<String> missingExternalSourceTopics = new HashSet<>(topicsInfo.sourceTopics);
        missingExternalSourceTopics.removeAll(topicsInfo.repartitionSourceTopics.keySet());
        missingExternalSourceTopics.removeAll(clusterMetadata.topics());
        return missingExternalSourceTopics;
    }

    /**
     * Assigns a partition count to every repartition source topic that does not have one yet.
     *
     * <p>A count may depend on another repartition topic whose own count is still unknown,
     * so the topics are swept repeatedly until every count is set.
     *
     * @param repartitionTopicMetadata repartition topic configs keyed by topic name; updated in place
     * @param topicGroups              topic information of all subtopologies taking part
     * @param clusterMetadata          cluster view used to look up partition counts of existing topics
     * @throws TaskAssignmentException if a full sweep leaves counts unresolved without resolving any new one
     */
    private void setRepartitionSourceTopicPartitionCount(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                                         final Collection<InternalTopologyBuilder.TopicsInfo> topicGroups,
                                                         final Cluster clusterMetadata) {
        boolean partitionCountNeeded;
        do {
            partitionCountNeeded = false;
            // Guards against looping forever when a sweep cannot resolve anything new.
            boolean progressMadeThisIteration = false;

            for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups) {
                for (final String repartitionSourceTopic : topicsInfo.repartitionSourceTopics.keySet()) {
                    if (repartitionTopicMetadata.get(repartitionSourceTopic).numberOfPartitions().isPresent()) {
                        continue;
                    }

                    final Integer numPartitions = computePartitionCount(
                        repartitionTopicMetadata, topicGroups, clusterMetadata, repartitionSourceTopic);

                    if (numPartitions == null) {
                        partitionCountNeeded = true;
                        this.log.trace("Unable to determine number of partitions for {}, another iteration is needed",
                            repartitionSourceTopic);
                    } else {
                        this.log.trace("Determined number of partitions for {} to be {}",
                            repartitionSourceTopic, numPartitions);
                        repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions);
                        progressMadeThisIteration = true;
                    }
                }
            }

            if (!progressMadeThisIteration && partitionCountNeeded) {
                this.log.error("Unable to determine the number of partitions of all repartition topics, most likely a source topic is missing or pattern doesn't match any topics\ntopic groups: {}\ncluster topics: {}.",
                    topicGroups, clusterMetadata.topics());
                throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics, make sure all user input topics are created and all Pattern subscriptions match at least one topic in the cluster");
            }
        } while (partitionCountNeeded);
    }

    /**
     * Derives the partition count of a repartition topic as the maximum known partition count
     * among the source topics of every subtopology that writes to it (has it as a sink topic).
     *
     * @param repartitionTopicMetadata repartition topic configs keyed by topic name
     * @param topicGroups              topic information of all subtopologies taking part
     * @param clusterMetadata          cluster view used for source topics that are not repartition topics
     * @param repartitionSourceTopic   the repartition topic whose partition count is wanted
     * @return the maximum partition count found, or {@code null} if none of the upstream counts is known yet
     * @throws TaskAssignmentException if an upstream non-repartition source topic has no partition
     *                                 count in {@code clusterMetadata}
     */
    private Integer computePartitionCount(final Map<String, InternalTopicConfig> repartitionTopicMetadata,
                                          final Collection<InternalTopologyBuilder.TopicsInfo> topicGroups,
                                          final Cluster clusterMetadata,
                                          final String repartitionSourceTopic) {
        Integer partitionCount = null;

        for (final InternalTopologyBuilder.TopicsInfo topicsInfo : topicGroups) {
            if (!topicsInfo.sinkTopics.contains(repartitionSourceTopic)) {
                continue;
            }

            for (final String upstreamSourceTopic : topicsInfo.sourceTopics) {
                final Integer numPartitionsCandidate;

                if (repartitionTopicMetadata.containsKey(upstreamSourceTopic)) {
                    // Upstream is itself a repartition topic: its count may not be resolved yet.
                    numPartitionsCandidate =
                        repartitionTopicMetadata.get(upstreamSourceTopic).numberOfPartitions().orElse(null);
                } else {
                    final Integer count = clusterMetadata.partitionCountForTopic(upstreamSourceTopic);
                    if (count == null) {
                        throw new TaskAssignmentException(
                            "No partition count found for source topic " + upstreamSourceTopic + ", but it should have been.");
                    }
                    numPartitionsCandidate = count;
                }

                if (numPartitionsCandidate != null
                        && (partitionCount == null || numPartitionsCandidate > partitionCount)) {
                    partitionCount = numPartitionsCandidate;
                }
            }
        }

        return partitionCount;
    }
}
