package com.networknt.mesh.kafka.handler;

import com.networknt.client.Http2Client;
import com.networknt.client.simplepool.SimpleConnectionHolder;
import com.networknt.config.Config;
import com.networknt.handler.LightHttpHandler;
import com.networknt.health.HealthConfig;
import com.networknt.mesh.kafka.AdminClientStartupHook;
import com.networknt.mesh.kafka.ProducerStartupHook;
import com.networknt.mesh.kafka.ReactiveConsumerStartupHook;
import com.networknt.server.StartupHookProvider;
import com.networknt.service.SingletonServiceFactory;
import io.undertow.UndertowOptions;
import io.undertow.client.ClientConnection;
import io.undertow.client.ClientRequest;
import io.undertow.client.ClientResponse;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.OptionMap;

/* loaded from: input_file:com/networknt/mesh/kafka/handler/SidecarHealthHandler.class */
public class SidecarHealthHandler implements LightHttpHandler {
    public static final String HEALTH_RESULT_OK = "OK";
    public static final String HEALTH_RESULT_ERROR = "ERROR";
    static final Logger logger = LoggerFactory.getLogger((Class<?>) SidecarHealthHandler.class);
    static final HealthConfig config = (HealthConfig) Config.getInstance().getJsonObjectConfig(HealthConfig.CONFIG_NAME, HealthConfig.class);
    public static Http2Client client = Http2Client.getInstance();

    @Override // io.undertow.server.HttpHandler
    public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
        String str = "OK";
        StartupHookProvider[] startupHookProviderArr = (StartupHookProvider[]) SingletonServiceFactory.getBeans(StartupHookProvider.class);
        if (startupHookProviderArr == null || startupHookProviderArr.length <= 0) {
            logger.error("No startup hook is defined and none of the component is enabled.");
            str = "ERROR";
        } else {
            for (StartupHookProvider startupHookProvider : startupHookProviderArr) {
                if ((startupHookProvider instanceof ProducerStartupHook) && ProducerStartupHook.producer == null) {
                    logger.error("Producer is enabled but it is not connected to the Kafka cluster.");
                    str = "ERROR";
                }
                if (startupHookProvider instanceof ReactiveConsumerStartupHook) {
                    if (ReactiveConsumerStartupHook.kafkaConsumerManager == null || !ReactiveConsumerStartupHook.healthy) {
                        logger.error("ReactiveConsumer is enabled but it is not connected to the Kafka cluster or it is marked as unhealthy.");
                        str = "ERROR";
                    }
                    if ("OK".equals(str) && config.isDownstreamEnabled()) {
                        str = backendHealth();
                    }
                }
                if (startupHookProvider instanceof AdminClientStartupHook) {
                    if (AdminClientStartupHook.admin == null) {
                        logger.error("AdminClient is enabled but it is not connected to the Kafka cluster.");
                        str = "ERROR";
                    } else {
                        str = kafkaHealth();
                    }
                }
                if (str.equals("ERROR")) {
                    break;
                }
            }
        }
        if ("ERROR".equals(str)) {
            httpServerExchange.setStatusCode(400);
            httpServerExchange.getResponseSender().send("ERROR");
        } else {
            httpServerExchange.setStatusCode(200);
            httpServerExchange.getResponseSender().send("OK");
        }
    }

    public static String backendHealth() {
        String str = "OK";
        long currentTimeMillis = System.currentTimeMillis();
        SimpleConnectionHolder.ConnectionToken connectionToken = null;
        try {
            try {
                connectionToken = config.getDownstreamHost().startsWith("https") ? client.borrow(new URI(config.getDownstreamHost()), Http2Client.WORKER, client.getDefaultXnioSsl(), Http2Client.BUFFER_POOL, OptionMap.create(UndertowOptions.ENABLE_HTTP2, true)) : client.borrow(new URI(config.getDownstreamHost()), Http2Client.WORKER, Http2Client.BUFFER_POOL, OptionMap.EMPTY);
                ClientConnection clientConnection = (ClientConnection) connectionToken.getRawConnection();
                CountDownLatch countDownLatch = new CountDownLatch(1);
                AtomicReference<ClientResponse> atomicReference = new AtomicReference<>();
                try {
                    ClientRequest path = new ClientRequest().setMethod(Methods.GET).setPath(config.getDownstreamPath());
                    path.getRequestHeaders().put(Headers.HOST, "localhost");
                    if (logger.isDebugEnabled()) {
                        logger.debug("Header information printed in HealthCheck {}", path.getRequestHeaders().toString());
                    }
                    clientConnection.sendRequest(path, ReactiveConsumerStartupHook.client.createClientCallback(atomicReference, countDownLatch));
                    countDownLatch.await(config.getTimeout(), TimeUnit.MILLISECONDS);
                    int responseCode = atomicReference.get().getResponseCode();
                    String str2 = (String) atomicReference.get().getAttachment(Http2Client.RESPONSE_BODY);
                    if (logger.isDebugEnabled()) {
                        logger.debug("statusCode = {} body  = {}", Integer.valueOf(responseCode), str2);
                    }
                    if (responseCode >= 400) {
                        logger.error("Error due to error response from backend with status code = {} body = {}", Integer.valueOf(responseCode), str2);
                        str = "ERROR";
                    }
                } catch (Exception e) {
                    logger.error("Error while sending a health check request to the backend with exception: ", (Throwable) e);
                    connectionToken.holder().safeClose(System.currentTimeMillis());
                    str = "ERROR";
                }
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                if (logger.isDebugEnabled()) {
                    logger.debug("Downstream health check response time = {}", Long.valueOf(currentTimeMillis2));
                }
                String str3 = str;
                client.restore(connectionToken);
                return str3;
            } catch (Exception e2) {
                logger.error("Could not create connection to the backend: " + config.getDownstreamHost() + ":", (Throwable) e2);
                client.restore(connectionToken);
                return "ERROR";
            }
        } catch (Throwable th) {
            client.restore(connectionToken);
            throw th;
        }
    }

    private String kafkaHealth() {
        String str = "OK";
        try {
            DescribeClusterResult describeCluster = AdminClientStartupHook.admin.describeCluster();
            boolean z = !describeCluster.nodes().get((long) config.getTimeout(), TimeUnit.MILLISECONDS).isEmpty();
            boolean z2 = describeCluster.clusterId() != null;
            boolean z3 = describeCluster.controller().get((long) config.getTimeout(), TimeUnit.MILLISECONDS) != null;
            if (!z) {
                logger.error("no nodes found for the Kafka Cluster");
                str = "ERROR";
            }
            if (!z2) {
                logger.error("no cluster id available for the Kafka Cluster");
                str = "ERROR";
            }
            if (!z3) {
                logger.error("no active controller exists for the Kafka Cluster");
                str = "ERROR";
            }
        } catch (Exception e) {
            logger.error("Error describing Kafka Cluster", (Throwable) e);
            str = "ERROR";
        }
        return str;
    }
}
