package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.net.URL;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

/* loaded from: input_file:org/apache/pulsar/broker/service/OneWayReplicatorTestBase.class */
public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
    private static final Logger log = LoggerFactory.getLogger(OneWayReplicatorTestBase.class);
    protected URL url1;
    protected URL urlTls1;
    protected ZookeeperServerTest brokerConfigZk1;
    protected LocalBookkeeperEnsemble bkEnsemble1;
    protected PulsarService pulsar1;
    protected BrokerService broker1;
    protected PulsarAdmin admin1;
    protected PulsarClient client1;
    protected URL url2;
    protected URL urlTls2;
    protected ZookeeperServerTest brokerConfigZk2;
    protected LocalBookkeeperEnsemble bkEnsemble2;
    protected PulsarService pulsar2;
    protected BrokerService broker2;
    protected PulsarAdmin admin2;
    protected PulsarClient client2;
    protected final String defaultTenant = "public";
    protected final String replicatedNamespace = "public/default";
    protected final String nonReplicatedNamespace = "public/ns1";
    protected final String cluster1 = "r1";
    protected boolean usingGlobalZK = false;
    protected ServiceConfiguration config1 = new ServiceConfiguration();
    protected final String cluster2 = "r2";
    protected ServiceConfiguration config2 = new ServiceConfiguration();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/pulsar/broker/service/OneWayReplicatorTestBase$CleanupTopicAction.class */
    public interface CleanupTopicAction {
        void run() throws Exception;
    }

    protected void startZKAndBK() throws Exception {
        this.brokerConfigZk1 = new ZookeeperServerTest(0);
        this.brokerConfigZk1.start();
        if (this.usingGlobalZK) {
            this.brokerConfigZk2 = this.brokerConfigZk1;
        } else {
            this.brokerConfigZk2 = new ZookeeperServerTest(0);
            this.brokerConfigZk2.start();
        }
        this.bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> {
            return 0;
        });
        this.bkEnsemble1.start();
        this.bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> {
            return 0;
        });
        this.bkEnsemble2.start();
    }

    protected void startBrokers() throws Exception {
        setConfigDefaults(this.config1, "r1", this.bkEnsemble1, this.brokerConfigZk1);
        this.pulsar1 = new PulsarService(this.config1);
        this.pulsar1.start();
        this.broker1 = this.pulsar1.getBrokerService();
        this.url1 = new URL(this.pulsar1.getWebServiceAddress());
        this.urlTls1 = new URL(this.pulsar1.getWebServiceAddressTls());
        setConfigDefaults(this.config2, "r2", this.bkEnsemble2, this.brokerConfigZk2);
        this.pulsar2 = new PulsarService(this.config2);
        this.pulsar2.start();
        this.broker2 = this.pulsar2.getBrokerService();
        this.url2 = new URL(this.pulsar2.getWebServiceAddress());
        this.urlTls2 = new URL(this.pulsar2.getWebServiceAddressTls());
    }

    protected void startAdminClient() throws Exception {
        this.admin1 = PulsarAdmin.builder().serviceHttpUrl(this.url1.toString()).build();
        this.admin2 = PulsarAdmin.builder().serviceHttpUrl(this.url2.toString()).build();
    }

    protected void startPulsarClient() throws Exception {
        this.client1 = initClient(PulsarClient.builder().serviceUrl(this.url1.toString()));
        this.client2 = initClient(PulsarClient.builder().serviceUrl(this.url2.toString()));
    }

    protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
        this.admin1.clusters().createCluster("r1", ClusterData.builder().serviceUrl(this.url1.toString()).serviceUrlTls(this.urlTls1.toString()).brokerServiceUrl(this.pulsar1.getBrokerServiceUrl()).brokerServiceUrlTls(this.pulsar1.getBrokerServiceUrlTls()).brokerClientTlsEnabled(false).build());
        this.admin1.clusters().createCluster("r2", ClusterData.builder().serviceUrl(this.url2.toString()).serviceUrlTls(this.urlTls2.toString()).brokerServiceUrl(this.pulsar2.getBrokerServiceUrl()).brokerServiceUrlTls(this.pulsar2.getBrokerServiceUrlTls()).brokerClientTlsEnabled(false).build());
        this.admin1.tenants().createTenant("public", new TenantInfoImpl(Collections.emptySet(), Sets.newHashSet(new String[]{"r1", "r2"})));
        this.admin1.namespaces().createNamespace("public/default", Sets.newHashSet(new String[]{"r1", "r2"}));
        this.admin1.namespaces().createNamespace("public/ns1");
        if (this.usingGlobalZK) {
            return;
        }
        this.admin2.clusters().createCluster("r1", ClusterData.builder().serviceUrl(this.url1.toString()).serviceUrlTls(this.urlTls1.toString()).brokerServiceUrl(this.pulsar1.getBrokerServiceUrl()).brokerServiceUrlTls(this.pulsar1.getBrokerServiceUrlTls()).brokerClientTlsEnabled(false).build());
        this.admin2.clusters().createCluster("r2", ClusterData.builder().serviceUrl(this.url2.toString()).serviceUrlTls(this.urlTls2.toString()).brokerServiceUrl(this.pulsar2.getBrokerServiceUrl()).brokerServiceUrlTls(this.pulsar2.getBrokerServiceUrlTls()).brokerClientTlsEnabled(false).build());
        this.admin2.tenants().createTenant("public", new TenantInfoImpl(Collections.emptySet(), Sets.newHashSet(new String[]{"r1", "r2"})));
        this.admin2.namespaces().createNamespace("public/default");
        this.admin2.namespaces().createNamespace("public/ns1");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception {
        cleanupTopics("public/default", cleanupTopicAction);
    }

    protected void cleanupTopics(String str, CleanupTopicAction cleanupTopicAction) throws Exception {
        if (this.usingGlobalZK) {
            throw new IllegalArgumentException("The method cleanupTopics does not support for global ZK");
        }
        waitChangeEventsInit(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Collections.singleton("r1"));
        this.admin1.namespaces().unload(str);
        cleanupTopicAction.run();
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"r1", "r2"}));
        waitChangeEventsInit(str);
    }

    protected void waitChangeEventsInit(String str) {
        CompletableFuture topic = this.pulsar1.getBrokerService().getTopic(str + "/__change_events", false);
        if (topic == null) {
            return;
        }
        Optional optional = (Optional) topic.join();
        if (optional.isPresent()) {
            PersistentTopic persistentTopic = (PersistentTopic) optional.get();
            Awaitility.await().atMost(Duration.ofSeconds(180L)).untilAsserted(() -> {
                persistentTopic.getStats(true, false, false).getSubscriptions().entrySet().forEach(entry -> {
                    if ("__compaction".equals(entry.getKey()) || ((SubscriptionStats) entry.getValue()).isDurable()) {
                        return;
                    }
                    Assert.assertTrue(((SubscriptionStats) entry.getValue()).getMsgBacklog() == 0, (String) entry.getKey());
                });
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setup() throws Exception {
        incrementSetupNumber();
        log.info("--- Starting OneWayReplicatorTestBase::setup ---");
        startZKAndBK();
        startBrokers();
        startAdminClient();
        createDefaultTenantsAndClustersAndNamespace();
        startPulsarClient();
        Thread.sleep(100L);
        log.info("--- OneWayReplicatorTestBase::setup completed ---");
    }

    protected void setConfigDefaults(ServiceConfiguration serviceConfiguration, String str, LocalBookkeeperEnsemble localBookkeeperEnsemble, ZookeeperServerTest zookeeperServerTest) {
        serviceConfiguration.setClusterName(str);
        serviceConfiguration.setAdvertisedAddress("localhost");
        serviceConfiguration.setWebServicePort(Optional.of(0));
        serviceConfiguration.setWebServicePortTls(Optional.of(0));
        serviceConfiguration.setMetadataStoreUrl("zk:127.0.0.1:" + localBookkeeperEnsemble.getZookeeperPort());
        serviceConfiguration.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + zookeeperServerTest.getZookeeperPort() + "/foo");
        serviceConfiguration.setBrokerDeleteInactiveTopicsEnabled(false);
        serviceConfiguration.setBrokerDeleteInactiveTopicsFrequencySeconds(60);
        serviceConfiguration.setBrokerShutdownTimeoutMs(0L);
        serviceConfiguration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(Double.valueOf(1.0d)));
        serviceConfiguration.setBrokerServicePort(Optional.of(0));
        serviceConfiguration.setBrokerServicePortTls(Optional.of(0));
        serviceConfiguration.setBacklogQuotaCheckIntervalInSeconds(5);
        serviceConfiguration.setDefaultNumberOfNamespaceBundles(1);
        serviceConfiguration.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED.toString());
        serviceConfiguration.setEnableReplicatedSubscriptions(true);
        serviceConfiguration.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
        serviceConfiguration.setLoadBalancerSheddingEnabled(false);
        serviceConfiguration.setForceDeleteNamespaceAllowed(true);
        serviceConfiguration.setClusterName(str);
        serviceConfiguration.setTlsRequireTrustedClientCertOnConnect(false);
        serviceConfiguration.setSystemTopicEnabled(true);
        serviceConfiguration.setTopicLevelPoliciesEnabled(true);
        Set newConcurrentHashSet = Sets.newConcurrentHashSet();
        newConcurrentHashSet.add("TLSv1.3");
        newConcurrentHashSet.add("TLSv1.2");
        serviceConfiguration.setTlsProtocols(newConcurrentHashSet);
    }

    protected void cleanupPulsarResources() throws Exception {
        waitChangeEventsInit("public/default");
        this.admin1.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet(new String[]{"r1"}));
        if (!this.usingGlobalZK) {
            this.admin2.namespaces().setNamespaceReplicationClusters("public/default", Sets.newHashSet(new String[]{"r2"}));
        }
        this.admin1.namespaces().deleteNamespace("public/default", true);
        this.admin1.namespaces().deleteNamespace("public/ns1", true);
        if (this.usingGlobalZK) {
            return;
        }
        this.admin2.namespaces().deleteNamespace("public/default", true);
        this.admin2.namespaces().deleteNamespace("public/ns1", true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void cleanup() throws Exception {
        cleanupPulsarResources();
        markCurrentSetupNumberCleaned();
        log.info("--- Shutting down ---");
        if (this.client1 != null) {
            this.client1.close();
            this.client1 = null;
        }
        if (this.client2 != null) {
            this.client2.close();
            this.client2 = null;
        }
        if (this.admin1 != null) {
            this.admin1.close();
            this.admin1 = null;
        }
        if (this.admin2 != null) {
            this.admin2.close();
            this.admin2 = null;
        }
        if (this.pulsar2 != null) {
            this.pulsar2.close();
            this.pulsar2 = null;
        }
        if (this.pulsar1 != null) {
            this.pulsar1.close();
            this.pulsar1 = null;
        }
        if (this.bkEnsemble1 != null) {
            this.bkEnsemble1.stop();
            this.bkEnsemble1 = null;
        }
        if (this.bkEnsemble2 != null) {
            this.bkEnsemble2.stop();
            this.bkEnsemble2 = null;
        }
        if (this.brokerConfigZk1 != null) {
            this.brokerConfigZk1.stop();
            this.brokerConfigZk1 = null;
        }
        if (!this.usingGlobalZK && this.brokerConfigZk2 != null) {
            this.brokerConfigZk2.stop();
            this.brokerConfigZk2 = null;
        }
        this.config1 = new ServiceConfiguration();
        this.config2 = new ServiceConfiguration();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void waitReplicatorStarted(String str) {
        Awaitility.await().untilAsserted(() -> {
            Optional optional = (Optional) this.pulsar2.getBrokerService().getTopic(str, false).get();
            Assert.assertTrue(optional.isPresent());
            Assert.assertFalse(((PersistentTopic) optional.get()).getProducers().isEmpty());
        });
    }

    protected PulsarClient initClient(ClientBuilder clientBuilder) throws Exception {
        return clientBuilder.build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void verifyReplicationWorks(String str) throws Exception {
        Awaitility.await().until(() -> {
            try {
                if (((PersistentTopic) ((Optional) this.pulsar1.getBrokerService().getTopic(str, false).join()).get()).getReplicators().size() > 0) {
                    return true;
                }
            } catch (Exception e) {
            }
            try {
                if (((PersistentTopic) ((Optional) this.pulsar1.getBrokerService().getTopic(TopicName.get(str).getPartition(0).toString(), false).join()).get()).getReplicators().size() > 0) {
                    return true;
                }
            } catch (Exception e2) {
            }
            return false;
        });
        Producer create = this.client1.newProducer(Schema.STRING).topic(str).create();
        Consumer subscribe = this.client2.newConsumer(Schema.STRING).topic(new String[]{str}).isAckReceiptEnabled(true).subscriptionName("__subscribe_1").subscribe();
        create.newMessage().value("__msg1").send();
        this.pulsar1.getBrokerService().checkReplicationPolicies();
        Assert.assertEquals((String) subscribe.receive(10, TimeUnit.SECONDS).getValue(), "__msg1");
        subscribe.unsubscribe();
        create.close();
    }

    protected void setTopicLevelClusters(String str, List<String> list, PulsarAdmin pulsarAdmin, PulsarService pulsarService) throws Exception {
        HashSet hashSet = new HashSet(list);
        TopicName topicName = TopicName.get(TopicName.get(str).getPartitionedTopicName());
        int ensurePartitionsAreSame = ensurePartitionsAreSame(str);
        pulsarAdmin.topics().setReplicationClusters(str, list);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(new HashSet(pulsarService.getTopicPoliciesService().getTopicPolicies(topicName).getReplicationClusters()), hashSet);
            if (ensurePartitionsAreSame == 0) {
                checkNonPartitionedTopicLevelClusters(topicName.toString(), list, pulsarAdmin, pulsarService.getBrokerService());
                return;
            }
            for (int i = 0; i < ensurePartitionsAreSame; i++) {
                checkNonPartitionedTopicLevelClusters(topicName.getPartition(i).toString(), list, pulsarAdmin, pulsarService.getBrokerService());
            }
        });
    }

    protected void checkNonPartitionedTopicLevelClusters(String str, List<String> list, PulsarAdmin pulsarAdmin, BrokerService brokerService) throws Exception {
        Optional optional;
        CompletableFuture topic = brokerService.getTopic(str, false);
        if (topic == null || (optional = (Optional) topic.join()) == null || !optional.isPresent()) {
            return;
        }
        Assert.assertEquals(new HashSet(((TopicPolicies) ((PersistentTopic) optional.get()).getTopicPolicies().get()).getReplicationClusters()), new HashSet(list));
    }

    protected int ensurePartitionsAreSame(String str) throws Exception {
        TopicName topicName = TopicName.get(TopicName.get(str).getPartitionedTopicName());
        boolean partitionedTopicExists = this.pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName);
        boolean partitionedTopicExists2 = this.pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().partitionedTopicExists(topicName);
        if (partitionedTopicExists != partitionedTopicExists2) {
            throw new IllegalArgumentException(String.format("Can not delete topic. isPartitionedTopic1: %s, isPartitionedTopic2: %s", Boolean.valueOf(partitionedTopicExists), Boolean.valueOf(partitionedTopicExists2)));
        }
        if (!partitionedTopicExists) {
            return 0;
        }
        int i = ((PartitionedTopicMetadata) ((Optional) this.pulsar1.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().getPartitionedTopicMetadataAsync(topicName).join()).get()).partitions;
        int i2 = ((PartitionedTopicMetadata) ((Optional) this.pulsar2.getPulsarResources().getNamespaceResources().getPartitionedTopicResources().getPartitionedTopicMetadataAsync(topicName).join()).get()).partitions;
        if (i != i2) {
            throw new IllegalArgumentException(String.format("Can not delete topic. partitions1: %s, partitions2: %s", Integer.valueOf(i), Integer.valueOf(i2)));
        }
        return i;
    }
}
