package com.networknt.mesh.kafka.handler;

import com.fasterxml.jackson.core.type.TypeReference;
import com.networknt.body.BodyHandler;
import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.handler.LightHttpHandler;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.kafka.entity.RecordProcessedResult;
import com.networknt.kafka.producer.CompletableFutures;
import com.networknt.kafka.producer.ProduceResult;
import com.networknt.mesh.kafka.ProducerStartupHook;
import com.networknt.mesh.kafka.WriteAuditLog;
import com.networknt.status.Status;
import com.networknt.utility.Constants;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/mesh/kafka/handler/DeadlettersQueueActivePostHandler.class */
/**
 * HTTP handler that publishes a batch of processed consumer records to their dead-letter topics.
 *
 * <p>The request body attachment is converted to a list of {@link RecordProcessedResult}. Each
 * entry is sent through {@link ProducerStartupHook#producer} to the topic named by the record's
 * own topic plus the configured dead-letter extension. The response is the JSON-serialized list
 * of {@link ProduceResult}s.
 */
public class DeadlettersQueueActivePostHandler extends WriteAuditLog implements LightHttpHandler {
    private static final Logger logger = LoggerFactory.getLogger(DeadlettersQueueActivePostHandler.class);
    public static KafkaConsumerConfig config = (KafkaConsumerConfig) Config.getInstance().getJsonObjectConfig(KafkaConsumerConfig.CONFIG_NAME, KafkaConsumerConfig.class);
    private static final String PRODUCER_NOT_ENABLED = "ERR12216";
    private static final String DLQ_ACTIVE_PROCEDURE_ERROR = "ERR30003";

    /**
     * Converts the request body into records, produces each one to its dead-letter topic and
     * writes the produce results back as JSON.
     *
     * <p>If no producer is available the exchange is answered with status code
     * {@code ERR12216}. Any failure while producing is answered with {@code ERR30003}, using the
     * exception message as the status description.
     *
     * @param httpServerExchange the exchange carrying the parsed body under
     *                           {@link BodyHandler#REQUEST_BODY}
     * @throws Exception if the body cannot be converted to a list of records
     */
    @Override // io.undertow.server.HttpHandler
    public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
        List<RecordProcessedResult> records = Config.getInstance().getMapper().convertValue(
                (List<?>) httpServerExchange.getAttachment(BodyHandler.REQUEST_BODY),
                new TypeReference<List<RecordProcessedResult>>() {
                });
        if (ProducerStartupHook.producer == null) {
            setExchangeStatus(httpServerExchange, PRODUCER_NOT_ENABLED);
            return;
        }
        httpServerExchange.dispatch();
        try {
            // Blocks until every record in the batch has been acknowledged or has failed.
            List<ProduceResult> results = doProduce(records).get();
            httpServerExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json");
            httpServerExchange.setStatusCode(200);
            httpServerExchange.getResponseSender().send(JsonMapper.toJson(results));
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the flag so code further up the stack can still observe the interrupt.
                Thread.currentThread().interrupt();
            }
            // Pass the throwable itself so the stack trace ends up in the log.
            logger.error("error happen: ", e);
            Status status = new Status(DLQ_ACTIVE_PROCEDURE_ERROR);
            status.setDescription(e.getMessage());
            setExchangeStatus(httpServerExchange, status);
        }
    }

    /**
     * Starts one send per record and combines the per-record futures into a single future.
     *
     * @param list the records to produce
     * @return a future holding the produce results of all records
     */
    private CompletableFuture<List<ProduceResult>> doProduce(List<RecordProcessedResult> list) {
        List<CompletableFuture<ProduceResult>> pending = list.stream()
                .map(this::produce)
                .collect(Collectors.toList());
        return CompletableFutures.allAsList(pending);
    }

    /**
     * Sends a single record to its dead-letter topic.
     *
     * <p>The target topic is the record's topic followed by the configured dead-letter
     * extension. The record value is serialized to JSON, and the current time is used as the
     * record timestamp. When auditing is enabled an audit entry is written after a successful
     * send.
     *
     * @param recordProcessedResult the processed record to forward
     * @return a future completed with the produce result, or completed exceptionally when the
     *         send fails
     */
    public CompletableFuture<ProduceResult> produce(RecordProcessedResult recordProcessedResult) {
        CompletableFuture<ProduceResult> completableFuture = new CompletableFuture<>();
        String topic = recordProcessedResult.getRecord().getTopic() + config.getDeadLetterTopicExt();
        // A record may have no key; pass null through instead of failing on getBytes().
        String key = recordProcessedResult.getKey();
        byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
        byte[] valueBytes = JsonMapper.toJson(recordProcessedResult.getRecord().getValue()).getBytes(StandardCharsets.UTF_8);
        ProducerStartupHook.producer.send(
                new ProducerRecord<>(topic, null, Long.valueOf(System.currentTimeMillis()), keyBytes, valueBytes, populateHeaders(recordProcessedResult)),
                (recordMetadata, exc) -> {
                    if (exc != null) {
                        completableFuture.completeExceptionally(exc);
                        return;
                    }
                    if (config.isAuditEnabled()) {
                        try {
                            activeConsumerAuditLog(recordProcessedResult, config.getAuditTarget(), config.getAuditTopic());
                        } catch (RuntimeException auditFailure) {
                            // The record has already been produced at this point. An audit failure
                            // must not leave the future incomplete, otherwise the request thread
                            // waiting on it would block forever.
                            logger.error("failed to write audit log for dead letter record", auditFailure);
                        }
                    }
                    completableFuture.complete(ProduceResult.fromRecordMetadata(recordMetadata));
                });
        return completableFuture;
    }

    /**
     * Builds the Kafka headers for a dead-letter record.
     *
     * <p>Adds the correlation id, traceability id and stack trace when they are present, then
     * copies every non-null header of the original record. All values are encoded as UTF-8.
     *
     * @param recordProcessedResult the processed record to take header values from
     * @return the populated headers, empty if no value is available
     */
    public org.apache.kafka.common.header.Headers populateHeaders(RecordProcessedResult recordProcessedResult) {
        RecordHeaders recordHeaders = new RecordHeaders();
        if (recordProcessedResult.getCorrelationId() != null) {
            recordHeaders.add("X-Correlation-Id", recordProcessedResult.getCorrelationId().getBytes(StandardCharsets.UTF_8));
        }
        if (recordProcessedResult.getTraceabilityId() != null) {
            recordHeaders.add("X-Traceability-Id", recordProcessedResult.getTraceabilityId().getBytes(StandardCharsets.UTF_8));
        }
        if (recordProcessedResult.getStacktrace() != null) {
            recordHeaders.add(Constants.STACK_TRACE, recordProcessedResult.getStacktrace().getBytes(StandardCharsets.UTF_8));
        }
        Map<String, String> headers = recordProcessedResult.getRecord().getHeaders();
        if (headers != null) {
            for (Map.Entry<String, String> header : headers.entrySet()) {
                if (header.getValue() != null) {
                    recordHeaders.add(header.getKey(), header.getValue().getBytes(StandardCharsets.UTF_8));
                }
            }
        }
        return recordHeaders;
    }
}
