package org.apache.pulsar.broker.service;

import java.util.Arrays;
import java.util.HashSet;
import lombok.Generated;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.PulsarMetadataEventSynchronizer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/SyncConfigStoreTest.class */
public class SyncConfigStoreTest extends GeoReplicationWithConfigurationSyncTestBase {
    private static final String CONF_NAME_SYNC_EVENT_TOPIC = "configurationMetadataSyncEventTopic";

    @Generated
    private static final Logger log = LoggerFactory.getLogger(SyncConfigStoreTest.class);
    private static final String SYNC_EVENT_TOPIC = TopicDomain.persistent.value() + "://" + String.valueOf(NamespaceName.SYSTEM_NAMESPACE) + "/__sync_config_meta";

    @Override // org.apache.pulsar.broker.service.GeoReplicationWithConfigurationSyncTestBase
    @BeforeClass(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
        TenantInfoImpl tenantInfoImpl = new TenantInfoImpl();
        tenantInfoImpl.setAllowedClusters(new HashSet(Arrays.asList("r1", "r2")));
        this.admin1.tenants().createTenant(TopicName.get(SYNC_EVENT_TOPIC).getTenant(), tenantInfoImpl);
        this.admin1.namespaces().createNamespace(TopicName.get(SYNC_EVENT_TOPIC).getNamespace());
    }

    @Override // org.apache.pulsar.broker.service.GeoReplicationWithConfigurationSyncTestBase
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.service.GeoReplicationWithConfigurationSyncTestBase
    public void setConfigDefaults(ServiceConfiguration serviceConfiguration, String str, LocalBookkeeperEnsemble localBookkeeperEnsemble, ZookeeperServerTest zookeeperServerTest) {
        super.setConfigDefaults(serviceConfiguration, str, localBookkeeperEnsemble, zookeeperServerTest);
    }

    @Test
    public void testDynamicEnableConfigurationMetadataSyncEventTopic() throws Exception {
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(((Boolean) WhiteboxImpl.getInternalState(this.pulsar1, "shouldShutdownConfigurationMetadataStore")).booleanValue());
        });
        this.admin1.brokers().updateDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC, SYNC_EVENT_TOPIC);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.pulsar1.getConfig().getConfigurationMetadataSyncEventTopic(), SYNC_EVENT_TOPIC);
            PulsarMetadataEventSynchronizer pulsarMetadataEventSynchronizer = (PulsarMetadataEventSynchronizer) WhiteboxImpl.getInternalState(this.pulsar1, "configMetadataSynchronizer");
            Assert.assertNotNull(pulsarMetadataEventSynchronizer);
            Assert.assertEquals(pulsarMetadataEventSynchronizer.getState(), PulsarMetadataEventSynchronizer.State.Started);
            Assert.assertTrue(pulsarMetadataEventSynchronizer.isStarted());
        });
        PulsarMetadataEventSynchronizer pulsarMetadataEventSynchronizer = (PulsarMetadataEventSynchronizer) WhiteboxImpl.getInternalState(this.pulsar1, "configMetadataSynchronizer");
        Producer producer = (Producer) WhiteboxImpl.getInternalState(pulsarMetadataEventSynchronizer, "producer");
        Consumer consumer = (Consumer) WhiteboxImpl.getInternalState(pulsarMetadataEventSynchronizer, "consumer");
        this.admin1.brokers().deleteDynamicConfiguration(CONF_NAME_SYNC_EVENT_TOPIC);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(pulsarMetadataEventSynchronizer.getState(), PulsarMetadataEventSynchronizer.State.Closed);
            Assert.assertTrue(pulsarMetadataEventSynchronizer.isClosingOrClosed());
            Assert.assertFalse(producer.isConnected());
            Assert.assertFalse(consumer.isConnected());
            Assert.assertNull(this.pulsar1.getConfig().getConfigurationMetadataSyncEventTopic());
            Assert.assertNull((PulsarMetadataEventSynchronizer) WhiteboxImpl.getInternalState(this.pulsar1, "configMetadataSynchronizer"));
        });
    }
}
