package com.networknt.mesh.kafka.handler;

import com.networknt.body.BodyHandler;
import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.handler.LightHttpHandler;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.kafka.common.KafkaProducerConfig;
import com.networknt.kafka.common.KafkaStreamsConfig;
import com.networknt.kafka.entity.AuditRecord;
import com.networknt.kafka.entity.EmbeddedFormat;
import com.networknt.kafka.entity.ProduceRecord;
import com.networknt.kafka.entity.ProduceRequest;
import com.networknt.kafka.entity.TopicReplayMetadata;
import com.networknt.kafka.producer.NativeLightProducer;
import com.networknt.kafka.producer.SidecarProducer;
import com.networknt.mesh.kafka.ProducerStartupHook;
import com.networknt.mesh.kafka.WriteAuditLog;
import com.networknt.server.ServerConfig;
import com.networknt.service.SingletonServiceFactory;
import com.networknt.utility.Constants;
import com.networknt.utility.StringUtils;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/mesh/kafka/handler/TopicReplayPostHandler.class */
public class TopicReplayPostHandler extends WriteAuditLog implements LightHttpHandler {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) TopicReplayPostHandler.class);
    public static KafkaConsumerConfig config = (KafkaConsumerConfig) Config.getInstance().getJsonObjectConfig(KafkaConsumerConfig.CONFIG_NAME, KafkaConsumerConfig.class);
    public static KafkaStreamsConfig streamsConfig = (KafkaStreamsConfig) Config.getInstance().getJsonObjectConfig(KafkaStreamsConfig.CONFIG_NAME, KafkaStreamsConfig.class);
    public static KafkaProducerConfig producerConfig = (KafkaProducerConfig) Config.getInstance().getJsonObjectConfig(KafkaProducerConfig.CONFIG_NAME, KafkaProducerConfig.class);
    private static String STATUS_ACCEPTED = "SUC10202";
    private static String PRODUCER_NOT_ENABLED = "ERR12216";
    private static String TOPICREPLAY_METADATA_ERROR = "ERR12218";
    private static String RUNTIME_EXCEPTION = "ERR10010";
    SidecarProducer lightProducer;
    private String callerId = "unknown";
    public List<AuditRecord> auditRecords = new ArrayList();

    @Override // io.undertow.server.HttpHandler
    public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
        if (ProducerStartupHook.producer == null) {
            setExchangeStatus(httpServerExchange, PRODUCER_NOT_ENABLED, new Object[0]);
            return;
        }
        httpServerExchange.dispatch();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            this.lightProducer = (SidecarProducer) SingletonServiceFactory.getBean(NativeLightProducer.class);
            TopicReplayMetadata topicReplayMetadata = (TopicReplayMetadata) Config.getInstance().getMapper().convertValue((Map) httpServerExchange.getAttachment(BodyHandler.REQUEST_BODY), TopicReplayMetadata.class);
            if (StringUtils.isEmpty(topicReplayMetadata.getTopicName())) {
                setExchangeStatus(httpServerExchange, TOPICREPLAY_METADATA_ERROR, "topic name can not be empty");
                return;
            }
            if (!config.getTopic().contains(topicReplayMetadata.getTopicName())) {
                setExchangeStatus(httpServerExchange, TOPICREPLAY_METADATA_ERROR, "provided topic in request must match one of the topics in consuming topic list");
                return;
            }
            if (topicReplayMetadata.getPartition() < 0) {
                setExchangeStatus(httpServerExchange, TOPICREPLAY_METADATA_ERROR, "partition can not be negative");
                return;
            }
            if (topicReplayMetadata.getStartOffset() < 0) {
                setExchangeStatus(httpServerExchange, TOPICREPLAY_METADATA_ERROR, "replay start offset can not be negative");
                return;
            }
            if (topicReplayMetadata.getEndOffset() < 0) {
                setExchangeStatus(httpServerExchange, TOPICREPLAY_METADATA_ERROR, "replay end offset can not be negative");
                return;
            }
            if (topicReplayMetadata.getStartOffset() > topicReplayMetadata.getEndOffset()) {
                setExchangeStatus(httpServerExchange, TOPICREPLAY_METADATA_ERROR, "replay end offset can not be less than start offset");
                return;
            }
            if (topicReplayMetadata.getStartOffset() == topicReplayMetadata.getEndOffset()) {
                setExchangeStatus(httpServerExchange, TOPICREPLAY_METADATA_ERROR, "replay end offset can not be equal to start offset");
                return;
            }
            if (StringUtils.isEmpty(topicReplayMetadata.getConsumerGroup())) {
                setExchangeStatus(httpServerExchange, TOPICREPLAY_METADATA_ERROR, "replay consumer group can not be empty");
                return;
            }
            if (topicReplayMetadata.isStreamingApp() && StringUtils.isEmpty(topicReplayMetadata.getDestinationTopic())) {
                setExchangeStatus(httpServerExchange, TOPICREPLAY_METADATA_ERROR, "Destination topic can not be empty for a streaming app");
                return;
            }
            String topicName = topicReplayMetadata.getTopicName();
            if (topicReplayMetadata.isDlqIndicator()) {
                topicName = topicReplayMetadata.getTopicName().concat(config.getDeadLetterTopicExt());
                topicReplayMetadata.setTopicName(topicName);
            }
            if (logger.isInfoEnabled()) {
                logger.info("TopicReplayPostHandler handleRequest send replay metadata for topic {}", topicName);
            }
            ProduceRecord produceRecord = new ProduceRecord(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
            produceRecord.setKey(Optional.of(JsonMapper.objectMapper.readTree(JsonMapper.objectMapper.writeValueAsString(topicName))));
            produceRecord.setValue(Optional.of(JsonMapper.objectMapper.readTree(JsonMapper.objectMapper.writeValueAsString(topicReplayMetadata))));
            produceRecord.setTraceabilityId(Optional.of(UUID.randomUUID().toString()));
            produceRecord.setCorrelationId(Optional.of(UUID.randomUUID().toString()));
            ProduceRequest produceRequest = new ProduceRequest(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), null);
            produceRequest.setRecords(Arrays.asList(produceRecord));
            produceRequest.setValueFormat(Optional.of(EmbeddedFormat.JSONSCHEMA));
            produceRequest.setKeyFormat(Optional.of(EmbeddedFormat.STRING));
            this.lightProducer.produceWithSchema(streamsConfig.getDeadLetterControllerTopic(), ServerConfig.getInstance().getServiceId(), Optional.empty(), produceRequest, populateHeaders(httpServerExchange, producerConfig, topicName), this.auditRecords).whenCompleteAsync((produceResponse, th) -> {
                synchronized (this.auditRecords) {
                    if (this.auditRecords != null && this.auditRecords.size() > 0) {
                        this.auditRecords.forEach(auditRecord -> {
                            writeAuditLog(auditRecord, config.getAuditTarget(), config.getAuditTopic());
                        });
                        this.auditRecords.clear();
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("TopicReplayPostHandler handleRequest total time is {}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                    logger.debug("Message produced to offset {}", produceResponse.getOffsets().get(0));
                }
                httpServerExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json");
                httpServerExchange.getResponseSender().send(JsonMapper.toJson(produceResponse));
            });
        } catch (Exception e) {
            setExchangeStatus(httpServerExchange, RUNTIME_EXCEPTION, new Object[0]);
            logger.error("Error while producing message to replay topic ", (Throwable) e);
        }
    }

    public org.apache.kafka.common.header.Headers populateHeaders(HttpServerExchange httpServerExchange, KafkaProducerConfig kafkaProducerConfig, String str) {
        RecordHeaders recordHeaders = new RecordHeaders();
        String first = httpServerExchange.getRequestHeaders().getFirst(Constants.AUTHORIZATION_STRING);
        if (first != null) {
            recordHeaders.add(Constants.AUTHORIZATION_STRING, first.getBytes(StandardCharsets.UTF_8));
        }
        if (kafkaProducerConfig.isInjectCallerId()) {
            recordHeaders.add(Constants.CALLER_ID_STRING, this.callerId.getBytes(StandardCharsets.UTF_8));
        }
        return recordHeaders;
    }
}
