package org.apache.pulsar.broker.loadbalance;

import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.class */
public class AntiAffinityNamespaceGroupTest extends MockedPulsarServiceBaseTest {
    private PulsarTestContext additionalPulsarTestContext;
    private PulsarService pulsar1;
    private PulsarAdmin admin1;
    private PulsarService pulsar2;
    protected String primaryHost;
    protected String secondaryHost;
    private NamespaceBundleFactory nsFactory;
    protected Object primaryLoadManager;
    private Object secondaryLoadManager;
    private PulsarResources resources;

    private static Object getField(Object obj, String str) throws Exception {
        Field declaredField = obj.getClass().getDeclaredField(str);
        declaredField.setAccessible(true);
        return declaredField.get(obj);
    }

    void setupConfigs(ServiceConfiguration serviceConfiguration) {
        serviceConfiguration.setAllowAutoTopicCreation(true);
        serviceConfiguration.setLoadManagerClassName(getLoadManagerClassName());
        serviceConfiguration.setFailureDomainsEnabled(true);
        serviceConfiguration.setLoadBalancerEnabled(true);
        serviceConfiguration.setLoadBalancerBrokerOverloadedThresholdPercentage(400);
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    public void setup() throws Exception {
        setupConfigs(this.conf);
        super.internalSetup(this.conf);
        this.pulsar1 = this.pulsar;
        this.primaryHost = this.pulsar1.getBrokerId();
        this.admin1 = this.admin;
        ServiceConfiguration defaultConf = getDefaultConf();
        setupConfigs(defaultConf);
        this.additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
        this.pulsar2 = this.additionalPulsarTestContext.getPulsarService();
        this.secondaryHost = this.pulsar2.getBrokerId();
        this.primaryLoadManager = getField(this.pulsar1.getLoadManager().get(), "loadManager");
        this.secondaryLoadManager = getField(this.pulsar2.getLoadManager().get(), "loadManager");
        this.nsFactory = new NamespaceBundleFactory(this.pulsar1, Hashing.crc32());
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.pulsar1.getState(), PulsarService.State.Started);
            Assert.assertEquals(this.pulsar2.getState(), PulsarService.State.Started);
        });
        this.admin1.tenants().createTenant("my-tenant", createDefaultTenantInfo());
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    protected void cleanup() throws Exception {
        this.pulsar1 = null;
        this.pulsar2.close();
        super.internalCleanup();
        this.additionalPulsarTestContext.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public void beforePulsarStart(PulsarService pulsarService) throws Exception {
        if (this.resources == null) {
            this.resources = new PulsarResources(pulsarService.createLocalMetadataStore((PulsarMetadataEventSynchronizer) null), pulsarService.createConfigurationMetadataStore((PulsarMetadataEventSynchronizer) null));
        }
        createNamespaceIfNotExists(this.resources, NamespaceName.SYSTEM_NAMESPACE.getTenant(), NamespaceName.SYSTEM_NAMESPACE);
    }

    protected void createNamespaceIfNotExists(PulsarResources pulsarResources, String str, NamespaceName namespaceName) throws Exception {
        TenantResources tenantResources = pulsarResources.getTenantResources();
        NamespaceResources namespaceResources = pulsarResources.getNamespaceResources();
        if (!tenantResources.tenantExists(str)) {
            tenantResources.createTenant(str, TenantInfo.builder().adminRoles(Sets.newHashSet(this.conf.getSuperUserRoles())).allowedClusters(Sets.newHashSet(new String[]{this.conf.getClusterName()})).build());
        }
        if (namespaceResources.namespaceExists(namespaceName)) {
            return;
        }
        Policies policies = new Policies();
        policies.replication_clusters = Collections.singleton(this.conf.getClusterName());
        namespaceResources.createPolicies(namespaceName, policies);
    }

    protected Object getBundleOwnershipData() {
        return ConcurrentOpenHashMap.newBuilder().build();
    }

    protected String getLoadManagerClassName() {
        return ModularLoadManagerImpl.class.getName();
    }

    @Test
    public void testClusterDomain() {
    }

    @Test
    public void testAntiAffinityNamespaceFilteringWithDomain() throws Exception {
        this.pulsar1.getConfiguration().setFailureDomainsEnabled(true);
        for (int i = 0; i < 5; i++) {
            String str = "my-tenant/test/my-ns" + i;
            this.admin1.namespaces().createNamespace(str);
            this.admin1.namespaces().setNamespaceAntiAffinityGroup(str, "my-antiaffinity");
        }
        HashSet hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        hashSet.add("brokerName-0");
        hashMap.put("brokerName-0", "domain-0");
        hashSet.add("brokerName-1");
        hashMap.put("brokerName-1", "domain-0");
        hashSet.add("brokerName-2");
        hashMap.put("brokerName-2", "domain-1");
        hashSet.add("brokerName-3");
        hashMap.put("brokerName-3", "domain-1");
        HashSet hashSet2 = new HashSet();
        Object bundleOwnershipData = getBundleOwnershipData();
        Assert.assertEquals(hashSet.size(), 4);
        hashSet2.addAll(hashSet);
        filterAntiAffinityGroupOwnedBrokers(this.pulsar1, "my-tenant/test/my-ns0/0x00000000_0xffffffff", hashSet, bundleOwnershipData, hashMap);
        Assert.assertEquals(hashSet.size(), 4);
        selectBrokerForNamespace(bundleOwnershipData, "brokerName-0", "my-tenant/test/my-ns0", "my-tenant/test/my-ns0/0x00000000_0xffffffff");
        hashSet2.addAll(hashSet);
        filterAntiAffinityGroupOwnedBrokers(this.pulsar1, "my-tenant/test/my-ns1/0x00000000_0xffffffff", hashSet2, bundleOwnershipData, hashMap);
        Assert.assertEquals(hashSet2.size(), 2);
        hashSet2.forEach(str2 -> {
            Assert.assertEquals((String) hashMap.get(str2), "domain-1");
        });
        selectBrokerForNamespace(bundleOwnershipData, "brokerName-2", "my-tenant/test/my-ns1", "my-tenant/test/my-ns1/0x00000000_0xffffffff");
        hashSet2.addAll(hashSet);
        filterAntiAffinityGroupOwnedBrokers(this.pulsar1, "my-tenant/test/my-ns2/0x00000000_0xffffffff", hashSet2, bundleOwnershipData, hashMap);
        Assert.assertEquals(hashSet2.size(), 2);
        Assert.assertTrue(hashSet2.contains("brokerName-1"));
        Assert.assertTrue(hashSet2.contains("brokerName-3"));
        selectBrokerForNamespace(bundleOwnershipData, "brokerName-1", "my-tenant/test/my-ns2", "my-tenant/test/my-ns2/0x00000000_0xffffffff");
        hashSet2.addAll(hashSet);
        filterAntiAffinityGroupOwnedBrokers(this.pulsar1, "my-tenant/test/my-ns3/0x00000000_0xffffffff", hashSet2, bundleOwnershipData, hashMap);
        Assert.assertEquals(hashSet2.size(), 1);
        Assert.assertTrue(hashSet2.contains("brokerName-3"));
        selectBrokerForNamespace(bundleOwnershipData, "brokerName-3", "my-tenant/test/my-ns3", "my-tenant/test/my-ns3/0x00000000_0xffffffff");
        hashSet2.addAll(hashSet);
        filterAntiAffinityGroupOwnedBrokers(this.pulsar1, "my-tenant/test/my-ns4/0x00000000_0xffffffff", hashSet2, bundleOwnershipData, hashMap);
        Assert.assertEquals(hashSet2.size(), 4);
    }

    @Test
    public void testAntiAffinityNamespaceFilteringWithoutDomain() throws Exception {
        for (int i = 0; i < 5; i++) {
            String str = "my-tenant/test/my-ns-wo-domain" + i;
            this.admin1.namespaces().createNamespace(str);
            this.admin1.namespaces().setNamespaceAntiAffinityGroup(str, "my-antiaffinity-wo-domain");
        }
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        Object bundleOwnershipData = getBundleOwnershipData();
        hashSet.add("broker-0");
        hashSet.add("broker-1");
        hashSet.add("broker-2");
        hashSet2.addAll(hashSet);
        filterAntiAffinityGroupOwnedBrokers(this.pulsar1, "my-tenant/test/my-ns-wo-domain0/0x00000000_0xffffffff", hashSet, bundleOwnershipData, null);
        Assert.assertEquals(hashSet.size(), 3);
        selectBrokerForNamespace(bundleOwnershipData, "broker-0", "my-tenant/test/my-ns-wo-domain0", "my-tenant/test/my-ns-wo-domain0/0x00000000_0xffffffff");
        hashSet2.addAll(hashSet);
        filterAntiAffinityGroupOwnedBrokers(this.pulsar1, "my-tenant/test/my-ns-wo-domain1/0x00000000_0xffffffff", hashSet2, bundleOwnershipData, null);
        Assert.assertEquals(hashSet2.size(), 2);
        Assert.assertTrue(hashSet2.contains("broker-1"));
        Assert.assertTrue(hashSet2.contains("broker-2"));
        selectBrokerForNamespace(bundleOwnershipData, "broker-1", "my-tenant/test/my-ns-wo-domain1", "my-tenant/test/my-ns-wo-domain1/0x00000000_0xffffffff");
        hashSet2.addAll(hashSet);
        filterAntiAffinityGroupOwnedBrokers(this.pulsar1, "my-tenant/test/my-ns-wo-domain2/0x00000000_0xffffffff", hashSet2, bundleOwnershipData, null);
        Assert.assertEquals(hashSet2.size(), 1);
        Assert.assertTrue(hashSet2.contains("broker-2"));
        selectBrokerForNamespace(bundleOwnershipData, "broker-2", "my-tenant/test/my-ns-wo-domain2", "my-tenant/test/my-ns-wo-domain2/0x00000000_0xffffffff");
        hashSet2.addAll(hashSet);
        filterAntiAffinityGroupOwnedBrokers(this.pulsar1, "my-tenant/test/my-ns-wo-domain3/0x00000000_0xffffffff", hashSet2, bundleOwnershipData, null);
        Assert.assertEquals(hashSet2.size(), 3);
    }

    protected void selectBrokerForNamespace(Object obj, String str, String str2, String str3) {
        ConcurrentOpenHashSet build = ConcurrentOpenHashSet.newBuilder().build();
        build.add(str3);
        ConcurrentOpenHashMap build2 = ConcurrentOpenHashMap.newBuilder().build();
        build2.put(str2, build);
        ((ConcurrentOpenHashMap) obj).put(str, build2);
    }

    @Test
    public void testBrokerSelectionForAntiAffinityGroup() throws Exception {
        String str = this.primaryHost;
        String str2 = this.secondaryHost;
        String clusterName = this.pulsar1.getConfiguration().getClusterName();
        String str3 = "tenant-" + String.valueOf(UUID.randomUUID());
        String str4 = str3 + "/" + clusterName + "/ns1";
        String str5 = str3 + "/" + clusterName + "/ns2";
        this.admin1.clusters().createFailureDomain(clusterName, "domain1", FailureDomain.builder().brokers(Collections.singleton(str)).build());
        this.admin1.clusters().createFailureDomain(clusterName, "domain2", FailureDomain.builder().brokers(Collections.singleton(str2)).build());
        this.admin1.tenants().createTenant(str3, new TenantInfoImpl((Set) null, Sets.newHashSet(new String[]{clusterName})));
        this.admin1.namespaces().createNamespace(str4);
        this.admin1.namespaces().createNamespace(str5);
        this.admin1.namespaces().setNamespaceAntiAffinityGroup(str4, "group");
        this.admin1.namespaces().setNamespaceAntiAffinityGroup(str5, "group");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(isLoadManagerUpdatedDomainCache(this.primaryLoadManager));
            Assert.assertTrue(isLoadManagerUpdatedDomainCache(this.secondaryLoadManager));
        });
        Assert.assertNotEquals(selectBroker(makeBundle(str3, clusterName, "ns1"), this.primaryLoadManager), selectBroker(makeBundle(str3, clusterName, "ns2"), this.primaryLoadManager));
    }

    protected String selectBroker(ServiceUnitId serviceUnitId, Object obj) {
        return (String) ((ModularLoadManager) obj).selectBrokerForAssignment(serviceUnitId).get();
    }

    @Test
    public void testLoadSheddingUtilWithAntiAffinityNamespace() throws Exception {
        for (int i = 0; i < 5; i++) {
            String str = "my-tenant/test/my-ns-load-shedding-util" + i;
            this.admin1.namespaces().createNamespace(str);
            this.admin1.namespaces().setNamespaceAntiAffinityGroup(str, "my-antiaffinity-load-shedding-util");
        }
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        Object bundleOwnershipData = getBundleOwnershipData();
        hashSet.add("broker-0");
        hashSet.add("broker-1");
        hashSet.add("broker-2");
        hashSet2.addAll(hashSet);
        selectBrokerForNamespace(bundleOwnershipData, "broker-0", "my-tenant/test/my-ns-load-shedding-util0", "my-tenant/test/my-ns-load-shedding-util0/0x00000000_0xffffffff");
        Assert.assertTrue(shouldAntiAffinityNamespaceUnload("my-tenant/test/my-ns-load-shedding-util0", "/0x00000000_0xffffffff", "broker-0", this.pulsar1, bundleOwnershipData, hashSet2));
        selectBrokerForNamespace(bundleOwnershipData, "broker-1", "my-tenant/test/my-ns-load-shedding-util1", "my-tenant/test/my-ns-load-shedding-util0/0x00000000_0xffffffff");
        Assert.assertTrue(shouldAntiAffinityNamespaceUnload("my-tenant/test/my-ns-load-shedding-util0", "/0x00000000_0xffffffff", "broker-0", this.pulsar1, bundleOwnershipData, hashSet2));
        selectBrokerForNamespace(bundleOwnershipData, "broker-2", "my-tenant/test/my-ns-load-shedding-util2", "my-tenant/test/my-ns-load-shedding-util0/0x00000000_0xffffffff");
        Assert.assertFalse(shouldAntiAffinityNamespaceUnload("my-tenant/test/my-ns-load-shedding-util0", "/0x00000000_0xffffffff", "broker-0", this.pulsar1, bundleOwnershipData, hashSet2));
    }

    @Test
    public void testLoadSheddingWithAntiAffinityNamespace() throws Exception {
        for (int i = 0; i < 5; i++) {
            String str = "my-tenant/test/my-ns-load-shedding" + i;
            this.admin1.namespaces().createNamespace(str);
            this.admin1.namespaces().setNamespaceAntiAffinityGroup(str, "my-antiaffinity-load-shedding");
        }
        PulsarClient build = PulsarClient.builder().serviceUrl(this.pulsar1.getSafeWebServiceAddress()).build();
        try {
            Producer create = build.newProducer().topic("persistent://my-tenant/test/my-ns-load-shedding0/my-topic1").create();
            this.pulsar1.getBrokerService().updateRates();
            verifyLoadSheddingWithAntiAffinityNamespace("my-tenant/test/my-ns-load-shedding0", "0x00000000_0xffffffff");
            create.close();
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    protected void verifyLoadSheddingWithAntiAffinityNamespace(String str, String str2) {
        ModularLoadManagerImpl loadManager = ((ModularLoadManagerWrapper) this.pulsar1.getLoadManager().get()).getLoadManager();
        loadManager.updateAll();
        Assert.assertTrue(loadManager.shouldAntiAffinityNamespaceUnload(str, str2, this.primaryHost));
    }

    protected boolean isLoadManagerUpdatedDomainCache(Object obj) throws Exception {
        return !((Map) FieldUtils.readDeclaredField(obj, "brokerToFailureDomainMap", true)).isEmpty();
    }

    private NamespaceBundle makeBundle(String str, String str2, String str3) {
        return this.nsFactory.getBundle(NamespaceName.get(str, str2, str3), Range.range(NamespaceBundles.FULL_LOWER_BOUND, BoundType.CLOSED, NamespaceBundles.FULL_UPPER_BOUND, BoundType.CLOSED));
    }

    private static void filterAntiAffinityGroupOwnedBrokers(PulsarService pulsarService, String str, Set<String> set, Object obj, Map<String, String> map) {
        if (obj instanceof Set) {
            LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsarService, str, set, (Set) obj, map);
        } else {
            if (!(obj instanceof ConcurrentOpenHashMap)) {
                throw new RuntimeException("Unknown ownershipData class type");
            }
            LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsarService, str, set, (ConcurrentOpenHashMap) obj, map);
        }
    }

    private static boolean shouldAntiAffinityNamespaceUnload(String str, String str2, String str3, PulsarService pulsarService, Object obj, Set<String> set) throws Exception {
        if (obj instanceof Set) {
            return LoadManagerShared.shouldAntiAffinityNamespaceUnload(str, str2, str3, pulsarService, (Set) obj, set);
        }
        if (obj instanceof ConcurrentOpenHashMap) {
            return LoadManagerShared.shouldAntiAffinityNamespaceUnload(str, str2, str3, pulsarService, (ConcurrentOpenHashMap) obj, set);
        }
        throw new RuntimeException("Unknown ownershipData class type");
    }
}
