package org.apache.kafka.connect.storage.amp;

import com.networknt.rule.RuleConstants;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.http.client.utils.URIBuilder;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.storage.MetricsStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.jose4j.json.internal.json_simple.JSONArray;
import org.jose4j.json.internal.json_simple.JSONObject;
import org.jose4j.json.internal.json_simple.parser.JSONParser;
import org.jose4j.json.internal.json_simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
import software.amazon.awssdk.utils.IoUtils;

@Confluent
/* loaded from: input_file:org/apache/kafka/connect/storage/amp/AmazonManagedPrometheusMetricsStore.class */
public class AmazonManagedPrometheusMetricsStore implements MetricsStore {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AmazonManagedPrometheusMetricsStore.class);
    private static final String ENDPOINT = "https://aps-workspaces.us-west-2.amazonaws.com";
    private static final String SERVICE_SIGNING_NAME = "aps";
    private static final String REGION_NAME = "us-west-2";
    private static final String AMP_SESSION_NAME = "connect-amp-session";
    private final String workerHeapUsageMetric;
    private final String workerCPULoadMetric;
    private final String taskCPULoadMetric;
    private final String workspaceId;
    private final String connectCluster;
    private final AwsV4HttpSigner signer = AwsV4HttpSigner.create();
    private final String arn;
    private SdkHttpClient httpClient;
    private StsClient stsClient;
    private AwsSessionCredentials sessionCredentials;
    private Instant credentialsExpiration;

    public AmazonManagedPrometheusMetricsStore(DistributedConfig distributedConfig) {
        this.workspaceId = distributedConfig.getString(DistributedConfig.CONNECT_AMP_METRICS_STORE_WORKSPACE_ID);
        this.connectCluster = ((String) distributedConfig.originals().get("group.id")).split("\\.")[0];
        this.arn = distributedConfig.getString(DistributedConfig.CONNECT_AMP_READER_ARN);
        this.workerCPULoadMetric = distributedConfig.getString(DistributedConfig.CONNECT_WORKER_CPU_LOAD_METRIC_NAME);
        this.workerHeapUsageMetric = distributedConfig.getString(DistributedConfig.CONNECT_WORKER_MEMORY_LOAD_METRIC_NAME);
        this.taskCPULoadMetric = distributedConfig.getString(DistributedConfig.CONNECT_TASK_CPU_LOAD_METRIC_NAME);
    }

    @Override // org.apache.kafka.connect.storage.MetricsStore
    public void start() throws Exception {
        validate();
        this.httpClient = ApacheHttpClient.create();
        this.stsClient = ((StsClientBuilder) ((StsClientBuilder) StsClient.builder().credentialsProvider((AwsCredentialsProvider) DefaultCredentialsProvider.create())).region(Region.US_WEST_2)).mo15196build();
    }

    @Override // org.apache.kafka.connect.storage.MetricsStore
    public void validate() throws IllegalArgumentException {
        if (Utils.isBlank(this.workspaceId)) {
            throw new IllegalArgumentException("Workspace ID is required.");
        }
        if (Utils.isBlank(this.connectCluster)) {
            throw new IllegalArgumentException("Connect cluster is required.");
        }
        if (Utils.isBlank(this.arn)) {
            throw new IllegalArgumentException("ARN is required.");
        }
        if (Utils.isBlank(this.workerCPULoadMetric)) {
            throw new IllegalArgumentException("Worker CPU load metric is required.");
        }
        if (Utils.isBlank(this.workerHeapUsageMetric)) {
            throw new IllegalArgumentException("Worker heap usage metric is required.");
        }
        if (Utils.isBlank(this.taskCPULoadMetric)) {
            throw new IllegalArgumentException("Task CPU load metric is required.");
        }
    }

    @Override // org.apache.kafka.connect.storage.MetricsStore
    public void stop() throws Exception {
        if (this.httpClient != null) {
            this.httpClient.close();
        }
        if (this.stsClient != null) {
            this.stsClient.close();
        }
        log.info("PrometheusMetricsStore stopped.");
    }

    @Override // org.apache.kafka.connect.storage.MetricsStore
    public Map<String, Double> getWorkersMemoryLoad() {
        try {
            return parseWorkerLoadResponse(executePrometheusQuery(buildWorkerPrometheusQuery(this.workerHeapUsageMetric, String.format("k8s_namespace_name=\"%s\", type=\"heap\"", this.connectCluster), false)));
        } catch (Exception e) {
            log.error("Error fetching workers memory load {}", (Throwable) e);
            return new HashMap();
        }
    }

    @Override // org.apache.kafka.connect.storage.MetricsStore
    public Map<String, Double> getWorkersCPULoad() {
        try {
            return parseWorkerLoadResponse(executePrometheusQuery(buildWorkerPrometheusQuery(this.workerCPULoadMetric, String.format("k8s_namespace_name=\"%s\"", this.connectCluster), true)));
        } catch (Exception e) {
            log.error("Error fetching workers cpu load {} ", (Throwable) e);
            return new HashMap();
        }
    }

    String buildWorkerPrometheusQuery(String str, String str2, boolean z) {
        String format = String.format("sum by(k8s_pod_name) (avg_over_time(%s{%s}[5m]))", str, str2);
        if (z) {
            format = format + "*100";
        }
        return format;
    }

    @Override // org.apache.kafka.connect.storage.MetricsStore
    public Map<String, Map<ConnectorTaskId, Double>> getTasksLoad() {
        try {
            return parseTaskLoadResponse(executePrometheusQuery(String.format("avg by (k8s_pod_name, connector, task) (avg_over_time(\"%s\"{k8s_namespace_name=\"%s\"}[5m]))", this.taskCPULoadMetric, this.connectCluster)));
        } catch (Exception e) {
            log.error("Error fetching task load: ", (Throwable) e);
            return new HashMap();
        }
    }

    private AwsSessionCredentials getSessionCredentials() {
        if (this.sessionCredentials == null || credentialsExpired()) {
            if (this.sessionCredentials == null) {
                log.info("Fetching session credentials for the first time.");
            } else {
                log.info("Session credentials expired. Fetching new session credentials.");
            }
            AssumeRoleResponse assumeRole = this.stsClient.assumeRole((AssumeRoleRequest) AssumeRoleRequest.builder().roleArn(this.arn).roleSessionName(AMP_SESSION_NAME).mo15196build());
            this.sessionCredentials = AwsSessionCredentials.create(assumeRole.credentials().accessKeyId(), assumeRole.credentials().secretAccessKey(), assumeRole.credentials().sessionToken());
            this.credentialsExpiration = assumeRole.credentials().expiration();
        }
        return this.sessionCredentials;
    }

    private boolean credentialsExpired() {
        return this.credentialsExpiration == null || Instant.now().isAfter(this.credentialsExpiration);
    }

    String executePrometheusQuery(String str) throws URISyntaxException {
        log.debug("Executing prometheus query: {}", str);
        URI build = new URIBuilder(new URI("https://aps-workspaces.us-west-2.amazonaws.com/workspaces/" + this.workspaceId + "/api/v1/query")).addParameter("query", str).build();
        AwsSessionCredentials sessionCredentials = getSessionCredentials();
        SdkHttpFullRequest mo15196build = SdkHttpFullRequest.builder().method(SdkHttpMethod.GET).uri(build).mo15196build();
        SignedRequest sign = this.signer.sign(builder -> {
            builder.identity(sessionCredentials).request(mo15196build).putProperty(AwsV4HttpSigner.SERVICE_SIGNING_NAME, SERVICE_SIGNING_NAME).putProperty(AwsV4HttpSigner.REGION_NAME, REGION_NAME);
        });
        try {
            HttpExecuteResponse call = this.httpClient.prepareRequest(HttpExecuteRequest.builder().request(sign.request()).contentStreamProvider(sign.payload().orElse(null)).build()).call();
            if (call.httpResponse().statusCode() == 200) {
                return IoUtils.toUtf8String(call.responseBody().orElseThrow(() -> {
                    return new RuntimeException("No Response Body found");
                }));
            }
            log.error("AMP query  {} request failed with status code {}", str, Integer.valueOf(call.httpResponse().statusCode()));
            throw new RuntimeException("Request failed with status code: " + call.httpResponse().toString());
        } catch (Exception e) {
            log.error("Error executing prometheus query: ", (Throwable) e);
            return null;
        }
    }

    private Map<String, Double> parseWorkerLoadResponse(String str) throws ParseException {
        if (Utils.isBlank(str)) {
            return new HashMap();
        }
        JSONParser jSONParser = new JSONParser();
        log.debug("Parsing worker load response: {}", str);
        JSONArray jSONArray = (JSONArray) ((JSONObject) ((JSONObject) jSONParser.parse(str)).get("data")).get(RuleConstants.RESULT);
        HashMap hashMap = new HashMap();
        Iterator it = jSONArray.iterator();
        while (it.hasNext()) {
            hashMap.put((String) ((JSONObject) ((JSONObject) it.next()).get("metric")).get("k8s_pod_name"), Double.valueOf(Math.round(Double.parseDouble((String) ((JSONArray) r0.get("value")).get(1)) * 100.0d) / 100.0d));
        }
        return hashMap;
    }

    private Map<String, Map<ConnectorTaskId, Double>> parseTaskLoadResponse(String str) throws ParseException {
        if (Utils.isBlank(str)) {
            return new HashMap();
        }
        log.debug("Parsing task load response: {}", str);
        JSONArray jSONArray = (JSONArray) ((JSONObject) ((JSONObject) new JSONParser().parse(str)).get("data")).get(RuleConstants.RESULT);
        HashMap hashMap = new HashMap();
        Iterator it = jSONArray.iterator();
        while (it.hasNext()) {
            JSONObject jSONObject = (JSONObject) it.next();
            JSONObject jSONObject2 = (JSONObject) jSONObject.get("metric");
            String str2 = (String) jSONObject2.get("k8s_pod_name");
            ConnectorTaskId connectorTaskId = new ConnectorTaskId(jSONObject2.get("connector").toString(), Integer.parseInt(jSONObject2.get("task").toString()));
            double parseDouble = Double.parseDouble((String) ((JSONArray) jSONObject.get("value")).get(1));
            hashMap.computeIfAbsent(str2, str3 -> {
                return new HashMap();
            });
            ((Map) hashMap.get(str2)).put(connectorTaskId, Double.valueOf(parseDouble));
        }
        return hashMap;
    }
}
