package com.networknt.mesh.kafka.streams;

import com.networknt.config.Config;
import com.networknt.kafka.common.KafkaStreamsConfig;
import com.networknt.kafka.streams.LightStreams;
import com.networknt.mesh.kafka.util.StreamsFactory;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/mesh/kafka/streams/MessageReplayStreams.class */
/**
 * {@link LightStreams} implementation that runs the message replay topology built by
 * {@link MessageReplayStreamTopology}.
 *
 * <p>The streams configuration is loaded once into {@link #replayStreamsConfig}. Replay-specific
 * overrides (offset reset policy, application id suffix, producer idempotence) are written into
 * that shared configuration object because it is the object handed to {@code startStream}.
 */
public class MessageReplayStreams implements LightStreams {
    private static final Logger logger = LoggerFactory.getLogger(MessageReplayStreams.class);

    /** Suffix appended to the configured application id for the replay stream. */
    private static final String REPLAY_APPLICATION_ID_SUFFIX = "-replaystream";

    static final KafkaStreamsConfig replayStreamsConfig = (KafkaStreamsConfig) Config.getInstance().getJsonObjectConfigNoCache(KafkaStreamsConfig.CONFIG_NAME, KafkaStreamsConfig.class);

    /** Set once the replay overrides have been written into {@link #replayStreamsConfig}. */
    private static boolean replayOverridesApplied;

    /** The running replay stream, or {@code null} when it is not started or failed to start. */
    private KafkaStreams kafkaStreams;

    /**
     * Builds the replay topology and starts the replay stream.
     *
     * <p>Failures while creating or starting the stream are logged with their stack trace and
     * leave this instance without a running stream; they are not rethrown.
     *
     * @param str host part of the {@code application.server} endpoint
     * @param i   port part of the {@code application.server} endpoint
     */
    @Override // com.networknt.kafka.streams.LightStreams
    public void start(String str, int i) {
        applyReplayOverrides();

        Properties properties = new Properties();
        properties.putAll(replayStreamsConfig.getProperties());
        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, str + ":" + i);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        MessageReplayStreamTopology messageReplayStreamTopology = new MessageReplayStreamTopology();
        Topology buildReplayTopology = messageReplayStreamTopology.buildReplayTopology();
        try {
            prepareLocalState(buildReplayTopology, properties);
            this.kafkaStreams = startStream(str, i, buildReplayTopology, replayStreamsConfig, messageReplayStreamTopology.getDlqTopicMetadataMap(), MessageReplayStreamTopology.replayMetadataProcessor);
        } catch (Exception e) {
            // Pass the throwable so the stack trace is kept; e.getMessage() alone may be null.
            logger.error("Failed to start the message replay stream", e);
            this.kafkaStreams = null;
        }
    }

    /** Closes the replay stream if one is currently held. */
    @Override // com.networknt.kafka.streams.LightStreams
    public void close() {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
    }

    /**
     * Writes the replay-specific overrides into the shared configuration exactly once.
     *
     * <p>The application id suffix is not idempotent, so without the guard a second call to
     * {@link #start(String, int)} would produce an id ending in
     * {@code -replaystream-replaystream}. The method is synchronized because the guarded state
     * is static and therefore shared by every instance of this class.
     */
    private static synchronized void applyReplayOverrides() {
        if (replayOverridesApplied) {
            return;
        }
        replayStreamsConfig.getProperties().put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        String applicationId = replayStreamsConfig.getProperties().get(StreamsConfig.APPLICATION_ID_CONFIG).toString();
        replayStreamsConfig.getProperties().put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId.concat(REPLAY_APPLICATION_ID_SUFFIX));
        replayStreamsConfig.getProperties().put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
        replayOverridesApplied = true;
    }

    /**
     * Creates a short-lived, never-started streams instance, runs {@code cleanUp()} on it when
     * the configuration asks for it, and always closes it afterwards.
     *
     * <p>Previously this instance was stored in the field and then overwritten by the result of
     * {@code startStream}, so it was never closed. No uncaught-exception handler is registered
     * here: this instance is never started, so such a handler could never be invoked.
     * NOTE(review): the handler of the running stream is whatever {@code startStream}
     * configures, which is not visible from this class.
     */
    private static void prepareLocalState(Topology topology, Properties properties) {
        KafkaStreams bootstrap = StreamsFactory.createKafkaStreams(topology, properties);
        try {
            if (replayStreamsConfig.isCleanUp()) {
                bootstrap.cleanUp();
            }
        } finally {
            bootstrap.close();
        }
    }
}
