package org.apache.pulsar.functions.runtime.thread;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.Thread;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.instance.stats.FunctionCollectorRegistry;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.worker.ConnectorsManager;
import org.apache.pulsar.functions.worker.FunctionsManager;
import org.apache.pulsar.shade.org.apache.pulsar.common.nar.FileUtils;
import org.apache.pulsar.shade.org.apache.zookeeper.server.util.JvmPauseMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/runtime/thread/ThreadRuntime.class */
/**
 * {@link Runtime} implementation that runs a Java function instance on a dedicated thread
 * inside the current JVM.
 *
 * <p>{@link #start()} builds a {@link JavaInstanceRunnable} and launches it on a new thread in the
 * configured {@link ThreadGroup}; {@link #stop()} interrupts that thread, waits a bounded time for
 * it to exit, closes the runnable and unregisters the instance from the function cache.
 *
 * <p>The {@code fnThread} and {@code javaInstanceRunnable} fields are plain (non-volatile) fields
 * that are only assigned in {@link #start()}.
 */
public class ThreadRuntime implements Runtime {
    private static final Logger log = LoggerFactory.getLogger(ThreadRuntime.class);

    /** Maximum time, in milliseconds, that {@link #stop()} waits for the instance thread to exit. */
    private static final int THREAD_SHUTDOWN_TIMEOUT_MILLIS = 10000;

    /** Thread running the function instance; {@code null} until {@link #start()} has run. */
    private Thread fnThread;
    private final InstanceConfig instanceConfig;
    /** Runnable executed by {@link #fnThread}; {@code null} until {@link #start()} has run. */
    private JavaInstanceRunnable javaInstanceRunnable;
    private final ThreadGroup threadGroup;
    private final FunctionCacheManager fnCache;
    private final String jarFile;
    private final ClientBuilder clientBuilder;
    private final PulsarClient pulsarClient;
    private final PulsarAdmin pulsarAdmin;
    private final String stateStorageImplClass;
    private final String stateStorageServiceUrl;
    private final SecretsProvider secretsProvider;
    private final FunctionCollectorRegistry collectorRegistry;
    private final String narExtractionDirectory;
    private final Optional<ConnectorsManager> connectorsManager;
    private final Optional<FunctionsManager> functionsManager;

    /**
     * Creates a thread-based runtime for the given instance. Nothing is started until
     * {@link #start()} is called.
     *
     * <p>NOTE(review): the decompiler reported that this constructor was package-private in the
     * original source; it is kept public here so existing callers keep compiling.
     *
     * @param instanceConfig            configuration of the function instance; its runtime must be JAVA
     * @param functionCacheManager      cache used to register the instance and obtain its class loader
     * @param threadGroup               thread group the instance thread is created in
     * @param str                       path of the function JAR/NAR file (stored as {@code jarFile})
     * @param pulsarClient              client handed to the {@link JavaInstanceRunnable}
     * @param clientBuilder             client builder handed to the {@link JavaInstanceRunnable}
     * @param pulsarAdmin               admin client handed to the {@link JavaInstanceRunnable}
     * @param str2                      state storage implementation class name
     * @param str3                      state storage service URL
     * @param secretsProvider           secrets provider handed to the {@link JavaInstanceRunnable}
     * @param functionCollectorRegistry metrics registry handed to the {@link JavaInstanceRunnable}
     * @param str4                      directory passed to the function cache when registering an archive
     * @param optional                  optional manager used to resolve built-in source/sink connectors
     * @param optional2                 optional manager used to resolve built-in functions
     * @throws RuntimeException if the function's declared runtime is not JAVA
     */
    public ThreadRuntime(InstanceConfig instanceConfig, FunctionCacheManager functionCacheManager, ThreadGroup threadGroup, String str, PulsarClient pulsarClient, ClientBuilder clientBuilder, PulsarAdmin pulsarAdmin, String str2, String str3, SecretsProvider secretsProvider, FunctionCollectorRegistry functionCollectorRegistry, String str4, Optional<ConnectorsManager> optional, Optional<FunctionsManager> optional2) {
        this.instanceConfig = instanceConfig;
        if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) {
            throw new RuntimeException("Thread Container only supports Java Runtime");
        }
        this.threadGroup = threadGroup;
        this.fnCache = functionCacheManager;
        this.jarFile = str;
        this.clientBuilder = clientBuilder;
        this.pulsarClient = pulsarClient;
        this.pulsarAdmin = pulsarAdmin;
        this.stateStorageImplClass = str2;
        this.stateStorageServiceUrl = str3;
        this.secretsProvider = secretsProvider;
        this.collectorRegistry = functionCollectorRegistry;
        this.narExtractionDirectory = str4;
        this.connectorsManager = optional;
        this.functionsManager = optional2;
    }

    /**
     * Resolves the class loader for the function instance.
     *
     * <p>For built-in code, the class loader comes from the matching manager (functions manager for
     * FUNCTION, connectors manager for SOURCE and SINK) when that manager is present. In every
     * other case the class loader is obtained through {@link #loadJars}.
     *
     * @param instanceConfig       configuration of the function instance
     * @param str                  path of the function JAR/NAR file, may be {@code null}
     * @param str2                 directory passed to the function cache when registering an archive
     * @param functionCacheManager cache used by {@link #loadJars}
     * @param optional             optional connectors manager
     * @param optional2            optional functions manager
     * @return the class loader to run the function with
     * @throws Exception if resolving or loading the class loader fails
     */
    private static ClassLoader getFunctionClassLoader(InstanceConfig instanceConfig, String str, String str2, FunctionCacheManager functionCacheManager, Optional<ConnectorsManager> optional, Optional<FunctionsManager> optional2) throws Exception {
        Function.FunctionDetails functionDetails = instanceConfig.getFunctionDetails();
        if (FunctionCommon.isFunctionCodeBuiltin(functionDetails)) {
            Function.FunctionDetails.ComponentType componentType = InstanceUtils.calculateSubjectType(functionDetails);
            if (componentType == Function.FunctionDetails.ComponentType.FUNCTION && optional2.isPresent()) {
                return optional2.get().getFunction(functionDetails.getBuiltin()).getFunctionPackage().getClassLoader();
            }
            if (componentType == Function.FunctionDetails.ComponentType.SOURCE && optional.isPresent()) {
                return optional.get().getConnector(functionDetails.getSource().getBuiltin()).getConnectorFunctionPackage().getClassLoader();
            }
            if (componentType == Function.FunctionDetails.ComponentType.SINK && optional.isPresent()) {
                return optional.get().getConnector(functionDetails.getSink().getBuiltin()).getConnectorFunctionPackage().getClassLoader();
            }
        }
        // Not built-in, or the required manager is absent: fall back to the packaged file.
        return loadJars(str, instanceConfig, functionDetails.getName(), str2, functionCacheManager);
    }

    /**
     * Registers the function's package with the function cache and returns the resulting class
     * loader.
     *
     * <p>When {@code str} is {@code null} the current thread's context class loader is returned and
     * nothing is registered. Otherwise the file is first registered as an archive if
     * {@code FileUtils.mayBeANarArchive} accepts it; if that check fails, or the archive
     * registration throws {@link FileNotFoundException}, the file is registered as a plain JAR.
     *
     * @param str                  path of the function JAR/NAR file, may be {@code null}
     * @param instanceConfig       configuration supplying the function id and instance name
     * @param str2                 function name, used only in the log message
     * @param str3                 directory passed to the function cache when registering an archive
     * @param functionCacheManager cache the instance is registered with
     * @return the class loader registered for the function id
     * @throws Exception if registration fails or the cache has no class loader for the function id
     */
    public static ClassLoader loadJars(String str, InstanceConfig instanceConfig, String str2, String str3, FunctionCacheManager functionCacheManager) throws Exception {
        if (str == null) {
            return Thread.currentThread().getContextClassLoader();
        }
        boolean loadedAsNar = false;
        if (FileUtils.mayBeANarArchive(new File(str))) {
            try {
                log.info("Trying Loading file as NAR file: {}", str);
                functionCacheManager.registerFunctionInstanceWithArchive(instanceConfig.getFunctionId(), instanceConfig.getInstanceName(), str, str3);
                loadedAsNar = true;
            } catch (FileNotFoundException e) {
                // Deliberately not rethrown: the plain-JAR registration below is the fallback.
                log.error("The file {} does not look like a .nar file {}", str, e.toString());
            }
        }
        if (!loadedAsNar) {
            log.info("Load file as simple JAR file: {}", str);
            functionCacheManager.registerFunctionInstance(instanceConfig.getFunctionId(), instanceConfig.getInstanceName(), Arrays.asList(str), Collections.emptyList());
        }
        // Look the class loader up once and reuse it for both the log line and the result.
        ClassLoader functionClassLoader = functionCacheManager.getClassLoader(instanceConfig.getFunctionId());
        log.info("Initialize function class loader for function {} at function cache manager, functionClassLoader: {}", str2, functionClassLoader);
        if (functionClassLoader == null) {
            throw new Exception("No function class loader available.");
        }
        return functionClassLoader;
    }

    /**
     * Builds the {@link JavaInstanceRunnable} for this instance and starts it on a new thread named
     * {@code <fully-qualified-function-name>-<instanceId>} in the configured thread group.
     *
     * @throws Exception if the function class loader cannot be resolved
     */
    @Override // org.apache.pulsar.functions.runtime.Runtime
    public void start() throws Exception {
        ClassLoader functionClassLoader = getFunctionClassLoader(this.instanceConfig, this.jarFile, this.narExtractionDirectory, this.fnCache, this.connectorsManager, this.functionsManager);
        this.javaInstanceRunnable = new JavaInstanceRunnable(this.instanceConfig, this.clientBuilder, this.pulsarClient, this.pulsarAdmin, this.stateStorageImplClass, this.stateStorageServiceUrl, this.secretsProvider, this.collectorRegistry, functionClassLoader);
        log.info("ThreadContainer starting function with instanceId {} functionId {} namespace {}", this.instanceConfig.getInstanceId(), this.instanceConfig.getFunctionId(), this.instanceConfig.getFunctionDetails().getNamespace());
        String threadName = String.format("%s-%s", FunctionCommon.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()), this.instanceConfig.getInstanceId());
        this.fnThread = new Thread(this.threadGroup, this.javaInstanceRunnable, threadName);
        // Log anything that escapes the runnable instead of relying on the default handler.
        this.fnThread.setUncaughtExceptionHandler((thread, th) -> log.error("Uncaught exception in thread {}", thread, th));
        this.fnThread.start();
    }

    /**
     * Blocks until the instance thread terminates. Returns immediately if the runtime was never
     * started.
     *
     * @throws Exception if the calling thread is interrupted while waiting
     */
    @Override // org.apache.pulsar.functions.runtime.Runtime
    public void join() throws Exception {
        if (this.fnThread != null) {
            this.fnThread.join();
        }
    }

    /**
     * Stops the function instance: interrupts the instance thread, waits up to
     * {@link #THREAD_SHUTDOWN_TIMEOUT_MILLIS} for it to exit, then closes the runnable and
     * unregisters the instance from the function cache. Does nothing if the runtime was never
     * started.
     *
     * <p>If the calling thread is interrupted while waiting, cleanup still runs and the caller's
     * interrupt status is restored before this method returns.
     */
    @Override // org.apache.pulsar.functions.runtime.Runtime
    public void stop() {
        if (this.fnThread == null) {
            return;
        }
        this.fnThread.interrupt();
        boolean callerInterrupted = false;
        try {
            try {
                // Bounded wait, using the class's own constant so the wait and the logged
                // value below cannot drift apart.
                this.fnThread.join(THREAD_SHUTDOWN_TIMEOUT_MILLIS, 0);
                if (this.fnThread.isAlive()) {
                    log.warn("The function instance thread is still alive after {} milliseconds. Giving up waiting and moving forward to close function.", THREAD_SHUTDOWN_TIMEOUT_MILLIS);
                }
            } catch (InterruptedException e) {
                // Keep going with cleanup; the interrupt is re-asserted in the finally block.
                callerInterrupted = true;
            }
            this.javaInstanceRunnable.close();
            log.info("Unloading JAR files for instanceId {} functionId {} namespace {}", this.instanceConfig.getInstanceId(), this.instanceConfig.getFunctionId(), this.instanceConfig.getFunctionDetails().getNamespace());
            this.fnCache.unregisterFunctionInstance(this.instanceConfig.getFunctionId(), this.instanceConfig.getInstanceName());
        } finally {
            if (callerInterrupted) {
                // Restore the status only after cleanup so close()/unregister run uninterrupted.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Reports the status of the function instance.
     *
     * <p>While the instance thread is alive, the status comes from the runnable with
     * {@code running} set to {@code true}. Otherwise a status with {@code running=false} is
     * returned, carrying the death exception's message when one is available.
     *
     * @param i instance id; not used by this implementation
     * @return an already-completed future holding the status
     */
    @Override // org.apache.pulsar.functions.runtime.Runtime
    public CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus(int i) {
        InstanceCommunication.FunctionStatus.Builder statusBuilder;
        if (isAlive()) {
            statusBuilder = this.javaInstanceRunnable.getFunctionStatus();
            statusBuilder.setRunning(true);
        } else {
            statusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
            statusBuilder.setRunning(false);
            Throwable deathException = getDeathException();
            if (deathException != null && deathException.getMessage() != null) {
                statusBuilder.setFailureException(deathException.getMessage());
            }
        }
        return CompletableFuture.completedFuture(statusBuilder.build());
    }

    /**
     * Returns the instance metrics and resets them.
     *
     * @return an already-completed future holding the metrics
     * @throws NullPointerException if called before {@link #start()} has created the runnable
     */
    @Override // org.apache.pulsar.functions.runtime.Runtime
    public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
        return CompletableFuture.completedFuture(this.javaInstanceRunnable.getAndResetMetrics());
    }

    /**
     * Returns the instance metrics.
     *
     * @param i instance id; not used by this implementation
     * @return an already-completed future holding the metrics
     * @throws NullPointerException if called before {@link #start()} has created the runnable
     */
    @Override // org.apache.pulsar.functions.runtime.Runtime
    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int i) {
        return CompletableFuture.completedFuture(this.javaInstanceRunnable.getMetrics());
    }

    /**
     * Returns the instance statistics as a string obtained from the runnable.
     *
     * @return the statistics string
     * @throws IOException a {@link PulsarServerException} if the runnable has not been created yet
     */
    @Override // org.apache.pulsar.functions.runtime.Runtime
    public String getPrometheusMetrics() throws IOException {
        if (this.javaInstanceRunnable == null) {
            throw new PulsarServerException("javaInstanceRunnable is not initialized");
        }
        return this.javaInstanceRunnable.getStatsAsString();
    }

    /**
     * Resets the instance metrics.
     *
     * @return an already-completed future
     * @throws NullPointerException if called before {@link #start()} has created the runnable
     */
    @Override // org.apache.pulsar.functions.runtime.Runtime
    public CompletableFuture<Void> resetMetrics() {
        this.javaInstanceRunnable.resetMetrics();
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Tells whether the instance thread exists and is still running.
     *
     * @return {@code true} if the runtime was started and its thread is alive
     */
    @Override // org.apache.pulsar.functions.runtime.Runtime
    public boolean isAlive() {
        return this.fnThread != null && this.fnThread.isAlive();
    }

    /**
     * Returns the exception recorded by the runnable once the instance thread is no longer alive.
     *
     * @return the death exception, or {@code null} if the thread is alive or was never started
     */
    @Override // org.apache.pulsar.functions.runtime.Runtime
    public Throwable getDeathException() {
        if (isAlive() || this.javaInstanceRunnable == null) {
            return null;
        }
        return this.javaInstanceRunnable.getDeathException();
    }

    /**
     * Returns the configuration this runtime was created with.
     *
     * @return the instance configuration
     */
    public InstanceConfig getInstanceConfig() {
        return this.instanceConfig;
    }
}
