package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
import org.apache.commons.lang3.mutable.MutableDouble;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TimeAverageMessageData;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/impl/UniformLoadShedder.class */
public class UniformLoadShedder implements LoadSheddingStrategy {
    private static final Logger log = LoggerFactory.getLogger(UniformLoadShedder.class);
    private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
    private static final double EPS = 1.0E-6d;

    @Override // org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy
    public Multimap<String, String> findBundlesForUnloading(LoadData loadData, ServiceConfiguration serviceConfiguration) {
        this.selectedBundlesCache.clear();
        Map<String, BrokerData> brokerData = loadData.getBrokerData();
        Map<String, BundleData> bundleDataForLoadShedding = loadData.getBundleDataForLoadShedding();
        Map<String, Long> recentlyUnloadedBundles = loadData.getRecentlyUnloadedBundles();
        MutableObject mutableObject = new MutableObject();
        MutableObject mutableObject2 = new MutableObject();
        MutableObject mutableObject3 = new MutableObject();
        MutableObject mutableObject4 = new MutableObject();
        MutableDouble mutableDouble = new MutableDouble(-1.0d);
        MutableDouble mutableDouble2 = new MutableDouble(-1.0d);
        MutableDouble mutableDouble3 = new MutableDouble(2.147483647E9d);
        MutableDouble mutableDouble4 = new MutableDouble(2.147483647E9d);
        brokerData.forEach((str, brokerData2) -> {
            double msgRateIn = brokerData2.getLocalData().getMsgRateIn() + brokerData2.getLocalData().getMsgRateOut();
            double msgThroughputIn = brokerData2.getLocalData().getMsgThroughputIn() + brokerData2.getLocalData().getMsgThroughputOut();
            if (msgRateIn > mutableDouble.getValue().doubleValue()) {
                mutableObject.setValue(str);
                mutableDouble.setValue(msgRateIn);
            }
            if (msgThroughputIn > mutableDouble2.getValue().doubleValue()) {
                mutableObject2.setValue(str);
                mutableDouble2.setValue(msgThroughputIn);
            }
            if (msgRateIn < mutableDouble3.getValue().doubleValue()) {
                mutableObject3.setValue(str);
                mutableDouble3.setValue(msgRateIn);
            }
            if (msgThroughputIn < mutableDouble4.getValue().doubleValue()) {
                mutableObject4.setValue(str);
                mutableDouble4.setValue(msgThroughputIn);
            }
        });
        if (mutableDouble3.getValue().doubleValue() <= EPS && mutableDouble3.getValue().doubleValue() >= -1.0E-6d) {
            mutableDouble3.setValue(1.0d);
        }
        if (mutableDouble4.getValue().doubleValue() <= EPS && mutableDouble4.getValue().doubleValue() >= -1.0E-6d) {
            mutableDouble4.setValue(1.0d);
        }
        double doubleValue = ((mutableDouble.getValue().doubleValue() - mutableDouble3.getValue().doubleValue()) * 100.0d) / mutableDouble3.getValue().doubleValue();
        double doubleValue2 = mutableDouble2.getValue().doubleValue() / mutableDouble4.getValue().doubleValue();
        boolean z = serviceConfiguration.getLoadBalancerMsgRateDifferenceShedderThreshold() > 0.0d && doubleValue > serviceConfiguration.getLoadBalancerMsgRateDifferenceShedderThreshold();
        boolean z2 = serviceConfiguration.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold() > 0.0d && doubleValue2 > serviceConfiguration.getLoadBalancerMsgThroughputMultiplierDifferenceShedderThreshold();
        if (z || z2) {
            MutableInt mutableInt = new MutableInt((int) ((mutableDouble.getValue().doubleValue() - mutableDouble3.getValue().doubleValue()) * serviceConfiguration.getMaxUnloadPercentage()));
            MutableInt mutableInt2 = new MutableInt((int) ((mutableDouble2.getValue().doubleValue() - mutableDouble4.getValue().doubleValue()) * serviceConfiguration.getMaxUnloadPercentage()));
            if (z) {
                if (log.isDebugEnabled()) {
                    log.debug("Found bundles for uniform load balancing. msgRate overloaded broker: {} with msgRate: {}, msgRate underloaded broker: {} with msgRate: {}", new Object[]{mutableObject.getValue(), mutableDouble.getValue(), mutableObject3.getValue(), mutableDouble3.getValue()});
                }
                LocalBrokerData localData = brokerData.get(mutableObject.getValue()).getLocalData();
                if (localData.getBundles().size() > 1 && mutableInt.getValue().intValue() >= serviceConfiguration.getMinUnloadMessage()) {
                    bundleDataForLoadShedding.entrySet().stream().filter(entry -> {
                        return localData.getBundles().contains(entry.getKey());
                    }).map(entry2 -> {
                        String str2 = (String) entry2.getKey();
                        TimeAverageMessageData shortTermData = ((BundleData) entry2.getValue()).getShortTermData();
                        return Pair.of(str2, Double.valueOf(shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut()));
                    }).filter(pair -> {
                        return !recentlyUnloadedBundles.containsKey(pair.getLeft());
                    }).sorted((pair2, pair3) -> {
                        return Double.compare(((Double) pair3.getRight()).doubleValue(), ((Double) pair2.getRight()).doubleValue());
                    }).forEach(pair4 -> {
                        if (serviceConfiguration.getMaxUnloadBundleNumPerShedding() == -1 || this.selectedBundlesCache.size() < serviceConfiguration.getMaxUnloadBundleNumPerShedding()) {
                            String str2 = (String) pair4.getLeft();
                            double doubleValue3 = ((Double) pair4.getRight()).doubleValue();
                            if (doubleValue3 <= mutableInt.getValue().intValue() + 1000) {
                                log.info("Found bundle to unload with msgRate {}", Double.valueOf(doubleValue3));
                                mutableInt.add(Double.valueOf(-doubleValue3));
                                this.selectedBundlesCache.put((String) mutableObject.getValue(), str2);
                            }
                        }
                    });
                }
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("Found bundles for uniform load balancing. msgThroughput overloaded broker: {} with msgThroughput {}, msgThroughput underloaded broker: {} with msgThroughput: {}", new Object[]{mutableObject2.getValue(), mutableDouble2.getValue(), mutableObject4.getValue(), mutableDouble4.getValue()});
                }
                LocalBrokerData localData2 = brokerData.get(mutableObject2.getValue()).getLocalData();
                if (localData2.getBundles().size() > 1 && mutableInt2.getValue().intValue() >= serviceConfiguration.getMinUnloadMessageThroughput()) {
                    bundleDataForLoadShedding.entrySet().stream().filter(entry3 -> {
                        return localData2.getBundles().contains(entry3.getKey());
                    }).map(entry4 -> {
                        String str2 = (String) entry4.getKey();
                        TimeAverageMessageData shortTermData = ((BundleData) entry4.getValue()).getShortTermData();
                        return Pair.of(str2, Double.valueOf(shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut()));
                    }).filter(pair5 -> {
                        return !recentlyUnloadedBundles.containsKey(pair5.getLeft());
                    }).sorted((pair6, pair7) -> {
                        return Double.compare(((Double) pair7.getRight()).doubleValue(), ((Double) pair6.getRight()).doubleValue());
                    }).forEach(pair8 -> {
                        if (serviceConfiguration.getMaxUnloadBundleNumPerShedding() == -1 || this.selectedBundlesCache.size() < serviceConfiguration.getMaxUnloadBundleNumPerShedding()) {
                            String str2 = (String) pair8.getLeft();
                            double doubleValue3 = ((Double) pair8.getRight()).doubleValue();
                            if (doubleValue3 <= mutableInt2.getValue().intValue() + 1000) {
                                log.info("Found bundle to unload with msgThroughput {}", Double.valueOf(doubleValue3));
                                mutableInt2.add(Double.valueOf(-doubleValue3));
                                this.selectedBundlesCache.put((String) mutableObject2.getValue(), str2);
                            }
                        }
                    });
                }
            }
        }
        return this.selectedBundlesCache;
    }
}
