package org.apache.kafka.connect.runtime.distributed.workermanager;

import com.networknt.config.schema.ConfigSchema;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.distributed.ConnectClusterMetrics;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;
import org.apache.kafka.connect.storage.MetricsStore;
import org.apache.kafka.connect.storage.MetricsStoreFactory;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;

@Confluent
/* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/workermanager/WorkerResourceManager.class */
public class WorkerResourceManager {
    private final ResourceManagerMetrics resourceManagerMetrics;
    private MetricsStore metricsStore;
    private Logger log;
    private Time time;
    private final Double cpuThresholdPercentage;
    private final Double memoryThresholdPercentage;
    private final long totalAvailableHeapMemory;
    private final long rebalanceInterval = 1800000;
    private final int consecutiveImbalanceThreshold = 6;
    private Map<String, Map<ConnectorTaskId, Double>> taskLoad;
    private Map<String, Double> workersCpuLoad;
    private Map<String, Double> workersMemoryLoad;
    private WorkerCoordinator coordinator;
    private ScheduledExecutorService scheduler;
    private Map<String, List<ConnectorTaskId>> latestTaskAssignment;
    long lastRebalanceAt;
    Map<String, Integer> numConsecutiveCpuLoadImbalances;
    Map<String, Integer> numConsecutiveMemoryLoadImbalances;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/connect/runtime/distributed/workermanager/WorkerResourceManager$ResourceManagerMetrics.class */
    public class ResourceManagerMetrics {
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor proactiveRebalanceCompletedCounts;

        public ResourceManagerMetrics(ConnectMetrics connectMetrics) {
            ConnectMetricsRegistry registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(registry.workerResourceManagerGroupName(), new String[0]);
            this.proactiveRebalanceCompletedCounts = this.metricGroup.sensor("total-proactive-rebalances");
            this.proactiveRebalanceCompletedCounts.add(this.metricGroup.metricName(registry.proactiveRebalanceCount), new CumulativeSum());
        }

        void close() {
            this.metricGroup.close();
        }

        void reblanaceTriggered() {
            this.proactiveRebalanceCompletedCounts.record(1.0d);
        }

        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }

    public WorkerResourceManager(LogContext logContext, WorkerCoordinator workerCoordinator, DistributedConfig distributedConfig, Time time, ConnectMetrics connectMetrics) {
        this.cpuThresholdPercentage = Double.valueOf(80.0d);
        this.memoryThresholdPercentage = Double.valueOf(80.0d);
        this.rebalanceInterval = 1800000L;
        this.consecutiveImbalanceThreshold = 6;
        this.lastRebalanceAt = 0L;
        this.numConsecutiveCpuLoadImbalances = new HashMap();
        this.numConsecutiveMemoryLoadImbalances = new HashMap();
        this.log = logContext.logger(WorkerResourceManager.class);
        this.metricsStore = MetricsStoreFactory.createMetricsStore(distributedConfig);
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.workersCpuLoad = new HashMap();
        this.workersMemoryLoad = new HashMap();
        this.taskLoad = new HashMap();
        this.coordinator = workerCoordinator;
        this.totalAvailableHeapMemory = Runtime.getRuntime().maxMemory();
        this.time = time;
        this.resourceManagerMetrics = new ResourceManagerMetrics(connectMetrics);
    }

    WorkerResourceManager(LogContext logContext, WorkerCoordinator workerCoordinator, DistributedConfig distributedConfig, ScheduledExecutorService scheduledExecutorService, long j, Time time, ConnectMetrics connectMetrics) {
        this.cpuThresholdPercentage = Double.valueOf(80.0d);
        this.memoryThresholdPercentage = Double.valueOf(80.0d);
        this.rebalanceInterval = 1800000L;
        this.consecutiveImbalanceThreshold = 6;
        this.lastRebalanceAt = 0L;
        this.numConsecutiveCpuLoadImbalances = new HashMap();
        this.numConsecutiveMemoryLoadImbalances = new HashMap();
        this.log = logContext.logger(WorkerResourceManager.class);
        this.metricsStore = MetricsStoreFactory.createMetricsStore(distributedConfig);
        this.scheduler = scheduledExecutorService;
        this.workersCpuLoad = new HashMap();
        this.workersMemoryLoad = new HashMap();
        this.taskLoad = new HashMap();
        this.coordinator = workerCoordinator;
        this.totalAvailableHeapMemory = j;
        this.time = time;
        this.resourceManagerMetrics = new ResourceManagerMetrics(connectMetrics);
    }

    public void start() throws Exception {
        try {
            this.metricsStore.start();
            getScheduler().scheduleWithFixedDelay(this::startImbalanceDetection, 0L, 5L, TimeUnit.MINUTES);
        } catch (Exception e) {
            this.log.error("Failed to start the metrics store, resource manager won't start", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    public void stop() throws Exception {
        ThreadUtils.shutdownExecutorServiceQuietly(getScheduler(), 60L, TimeUnit.SECONDS);
        this.metricsStore.stop();
        this.resourceManagerMetrics.close();
    }

    public synchronized void setLatestTaskAssignment(Map<String, List<ConnectorTaskId>> map) {
        this.latestTaskAssignment = new ConcurrentHashMap(map);
        this.log.debug("Latest Worker Resource Manager Task Assignment: {}", map);
    }

    protected synchronized Map<String, List<ConnectorTaskId>> getLatestTaskAssignment() {
        return this.latestTaskAssignment;
    }

    public synchronized void startImbalanceDetection() {
        if (this.lastRebalanceAt > 0 && this.time.milliseconds() - this.lastRebalanceAt < 1800000) {
            this.log.info("Skipping Load imbalance Detection as the last rebalance triggered by Worker Resource Manager was within the last {} minutes.", (Object) 1800000L);
            return;
        }
        this.log.debug("Starting Imbalance Detection");
        this.workersCpuLoad = this.metricsStore.getWorkersCPULoad();
        this.workersMemoryLoad = this.metricsStore.getWorkersMemoryLoad();
        for (String str : this.workersCpuLoad.keySet()) {
            if (this.latestTaskAssignment == null || this.latestTaskAssignment.containsKey(str)) {
                double doubleValue = this.workersCpuLoad.getOrDefault(str, Double.valueOf(ConfigSchema.DEFAULT_NUMBER)).doubleValue();
                double doubleValue2 = this.workersMemoryLoad.getOrDefault(str, Double.valueOf(ConfigSchema.DEFAULT_NUMBER)).doubleValue();
                boolean z = doubleValue > this.cpuThresholdPercentage.doubleValue();
                boolean z2 = doubleValue2 > this.memoryThresholdPercentage.doubleValue();
                if (z) {
                    this.numConsecutiveCpuLoadImbalances.put(str, Integer.valueOf(this.numConsecutiveCpuLoadImbalances.getOrDefault(str, 0).intValue() + 1));
                    this.log.debug("high cpu  {} detected on worker {} for consecutive {} times", Double.valueOf(doubleValue), str, this.numConsecutiveCpuLoadImbalances.get(str));
                } else {
                    this.numConsecutiveCpuLoadImbalances.put(str, 0);
                }
                if (z2) {
                    this.numConsecutiveMemoryLoadImbalances.put(str, Integer.valueOf(this.numConsecutiveMemoryLoadImbalances.getOrDefault(str, 0).intValue() + 1));
                    this.log.debug("high memory {} detected on worker {} for consecutive {} times", Double.valueOf(doubleValue2), str, this.numConsecutiveMemoryLoadImbalances.get(str));
                } else {
                    this.numConsecutiveMemoryLoadImbalances.put(str, 0);
                }
                if (this.numConsecutiveCpuLoadImbalances.get(str).intValue() > 6 || this.numConsecutiveMemoryLoadImbalances.get(str).intValue() > 6) {
                    if (taskCountOnWorker(str) > 1) {
                        this.log.info("Worker {} has exceeded thresholds - CPU: {}%, Memory: {}%, triggering re-balance", str, Double.valueOf(doubleValue), Double.valueOf(doubleValue2));
                        triggerRebalance();
                        return;
                    }
                }
            } else {
                this.numConsecutiveCpuLoadImbalances.remove(str);
                this.numConsecutiveMemoryLoadImbalances.remove(str);
            }
        }
    }

    protected int taskCountOnWorker(String str) {
        Map<ConnectorTaskId, Double> map;
        this.taskLoad = filterRedundantTasks(this.metricsStore.getTasksLoad());
        if (this.taskLoad == null || this.taskLoad.isEmpty() || (map = this.taskLoad.get(str)) == null) {
            return 0;
        }
        return map.size();
    }

    public void stopImbalanceDetection() throws Exception {
        stop();
    }

    public void triggerRebalance() {
        this.numConsecutiveCpuLoadImbalances.clear();
        this.numConsecutiveMemoryLoadImbalances.clear();
        this.lastRebalanceAt = this.time.milliseconds();
        this.coordinator.requestRejoin("Trigger re-balance due to worker imbalance");
        this.resourceManagerMetrics.reblanaceTriggered();
    }

    public synchronized ConnectClusterMetrics snapshot() {
        List<ConnectClusterMetrics.WorkerResourceLoad> calculateWorkersLoad = calculateWorkersLoad();
        this.taskLoad = filterRedundantTasks(this.metricsStore.getTasksLoad());
        return (this.taskLoad == null || this.taskLoad.isEmpty() || calculateWorkersLoad == null || calculateWorkersLoad.isEmpty()) ? new ConnectClusterMetrics(null, null) : new ConnectClusterMetrics(calculateWorkersLoad, (List) this.taskLoad.entrySet().stream().flatMap(entry -> {
            return ((Map) entry.getValue()).entrySet().stream().map(entry -> {
                return new ConnectClusterMetrics.TaskLoad((ConnectorTaskId) entry.getKey(), ((Double) entry.getValue()).doubleValue());
            });
        }).collect(Collectors.toList()));
    }

    protected List<ConnectClusterMetrics.WorkerResourceLoad> calculateWorkersLoad() {
        if (this.workersMemoryLoad.isEmpty() || this.workersCpuLoad.isEmpty()) {
            this.workersMemoryLoad = new ConcurrentHashMap(this.metricsStore.getWorkersMemoryLoad());
            this.workersCpuLoad = new ConcurrentHashMap(this.metricsStore.getWorkersCPULoad());
        }
        if (this.workersMemoryLoad.isEmpty() || this.workersCpuLoad.isEmpty()) {
            return null;
        }
        Map map = (Map) this.workersMemoryLoad.entrySet().stream().collect(HashMap::new, (hashMap, entry) -> {
        }, (v0, v1) -> {
            v0.putAll(v1);
        });
        Map map2 = (Map) this.workersCpuLoad.entrySet().stream().collect(HashMap::new, (hashMap2, entry2) -> {
        }, (v0, v1) -> {
            v0.putAll(v1);
        });
        double sum = map.values().stream().mapToDouble((v0) -> {
            return v0.doubleValue();
        }).sum();
        Map map3 = (Map) map.entrySet().stream().collect(HashMap::new, (hashMap3, entry3) -> {
        }, (v0, v1) -> {
            v0.putAll(v1);
        });
        double sum2 = map2.values().stream().mapToDouble((v0) -> {
            return v0.doubleValue();
        }).sum();
        Map map4 = (Map) map2.entrySet().stream().collect(HashMap::new, (hashMap4, entry4) -> {
        }, (v0, v1) -> {
            v0.putAll(v1);
        });
        HashMap hashMap5 = new HashMap();
        map3.forEach((str, d) -> {
            hashMap5.put(str, Double.valueOf(Math.min(d.doubleValue(), ((Double) map4.getOrDefault(str, Double.valueOf(ConfigSchema.DEFAULT_NUMBER))).doubleValue())));
        });
        return (List) hashMap5.entrySet().stream().map(entry5 -> {
            return new ConnectClusterMetrics.WorkerResourceLoad((String) entry5.getKey(), 100.0d - ((Double) entry5.getValue()).doubleValue());
        }).collect(Collectors.toList());
    }

    protected synchronized Map<String, Map<ConnectorTaskId, Double>> filterRedundantTasks(Map<String, Map<ConnectorTaskId, Double>> map) {
        if (this.latestTaskAssignment == null) {
            return map;
        }
        HashMap hashMap = new HashMap();
        map.forEach((str, map2) -> {
            if (this.latestTaskAssignment.containsKey(str)) {
                hashMap.put(str, (Map) map2.entrySet().stream().filter(entry -> {
                    return this.latestTaskAssignment.get(str).contains(entry.getKey());
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, (v0) -> {
                    return v0.getValue();
                })));
            }
        });
        this.log.debug("Metrics store Task Load:  {} Filtered Task Load: {}", map, hashMap);
        return hashMap;
    }

    protected ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }

    protected long getTotalAvailableHeapMemory() {
        return this.totalAvailableHeapMemory;
    }

    protected ResourceManagerMetrics resourceManagerMetrics() {
        return this.resourceManagerMetrics;
    }
}
