package org.apache.pulsar.io;

import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.worker.PulsarWorkerService;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.apache.pulsar.utils.ResourceUtils;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
 * Test fixture that boots a local ZooKeeper/BookKeeper ensemble and a TLS-authenticated
 * Pulsar broker with an embedded functions worker, then creates admin and client handles
 * against it.
 *
 * <p>Everything is created in {@link #setup(Method)} before each test method and torn down
 * again in {@link #shutdown()}.
 */
@Test(groups = {"broker-io"})
public class PulsarFunctionAdminTest {
    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionAdminTest.class);

    // Certificate material resolved from the test resources.
    private static final String TLS_SERVER_CERT_FILE_PATH =
            ResourceUtils.getAbsolutePath("certificate-authority/server-keys/broker.cert.pem");
    private static final String TLS_SERVER_KEY_FILE_PATH =
            ResourceUtils.getAbsolutePath("certificate-authority/server-keys/broker.key-pk8.pem");
    private static final String TLS_CLIENT_CERT_FILE_PATH =
            ResourceUtils.getAbsolutePath("certificate-authority/client-keys/admin.cert.pem");
    private static final String TLS_CLIENT_KEY_FILE_PATH =
            ResourceUtils.getAbsolutePath("certificate-authority/client-keys/admin.key-pk8.pem");
    private static final String TLS_TRUST_CERT_FILE_PATH =
            ResourceUtils.getAbsolutePath("certificate-authority/certs/ca.cert.pem");

    LocalBookkeeperEnsemble bkEnsemble;
    ServiceConfiguration config;
    WorkerConfig workerConfig;
    URL urlTls;
    PulsarService pulsar;
    PulsarAdmin admin;
    PulsarClient pulsarClient;
    BrokerStats brokerStatsClient;
    PulsarWorkerService functionsWorkerService;
    String primaryHost;
    final String tenant = "external-repl-prop";
    String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";

    /**
     * Starts the bookkeeper ensemble and the broker, then builds the admin/client handles and
     * registers the cluster and tenant used by the tests.
     *
     * @param method the test method about to run (used for logging only)
     * @throws Exception if any component fails to start or an admin call fails
     */
    @BeforeMethod(alwaysRun = true)
    void setup(Method method) throws Exception {
        log.info("--- Setting up method {} ---", method.getName());

        this.bkEnsemble = new LocalBookkeeperEnsemble(3, 0, () -> 0);
        this.bkEnsemble.start();

        this.config = new ServiceConfiguration();
        this.config.setClusterName("use");
        this.config.setSuperUserRoles(Sets.newHashSet("superUser", "admin"));
        // Port 0 everywhere: let the OS pick free ports.
        this.config.setWebServicePort(Optional.of(0));
        this.config.setWebServicePortTls(Optional.of(0));
        this.config.setMetadataStoreUrl("zk:127.0.0.1:" + this.bkEnsemble.getZookeeperPort());
        this.config.setBrokerShutdownTimeoutMs(0L);
        this.config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
        this.config.setBrokerServicePort(Optional.of(0));
        this.config.setBrokerServicePortTls(Optional.of(0));
        this.config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());

        // TLS-based authentication on the broker side.
        this.config.setAuthenticationEnabled(true);
        this.config.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderTls.class.getName()));
        this.config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        this.config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);

        this.functionsWorkerService = createPulsarFunctionWorker(this.config);
        this.pulsar = new PulsarService(this.config, this.workerConfig,
                Optional.of(this.functionsWorkerService), exitCode -> {
                });
        this.pulsar.start();
        this.urlTls = new URL(this.pulsar.getBrokerServiceUrlTls());

        Map<String, String> authParams = new HashMap<>();
        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
        AuthenticationTls authTls = new AuthenticationTls();
        authTls.configure(authParams);

        // The trust store must hold the CA certificate; the client's own leaf certificate
        // cannot validate the broker certificate.
        this.admin = Mockito.spy(PulsarAdmin.builder()
                .serviceHttpUrl(this.pulsar.getWebServiceAddressTls())
                .tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
                .authentication(authTls)
                .build());
        this.brokerStatsClient = this.admin.brokerStats();
        this.primaryHost = this.pulsar.getWebServiceAddress();

        this.admin.clusters().updateCluster(this.config.getClusterName(),
                ClusterData.builder().serviceUrl(this.urlTls.toString()).build());

        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
        if (StringUtils.isNotBlank(this.workerConfig.getBrokerClientAuthenticationPlugin())
                && StringUtils.isNotBlank(this.workerConfig.getBrokerClientAuthenticationParameters())) {
            clientBuilder.enableTls(this.workerConfig.isUseTls());
            clientBuilder.allowTlsInsecureConnection(this.workerConfig.isTlsAllowInsecureConnection());
            clientBuilder.authentication(this.workerConfig.getBrokerClientAuthenticationPlugin(),
                    this.workerConfig.getBrokerClientAuthenticationParameters());
        }
        // Guard against a client left over from a previous setup that was not torn down.
        if (this.pulsarClient != null) {
            this.pulsarClient.close();
        }
        this.pulsarClient = clientBuilder.build();

        this.admin.tenants().updateTenant(this.tenant,
                TenantInfo.builder().allowedClusters(Collections.singleton(this.config.getClusterName())).build());
        Thread.sleep(100L);
    }

    /**
     * Closes/stops everything created by {@link #setup(Method)} and nulls the references so a
     * second invocation is a no-op.
     *
     * @throws Exception if closing or stopping any component fails
     */
    @AfterMethod(alwaysRun = true)
    void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        if (this.pulsarClient != null) {
            this.pulsarClient.close();
            this.pulsarClient = null;
        }
        if (this.admin != null) {
            this.admin.close();
            this.admin = null;
        }
        if (this.functionsWorkerService != null) {
            this.functionsWorkerService.stop();
            this.functionsWorkerService = null;
        }
        if (this.pulsar != null) {
            this.pulsar.close();
            this.pulsar = null;
        }
        if (this.bkEnsemble != null) {
            this.bkEnsemble.stop();
            this.bkEnsemble = null;
        }
    }

    /**
     * Populates {@link #workerConfig} for a thread-runtime functions worker that authenticates to
     * the broker with the admin TLS client certificate, and returns a new worker service.
     *
     * @param serviceConfiguration broker configuration supplying the cluster name, advertised
     *                             address and TLS ports
     * @return a freshly constructed {@link PulsarWorkerService}
     */
    private PulsarWorkerService createPulsarFunctionWorker(ServiceConfiguration serviceConfiguration) {
        this.workerConfig = new WorkerConfig();
        this.workerConfig.setPulsarFunctionsNamespace(this.pulsarFunctionsNamespace);
        this.workerConfig.setSchedulerClassName(RoundRobinScheduler.class.getName());
        this.workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
        this.workerConfig.setFunctionRuntimeFactoryConfigs(
                ObjectMapperFactory.getMapper().getObjectMapper()
                        .convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
        // NOTE(review): setup() configures the broker ports as 0 before calling this method, so
        // these URLs embed the configured value rather than the port the broker later binds to
        // -- confirm nothing depends on them being reachable.
        this.workerConfig.setPulsarServiceUrl(
                "pulsar://127.0.0.1:" + serviceConfiguration.getBrokerServicePortTls().get());
        this.workerConfig.setPulsarWebServiceUrl(
                "https://127.0.0.1:" + serviceConfiguration.getWebServicePortTls().get());
        this.workerConfig.setFailureCheckFreqMs(100L);
        this.workerConfig.setNumFunctionPackageReplicas(1);
        this.workerConfig.setClusterCoordinationTopicName("coordinate");
        this.workerConfig.setFunctionAssignmentTopicName("assignment");
        this.workerConfig.setFunctionMetadataTopicName("metadata");
        this.workerConfig.setInstanceLivenessCheckFreqMs(100L);
        this.workerConfig.setWorkerPort(0);
        this.workerConfig.setPulsarFunctionsCluster(serviceConfiguration.getClusterName());

        String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
                serviceConfiguration.getAdvertisedAddress());
        this.workerConfig.setWorkerHostname(hostname);
        this.workerConfig.setWorkerId("c-" + serviceConfiguration.getClusterName() + "-fw-" + hostname + "-"
                + this.workerConfig.getWorkerPort());

        this.workerConfig.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
        this.workerConfig.setBrokerClientAuthenticationParameters(
                String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
        this.workerConfig.setUseTls(true);
        // Trust the CA that signed the broker certificate, not the client's own certificate.
        this.workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
        return new PulsarWorkerService();
    }
}
