package com.networknt.controller;

import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.kafka.common.AvroDeserializer;
import com.networknt.kafka.common.KafkaStreamsConfig;
import com.networknt.kafka.streams.LightStreams;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import net.lightapi.portal.controller.ControllerDeregisteredEvent;
import net.lightapi.portal.controller.ControllerRegisteredEvent;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/controller/ServiceRegistrationStreams.class */
public class ServiceRegistrationStreams implements LightStreams {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ServiceRegistrationStreams.class);
    static final KafkaStreamsConfig streamsConfig = (KafkaStreamsConfig) Config.getInstance().getJsonObjectConfig(KafkaStreamsConfig.CONFIG_NAME, KafkaStreamsConfig.class);
    static final ControllerConfig controllerConfig = (ControllerConfig) Config.getInstance().getJsonObjectConfig(ControllerConfig.CONFIG_NAME, ControllerConfig.class);
    private static final String service = "service-store";
    KafkaStreams serviceStreams;

    /* loaded from: input_file:com/networknt/controller/ServiceRegistrationStreams$ServiceEventProcessor.class */
    public static class ServiceEventProcessor extends AbstractProcessor<byte[], byte[]> {
        private ProcessorContext pc;
        private KeyValueStore<String, String> serviceStore;

        @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
        public void init(ProcessorContext processorContext) {
            this.pc = processorContext;
            this.serviceStore = (KeyValueStore) processorContext.getStateStore(ServiceRegistrationStreams.service);
            if (ServiceRegistrationStreams.logger.isInfoEnabled()) {
                ServiceRegistrationStreams.logger.info("Processor initialized");
            }
        }

        @Override // org.apache.kafka.streams.processor.Processor
        public void process(byte[] bArr, byte[] bArr2) {
            if (ServiceRegistrationStreams.logger.isDebugEnabled()) {
                ServiceRegistrationStreams.logger.debug("ServiceRegistrationStreams.process is called!");
            }
            try {
                Object deserialize = new AvroDeserializer(true).deserialize(bArr2);
                try {
                    if (deserialize instanceof ControllerRegisteredEvent) {
                        ControllerRegisteredEvent controllerRegisteredEvent = (ControllerRegisteredEvent) deserialize;
                        if (ServiceRegistrationStreams.logger.isTraceEnabled()) {
                            ServiceRegistrationStreams.logger.trace("Event = " + String.valueOf(controllerRegisteredEvent));
                        }
                        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                        concurrentHashMap.put("protocol", controllerRegisteredEvent.getProtocol());
                        concurrentHashMap.put(ControllerConstants.ADDRESS, controllerRegisteredEvent.getAddress());
                        concurrentHashMap.put(ControllerConstants.PORT, Integer.valueOf(controllerRegisteredEvent.getPort()));
                        String str = this.serviceStore.get(controllerRegisteredEvent.getKey());
                        List<Map<String, Object>> arrayList = str == null ? new ArrayList() : JsonMapper.string2List(str);
                        arrayList.removeIf(map -> {
                            return map.get("protocol").equals(controllerRegisteredEvent.getProtocol()) && map.get(ControllerConstants.ADDRESS).equals(controllerRegisteredEvent.getAddress()) && controllerRegisteredEvent.getPort() == ((Integer) map.get(ControllerConstants.PORT)).intValue();
                        });
                        arrayList.add(concurrentHashMap);
                        this.serviceStore.put(controllerRegisteredEvent.getKey(), JsonMapper.toJson(arrayList));
                    } else if (deserialize instanceof ControllerDeregisteredEvent) {
                        ControllerDeregisteredEvent controllerDeregisteredEvent = (ControllerDeregisteredEvent) deserialize;
                        if (ServiceRegistrationStreams.logger.isTraceEnabled()) {
                            ServiceRegistrationStreams.logger.trace("Event = " + String.valueOf(controllerDeregisteredEvent));
                        }
                        String str2 = this.serviceStore.get(controllerDeregisteredEvent.getKey());
                        if (str2 != null) {
                            List<Map<String, Object>> string2List = JsonMapper.string2List(str2);
                            String protocol = controllerDeregisteredEvent.getProtocol();
                            String address = controllerDeregisteredEvent.getAddress();
                            int port = controllerDeregisteredEvent.getPort();
                            string2List.removeIf(map2 -> {
                                return map2.get("protocol").equals(protocol) && map2.get(ControllerConstants.ADDRESS).equals(address) && port == ((Integer) map2.get(ControllerConstants.PORT)).intValue();
                            });
                            str2 = string2List.size() == 0 ? null : JsonMapper.toJson(string2List);
                        }
                        if (str2 == null) {
                            this.serviceStore.delete(controllerDeregisteredEvent.getKey());
                        } else {
                            this.serviceStore.put(controllerDeregisteredEvent.getKey(), str2);
                        }
                    }
                } catch (Exception e) {
                    ServiceRegistrationStreams.logger.error("Exception:", (Throwable) e);
                }
            } catch (Exception e2) {
                ServiceRegistrationStreams.logger.error("Exception:", (Throwable) e2);
            }
        }

        @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
        public void close() {
            if (ServiceRegistrationStreams.logger.isInfoEnabled()) {
                ServiceRegistrationStreams.logger.info("Closing processor...");
            }
        }
    }

    public ServiceRegistrationStreams() {
        logger.info("ServiceRegistrationStreams is created");
    }

    public ReadOnlyKeyValueStore<String, String> getServiceStore() {
        return (ReadOnlyKeyValueStore) this.serviceStreams.store(StoreQueryParameters.fromNameAndType(service, QueryableStoreTypes.keyValueStore()));
    }

    public KeyQueryMetadata getServiceStreamsMetadata(String str) {
        return this.serviceStreams.queryMetadataForKey(service, str, Serdes.String().serializer());
    }

    public Collection<StreamsMetadata> getAllServiceStreamsMetadata() {
        return this.serviceStreams.streamsMetadataForStore(service);
    }

    private void startServiceStreams(String str, int i) {
        StoreBuilder<?> keyValueStoreBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(service), Serdes.String(), Serdes.String());
        Topology topology = new Topology();
        topology.addSource("SourceTopicProcessor", controllerConfig.getTopic());
        topology.addProcessor("ServiceEventProcessor", ServiceEventProcessor::new, "SourceTopicProcessor");
        topology.addStateStore(keyValueStoreBuilder, "ServiceEventProcessor");
        Properties properties = new Properties();
        properties.putAll(streamsConfig.getProperties());
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, controllerConfig.getRegistryApplicationId());
        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, str + ":" + i);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
        this.serviceStreams = new KafkaStreams(topology, properties);
        this.serviceStreams.setUncaughtExceptionHandler(th -> {
            logger.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", th);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
        });
        if (streamsConfig.isCleanUp()) {
            this.serviceStreams.cleanUp();
        }
        this.serviceStreams.start();
    }

    @Override // com.networknt.kafka.streams.LightStreams
    public void start(String str, int i) {
        if (logger.isDebugEnabled()) {
            logger.debug("ServiceStreams is starting...");
        }
        startServiceStreams(str, i);
    }

    @Override // com.networknt.kafka.streams.LightStreams
    public void close() {
        if (logger.isDebugEnabled()) {
            logger.debug("ServiceStreams is closing...");
        }
        this.serviceStreams.close();
    }
}
