package com.networknt.controller;

import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.kafka.common.AvroDeserializer;
import com.networknt.kafka.common.AvroSerializer;
import com.networknt.kafka.common.EventId;
import com.networknt.kafka.common.KafkaStreamsConfig;
import com.networknt.kafka.streams.LightStreams;
import com.networknt.scheduler.DefinitionAction;
import com.networknt.scheduler.TaskDefinition;
import com.networknt.scheduler.TaskDefinitionKey;
import com.networknt.scheduler.TaskFrequency;
import com.networknt.utility.TimeUtil;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import net.lightapi.portal.controller.ControllerDeregisteredEvent;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

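/**
 * Kafka Streams application that consumes the controller's health check topic, keeps the latest
 * state of every check in the {@code health-store} state store, and forwards records to the
 * service event and scheduler topics when a failed service has to be deregistered.
 */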
public class HealthCheckStreams implements LightStreams {
    // Class-wide SLF4J logger; the nested HealthCheckProcessor logs through it as well.
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) HealthCheckStreams.class);
    // Kafka Streams settings loaded under KafkaStreamsConfig.CONFIG_NAME; supplies the base stream
    // properties and the clean-up flag read in startHealthStreams.
    static final KafkaStreamsConfig streamsConfig = (KafkaStreamsConfig) Config.getInstance().getJsonObjectConfig(KafkaStreamsConfig.CONFIG_NAME, KafkaStreamsConfig.class);
    // Controller settings loaded under ControllerConfig.CONFIG_NAME; supplies the health check topic
    // and the streams application id read in startHealthStreams.
    static final ControllerConfig controllerConfig = (ControllerConfig) Config.getInstance().getJsonObjectConfig(ControllerConfig.CONFIG_NAME, ControllerConfig.class);
    // Name of the key-value state store that holds one JSON state string per health check id.
    private static final String health = "health-store";
    // Raw Avro serde; its serializer is handed to queryMetadataForKey and it is configured in
    // startHealthStreams.
    final Serde keySpecificAvroSerde = new SpecificAvroSerde();
    // Assigned in startHealthStreams; stays null until start(...) has been called.
    KafkaStreams healthStreams;

    /* loaded from: input_file:com/networknt/controller/HealthCheckStreams$HealthCheckProcessor.class */
    public static class HealthCheckProcessor extends AbstractProcessor<byte[], byte[]> {
        private ProcessorContext pc;
        private KeyValueStore<String, String> healthStore;

        @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
        public void init(ProcessorContext processorContext) {
            this.pc = processorContext;
            this.healthStore = (KeyValueStore) processorContext.getStateStore(HealthCheckStreams.health);
            if (HealthCheckStreams.logger.isInfoEnabled()) {
                HealthCheckStreams.logger.info("Processor initialized");
            }
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public void process(byte[] bArr, byte[] bArr2) {
            if (HealthCheckStreams.logger.isDebugEnabled()) {
                HealthCheckStreams.logger.debug("HealthCheckStreams.process is called!");
            }
            try {
                Object deserialize = new AvroDeserializer(true).deserialize(bArr2);
                try {
                    if (deserialize instanceof TaskDefinition) {
                        TaskDefinition taskDefinition = (TaskDefinition) deserialize;
                        if (HealthCheckStreams.logger.isTraceEnabled()) {
                            HealthCheckStreams.logger.trace("Task Definition = " + String.valueOf(taskDefinition));
                        }
                        Map<String, String> data = taskDefinition.getData();
                        switch (taskDefinition.getAction()) {
                            case INSERT:
                                long oneTimeUnitMillisecond = TimeUtil.oneTimeUnitMillisecond(TimeUnit.valueOf(taskDefinition.getFrequency().getTimeUnit().name())) * taskDefinition.getFrequency().getTime() * 2;
                                if (HealthCheckStreams.logger.isTraceEnabled()) {
                                    Logger logger = HealthCheckStreams.logger;
                                    logger.trace("current = " + System.currentTimeMillis() + " task start = " + logger + " gracePeriod = " + taskDefinition.getStart());
                                }
                                if (System.currentTimeMillis() - taskDefinition.getStart() < oneTimeUnitMillisecond) {
                                    String str = this.healthStore.get(data.get("id"));
                                    Map<String, String> hashMap = str != null ? (Map) JsonMapper.string2Map(str).entrySet().stream().collect(Collectors.toMap((v0) -> {
                                        return v0.getKey();
                                    }, entry -> {
                                        return (String) entry.getValue();
                                    })) : new HashMap(data);
                                    String str2 = data.get("healthPath");
                                    if (str2 != null) {
                                        boolean checkHealth = ControllerClient.checkHealth(data.get("protocol"), data.get(ControllerConstants.ADDRESS), Integer.valueOf(data.get(ControllerConstants.PORT)).intValue(), str2, data.get("serviceId"));
                                        hashMap.put("lastExecuteTimestamp", String.valueOf(System.currentTimeMillis()));
                                        if (checkHealth) {
                                            hashMap.put("lastFailedTimestamp", "0");
                                            hashMap.put("executeInterval", data.get("interval"));
                                        } else {
                                            if ("0".equals(hashMap.get("lastFailedTimestamp"))) {
                                                hashMap.put("lastFailedTimestamp", String.valueOf(System.currentTimeMillis()));
                                            }
                                            hashMap.put("executeInterval", String.valueOf(Integer.valueOf(hashMap.get("executeInterval")).intValue() * 2));
                                        }
                                        if (!"0".equals(hashMap.get("lastFailedTimestamp"))) {
                                            removeNode(hashMap);
                                        }
                                    } else if ("0".equals(hashMap.get("lastFailedTimestamp"))) {
                                        hashMap.put("lastFailedTimestamp", String.valueOf(System.currentTimeMillis()));
                                    } else {
                                        removeNode(hashMap);
                                    }
                                    this.healthStore.put(data.get("id"), JsonMapper.toJson(hashMap));
                                    break;
                                }
                                break;
                            case UPDATE:
                                String str3 = this.healthStore.get(data.get("id"));
                                if (str3 != null) {
                                    Map map = (Map) JsonMapper.string2Map(str3).entrySet().stream().collect(Collectors.toMap((v0) -> {
                                        return v0.getKey();
                                    }, entry2 -> {
                                        return (String) entry2.getValue();
                                    }));
                                    if ("true".equals(data.get("pass"))) {
                                        if (!"0".equals(map.get("lastFailedTimestamp"))) {
                                            map.put("lastFailedTimestamp", "0");
                                        }
                                        map.put("lastExecuteTimestamp", String.valueOf(System.currentTimeMillis()));
                                    } else {
                                        String valueOf = String.valueOf(System.currentTimeMillis());
                                        if ("0".equals(map.get("lastFailedTimestamp"))) {
                                            map.put("lastFailedTimestamp", valueOf);
                                        }
                                        map.put("lastExecuteTimestamp", valueOf);
                                    }
                                    this.healthStore.put(data.get("id"), JsonMapper.toJson(map));
                                    break;
                                }
                                break;
                        }
                    }
                } catch (Exception e) {
                    HealthCheckStreams.logger.error("Exception:", (Throwable) e);
                }
            } catch (Exception e2) {
                HealthCheckStreams.logger.error("Exception:", (Throwable) e2);
            }
        }

        @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
        public void close() {
            if (HealthCheckStreams.logger.isInfoEnabled()) {
                HealthCheckStreams.logger.info("Closing processor...");
            }
        }

        private void removeNode(Map<String, String> map) {
            if (System.currentTimeMillis() - Long.valueOf(map.get("deregisterCriticalServiceAfter")).longValue() > Long.valueOf(map.get("lastFailedTimestamp")).longValue()) {
                String str = map.get("tag") == null ? map.get("serviceId") : map.get("serviceId") + "|" + map.get("tag");
                ControllerDeregisteredEvent build = ControllerDeregisteredEvent.newBuilder().setEventId(EventId.newBuilder().setId(str).setNonce(ControllerConstants.NONCE).setTimestamp(System.currentTimeMillis()).build()).setHostId(ControllerStartupHook.config.getHostId()).setKey(str).setServiceId(map.get("serviceId")).setProtocol(map.get("protocol")).setTag(map.get("tag") == null ? null : map.get("tag")).setAddress(map.get(ControllerConstants.ADDRESS)).setPort(Integer.valueOf(map.get(ControllerConstants.PORT)).intValue()).build();
                AvroSerializer avroSerializer = new AvroSerializer();
                this.pc.forward(str.getBytes(StandardCharsets.UTF_8), avroSerializer.serialize(build), To.child("ServiceEventProcessor"));
                String str2 = str + ":" + map.get("protocol") + ":" + map.get(ControllerConstants.ADDRESS) + ":" + map.get(ControllerConstants.PORT);
                this.pc.forward(avroSerializer.serialize(TaskDefinitionKey.newBuilder().setName(str2).setHost(ControllerStartupHook.config.getHostId()).build()), avroSerializer.serialize(TaskDefinition.newBuilder().setName(str2).setHost(ControllerStartupHook.config.getHostId()).setAction(DefinitionAction.DELETE).setTopic(ControllerStartupHook.config.getHealthCheckTopic()).setFrequency(TaskFrequency.newBuilder().setTimeUnit(com.networknt.scheduler.TimeUnit.SECONDS).setTime(map.get("checkInterval") == null ? ControllerConstants.CHECK_FREQUENCY : Integer.valueOf(map.get("checkInterval")).intValue() / 1000).build()).setStart(System.currentTimeMillis()).build()), To.child("SchedulerProcessor"));
            }
        }
    }

    /**
     * Creates the wrapper and logs the fact. The {@link KafkaStreams} instance is not built here;
     * that happens when {@link #start(String, int)} is called.
     */
    public HealthCheckStreams() {
        if (logger.isInfoEnabled()) {
            logger.info("HealthCheckStreams is created");
        }
    }

    /**
     * Looks up the {@code health-store} state store of the running streams instance.
     *
     * @return read-only view of the store, keyed by check id with JSON state strings as values
     */
    public ReadOnlyKeyValueStore<String, String> getHealthStore() {
        StoreQueryParameters<ReadOnlyKeyValueStore<String, String>> storeQuery =
                StoreQueryParameters.fromNameAndType(health, QueryableStoreTypes.<String, String>keyValueStore());
        return this.healthStreams.store(storeQuery);
    }

    /**
     * Asks Kafka Streams which instance hosts the {@code health-store} entry for the given key.
     *
     * <p>The key is serialized with the Avro key serde of this class, so it is passed as the Avro
     * record itself rather than being cast to another type.
     *
     * @param taskDefinitionKey key of the task definition to locate
     * @return metadata returned by {@link KafkaStreams#queryMetadataForKey} for that key
     */
    @SuppressWarnings("unchecked") // keySpecificAvroSerde is declared as a raw Serde
    public KeyQueryMetadata getHealthStreamsMetadata(TaskDefinitionKey taskDefinitionKey) {
        return this.healthStreams.queryMetadataForKey(health, taskDefinitionKey, this.keySpecificAvroSerde.serializer());
    }

    /**
     * Lists the metadata Kafka Streams reports for the {@code health-store} state store.
     *
     * @return result of {@link KafkaStreams#streamsMetadataForStore} for the store
     */
    public Collection<StreamsMetadata> getAllHealthStreamsMetadata() {
        Collection<StreamsMetadata> storeHosts = this.healthStreams.streamsMetadataForStore(health);
        return storeHosts;
    }

    /**
     * Builds the health check topology, creates the {@link KafkaStreams} instance and starts it.
     *
     * <p>The topology reads the health check topic into {@code HealthCheckProcessor}, which owns
     * the {@code health-store} state store and feeds two sinks: the service event topic and the
     * scheduler topic.
     *
     * @param host host part of the {@code application.server} setting
     * @param port port part of the {@code application.server} setting
     */
    private void startHealthStreams(String host, int port) {
        StoreBuilder<KeyValueStore<String, String>> healthStoreBuilder =
                Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(health), Serdes.String(), Serdes.String());

        // source -> processor (+ state store) -> two sinks
        Topology healthTopology = new Topology();
        healthTopology.addSource("SourceTopicProcessor", controllerConfig.getHealthCheckTopic());
        healthTopology.addProcessor("HealthCheckProcessor", HealthCheckProcessor::new, "SourceTopicProcessor");
        healthTopology.addStateStore(healthStoreBuilder, "HealthCheckProcessor");
        healthTopology.addSink("ServiceEventProcessor", ControllerStartupHook.config.getTopic(), "HealthCheckProcessor");
        healthTopology.addSink("SchedulerProcessor", ControllerStartupHook.config.getSchedulerTopic(), "HealthCheckProcessor");

        // Configured stream properties first, then the values this class always sets.
        Properties streamProps = new Properties();
        streamProps.putAll(streamsConfig.getProperties());
        streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, controllerConfig.getHealthApplicationId());
        streamProps.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + port);
        streamProps.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
        streamProps.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

        this.healthStreams = new KafkaStreams(healthTopology, streamProps);
        this.healthStreams.setUncaughtExceptionHandler(failure -> {
            logger.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", failure);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });

        // "true" configures the serde for record keys.
        this.keySpecificAvroSerde.configure(streamsConfig.getProperties(), true);

        boolean wipeLocalState = streamsConfig.isCleanUp();
        if (wipeLocalState) {
            this.healthStreams.cleanUp();
        }
        this.healthStreams.start();
    }

    /**
     * Starts the health check streams.
     *
     * @param host host part of the {@code application.server} setting
     * @param port port part of the {@code application.server} setting
     */
    @Override // com.networknt.kafka.streams.LightStreams
    public void start(String host, int port) {
        boolean debug = logger.isDebugEnabled();
        if (debug) {
            logger.debug("HealthCheckStreams is starting...");
        }
        this.startHealthStreams(host, port);
    }

    /**
     * Closes the underlying {@link KafkaStreams} instance.
     *
     * <p>Does nothing beyond logging when the streams were never started, so this method can be
     * called safely before {@link #start(String, int)} or after a start that failed.
     */
    @Override // com.networknt.kafka.streams.LightStreams
    public void close() {
        if (logger.isDebugEnabled()) {
            logger.debug("HealthCheckStreams is closing...");
        }
        if (this.healthStreams == null) {
            // start(...) has not created the streams instance; there is nothing to close.
            return;
        }
        this.healthStreams.close();
    }
}
