package org.apache.kafka.streams.processor.internals.assignment;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignor.class */
public class LegacyStickyTaskAssignor implements LegacyTaskAssignor {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) LegacyStickyTaskAssignor.class);
    private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1;
    private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10;
    private Map<ProcessId, ClientState> clients;
    private Set<TaskId> allTaskIds;
    private Set<TaskId> statefulTaskIds;
    private final Map<TaskId, ProcessId> previousActiveTaskAssignment;
    private final Map<TaskId, Set<ProcessId>> previousStandbyTaskAssignment;
    private RackAwareTaskAssignor rackAwareTaskAssignor;
    private AssignmentConfigs configs;
    private TaskPairs taskPairs;
    private final boolean mustPreserveActiveTaskAssignment;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignor$TaskPairs.class */
    public static class TaskPairs {
        private final Set<Pair> pairs;
        private final int maxPairs;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignor$TaskPairs$Pair.class */
        public static class Pair {
            private final TaskId task1;
            private final TaskId task2;

            Pair(TaskId taskId, TaskId taskId2) {
                this.task1 = taskId;
                this.task2 = taskId2;
            }

            public boolean equals(Object obj) {
                if (this == obj) {
                    return true;
                }
                if (obj == null || getClass() != obj.getClass()) {
                    return false;
                }
                Pair pair = (Pair) obj;
                return Objects.equals(this.task1, pair.task1) && Objects.equals(this.task2, pair.task2);
            }

            public int hashCode() {
                return Objects.hash(this.task1, this.task2);
            }
        }

        TaskPairs(int i) {
            this.maxPairs = i;
            this.pairs = new HashSet(i);
        }

        boolean hasNewPair(TaskId taskId, Set<TaskId> set) {
            if (this.pairs.size() == this.maxPairs) {
                return false;
            }
            Iterator<TaskId> it = set.iterator();
            while (it.hasNext()) {
                if (!this.pairs.contains(pair(taskId, it.next()))) {
                    return true;
                }
            }
            return false;
        }

        void addPairs(TaskId taskId, Set<TaskId> set) {
            Iterator<TaskId> it = set.iterator();
            while (it.hasNext()) {
                this.pairs.add(pair(it.next(), taskId));
            }
        }

        Pair pair(TaskId taskId, TaskId taskId2) {
            return taskId.compareTo(taskId2) < 0 ? new Pair(taskId, taskId2) : new Pair(taskId2, taskId);
        }
    }

    public LegacyStickyTaskAssignor() {
        this(false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LegacyStickyTaskAssignor(boolean z) {
        this.previousActiveTaskAssignment = new HashMap();
        this.previousStandbyTaskAssignment = new HashMap();
        this.mustPreserveActiveTaskAssignment = z;
    }

    @Override // org.apache.kafka.streams.processor.internals.assignment.LegacyTaskAssignor
    public boolean assign(Map<ProcessId, ClientState> map, Set<TaskId> set, Set<TaskId> set2, RackAwareTaskAssignor rackAwareTaskAssignor, AssignmentConfigs assignmentConfigs) {
        this.clients = map;
        this.allTaskIds = set;
        this.statefulTaskIds = set2;
        this.rackAwareTaskAssignor = rackAwareTaskAssignor;
        this.configs = assignmentConfigs;
        this.taskPairs = new TaskPairs((set.size() * (set.size() - 1)) / 2);
        mapPreviousTaskAssignment(map);
        assignActive();
        optimizeActive();
        assignStandby(assignmentConfigs.numStandbyReplicas());
        optimizeStandby();
        return false;
    }

    private void optimizeStandby() {
        if (this.configs.numStandbyReplicas() <= 0 || this.rackAwareTaskAssignor == null || !this.rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
            return;
        }
        int orElse = this.configs.rackAwareTrafficCost().orElse(1);
        int orElse2 = this.configs.rackAwareNonOverlapCost().orElse(10);
        this.rackAwareTaskAssignor.optimizeStandbyTasks(new TreeMap(this.clients), orElse, orElse2, (clientState, clientState2, taskId, map) -> {
            return true;
        });
    }

    private void optimizeActive() {
        if (this.rackAwareTaskAssignor == null || !this.rackAwareTaskAssignor.canEnableRackAwareAssignor()) {
            return;
        }
        int orElse = this.configs.rackAwareTrafficCost().orElse(1);
        int orElse2 = this.configs.rackAwareNonOverlapCost().orElse(10);
        TreeSet treeSet = new TreeSet(this.statefulTaskIds);
        TreeMap treeMap = new TreeMap(this.clients);
        this.rackAwareTaskAssignor.optimizeActiveTasks(treeSet, treeMap, orElse, orElse2);
        this.rackAwareTaskAssignor.optimizeActiveTasks((TreeSet) Utils.diff(TreeSet::new, this.allTaskIds, treeSet), treeMap, 1, 0);
    }

    private void assignStandby(int i) {
        for (TaskId taskId : this.statefulTaskIds) {
            int i2 = 0;
            while (true) {
                if (i2 < i) {
                    Set<ProcessId> findClientsWithoutAssignedTask = findClientsWithoutAssignedTask(taskId);
                    if (findClientsWithoutAssignedTask.isEmpty()) {
                        log.warn("Unable to assign {} of {} standby tasks for task [{}]. There is not enough available capacity. You should increase the number of threads and/or application instances to maintain the requested number of standby replicas.", Integer.valueOf(i - i2), Integer.valueOf(i), taskId);
                        break;
                    } else {
                        allocateTaskWithClientCandidates(taskId, findClientsWithoutAssignedTask, false);
                        i2++;
                    }
                }
            }
        }
    }

    private void assignActive() {
        int sumCapacity = sumCapacity(this.clients.values());
        if (sumCapacity == 0) {
            throw new IllegalStateException("`totalCapacity` should never be zero.");
        }
        int size = this.allTaskIds.size() / sumCapacity;
        HashSet hashSet = new HashSet();
        for (Map.Entry<TaskId, ProcessId> entry : this.previousActiveTaskAssignment.entrySet()) {
            TaskId key = entry.getKey();
            if (this.allTaskIds.contains(key)) {
                ClientState clientState = this.clients.get(entry.getValue());
                if (this.mustPreserveActiveTaskAssignment || clientState.hasUnfulfilledQuota(size)) {
                    assignTaskToClient(hashSet, key, clientState);
                }
            }
        }
        HashSet hashSet2 = new HashSet(this.allTaskIds);
        hashSet2.removeAll(hashSet);
        Iterator it = hashSet2.iterator();
        while (it.hasNext()) {
            TaskId taskId = (TaskId) it.next();
            Set<ProcessId> set = this.previousStandbyTaskAssignment.get(taskId);
            if (set != null) {
                Iterator<ProcessId> it2 = set.iterator();
                while (true) {
                    if (it2.hasNext()) {
                        ClientState clientState2 = this.clients.get(it2.next());
                        if (clientState2.hasUnfulfilledQuota(size)) {
                            assignTaskToClient(hashSet, taskId, clientState2);
                            it.remove();
                            break;
                        }
                    }
                }
            }
        }
        ArrayList arrayList = new ArrayList(hashSet2);
        Collections.sort(arrayList);
        Iterator it3 = arrayList.iterator();
        while (it3.hasNext()) {
            allocateTaskWithClientCandidates((TaskId) it3.next(), this.clients.keySet(), true);
        }
    }

    private void allocateTaskWithClientCandidates(TaskId taskId, Set<ProcessId> set, boolean z) {
        ClientState findClient = findClient(taskId, set);
        this.taskPairs.addPairs(taskId, findClient.assignedTasks());
        if (z) {
            findClient.assignActive(taskId);
        } else {
            findClient.assignStandby(taskId);
        }
    }

    private void assignTaskToClient(Set<TaskId> set, TaskId taskId, ClientState clientState) {
        this.taskPairs.addPairs(taskId, clientState.assignedTasks());
        clientState.assignActive(taskId);
        set.add(taskId);
    }

    private Set<ProcessId> findClientsWithoutAssignedTask(TaskId taskId) {
        HashSet hashSet = new HashSet();
        for (Map.Entry<ProcessId, ClientState> entry : this.clients.entrySet()) {
            if (!entry.getValue().hasAssignedTask(taskId)) {
                hashSet.add(entry.getKey());
            }
        }
        return hashSet;
    }

    private ClientState findClient(TaskId taskId, Set<ProcessId> set) {
        if (set.size() == 1) {
            return this.clients.get(set.iterator().next());
        }
        ClientState findClientsWithPreviousAssignedTask = findClientsWithPreviousAssignedTask(taskId, set);
        if (findClientsWithPreviousAssignedTask == null) {
            return leastLoaded(taskId, set);
        }
        if (!shouldBalanceLoad(findClientsWithPreviousAssignedTask)) {
            return findClientsWithPreviousAssignedTask;
        }
        ClientState findLeastLoadedClientWithPreviousStandByTask = findLeastLoadedClientWithPreviousStandByTask(taskId, set);
        return (findLeastLoadedClientWithPreviousStandByTask == null || shouldBalanceLoad(findLeastLoadedClientWithPreviousStandByTask)) ? leastLoaded(taskId, set) : findLeastLoadedClientWithPreviousStandByTask;
    }

    private boolean shouldBalanceLoad(ClientState clientState) {
        return clientState.reachedCapacity() && hasClientsWithMoreAvailableCapacity(clientState);
    }

    private boolean hasClientsWithMoreAvailableCapacity(ClientState clientState) {
        Iterator<ClientState> it = this.clients.values().iterator();
        while (it.hasNext()) {
            if (it.next().hasMoreAvailableCapacityThan(clientState)) {
                return true;
            }
        }
        return false;
    }

    private ClientState findClientsWithPreviousAssignedTask(TaskId taskId, Set<ProcessId> set) {
        ProcessId processId = this.previousActiveTaskAssignment.get(taskId);
        return (processId == null || !set.contains(processId)) ? findLeastLoadedClientWithPreviousStandByTask(taskId, set) : this.clients.get(processId);
    }

    private ClientState findLeastLoadedClientWithPreviousStandByTask(TaskId taskId, Set<ProcessId> set) {
        Set<ProcessId> set2 = this.previousStandbyTaskAssignment.get(taskId);
        if (set2 == null) {
            return null;
        }
        HashSet hashSet = new HashSet(set2);
        hashSet.retainAll(set);
        return leastLoaded(taskId, hashSet);
    }

    private ClientState leastLoaded(TaskId taskId, Set<ProcessId> set) {
        ClientState findLeastLoaded = findLeastLoaded(taskId, set, true);
        return findLeastLoaded == null ? findLeastLoaded(taskId, set, false) : findLeastLoaded;
    }

    private ClientState findLeastLoaded(TaskId taskId, Set<ProcessId> set, boolean z) {
        ClientState clientState = null;
        Iterator<ProcessId> it = set.iterator();
        while (it.hasNext()) {
            ClientState clientState2 = this.clients.get(it.next());
            if (clientState2.assignedTaskCount() == 0) {
                return clientState2;
            }
            if (clientState == null || clientState2.hasMoreAvailableCapacityThan(clientState)) {
                if (!z) {
                    clientState = clientState2;
                } else if (this.taskPairs.hasNewPair(taskId, clientState2.assignedTasks())) {
                    clientState = clientState2;
                }
            }
        }
        return clientState;
    }

    private void mapPreviousTaskAssignment(Map<ProcessId, ClientState> map) {
        for (Map.Entry<ProcessId, ClientState> entry : map.entrySet()) {
            Iterator<TaskId> it = entry.getValue().prevActiveTasks().iterator();
            while (it.hasNext()) {
                this.previousActiveTaskAssignment.put(it.next(), entry.getKey());
            }
            for (TaskId taskId : entry.getValue().prevStandbyTasks()) {
                this.previousStandbyTaskAssignment.computeIfAbsent(taskId, taskId2 -> {
                    return new HashSet();
                });
                this.previousStandbyTaskAssignment.get(taskId).add(entry.getKey());
            }
        }
    }

    private int sumCapacity(Collection<ClientState> collection) {
        int i = 0;
        Iterator<ClientState> it = collection.iterator();
        while (it.hasNext()) {
            i += it.next().capacity();
        }
        return i;
    }
}
