package com.networknt.mesh.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.kafka.consumer.KafkaConsumerManager;
import com.networknt.kafka.entity.AuditRecord;
import com.networknt.kafka.entity.EmbeddedFormat;
import com.networknt.kafka.entity.ProduceRecord;
import com.networknt.kafka.entity.ProduceRequest;
import com.networknt.kafka.entity.RecordProcessedResult;
import com.networknt.kafka.producer.SidecarProducer;
import com.networknt.server.ServerConfig;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/mesh/kafka/WriteAuditLog.class */
/**
 * Builds {@link AuditRecord}s for consumed Kafka records and writes them either to a Kafka
 * audit topic or to the sidecar audit logger, and post-processes the backend response for a
 * batch of consumed records (dead-letter forwarding plus auditing).
 *
 * <p>NOTE(review): this source was produced by a decompiler; parameter names such as
 * {@code str}, {@code i} and {@code z} are kept as-is and described in the Javadoc of each
 * method. The decompiler also reported that several methods were originally {@code protected}.
 */
public class WriteAuditLog {
    private static final Logger logger = LoggerFactory.getLogger(WriteAuditLog.class);

    /** Audit target value that routes audit records to a Kafka topic. */
    private static final String AUDIT_TARGET_TOPIC = "topic";
    private static final String CORRELATION_ID_HEADER = "X-Correlation-Id";
    private static final String TRACEABILITY_ID_HEADER = "X-Traceability-Id";

    /**
     * Writes an {@code ACTIVE_CONSUMER} audit entry for the given processing result.
     *
     * @param recordProcessedResult the processing result to audit
     * @param str the audit target; {@code "topic"} sends to Kafka, anything else goes to
     *     {@link SidecarAuditHelper}
     * @param str2 the audit topic name, used only when the target is {@code "topic"}
     */
    public void activeConsumerAuditLog(RecordProcessedResult recordProcessedResult, String str, String str2) {
        writeAuditLog(auditFromRecordProcessedResult(recordProcessedResult, AuditRecord.AuditType.ACTIVE_CONSUMER), str, str2);
    }

    /**
     * Writes a {@code REACTIVE_CONSUMER} audit entry for the given processing result.
     *
     * @param recordProcessedResult the processing result to audit
     * @param str the audit target; {@code "topic"} sends to Kafka, anything else goes to
     *     {@link SidecarAuditHelper}
     * @param str2 the audit topic name, used only when the target is {@code "topic"}
     */
    protected void reactiveConsumerAuditLog(RecordProcessedResult recordProcessedResult, String str, String str2) {
        writeAuditLog(auditFromRecordProcessedResult(recordProcessedResult, AuditRecord.AuditType.REACTIVE_CONSUMER), str, str2);
    }

    /**
     * Creates an audit record describing the outcome of processing one consumed record.
     *
     * <p>The correlation and traceability ids are taken from the record headers
     * ({@code X-Correlation-Id} / {@code X-Traceability-Id}) when present, otherwise from the
     * processing result itself. The audit status is {@code SUCCESS} when the result is marked
     * processed and {@code FAILURE} otherwise.
     *
     * @param recordProcessedResult the processing result; its record must not be {@code null}
     * @param auditType the audit type to stamp on the record
     * @return a new audit record with a freshly generated random UUID as id
     */
    public AuditRecord auditFromRecordProcessedResult(RecordProcessedResult recordProcessedResult, AuditRecord.AuditType auditType) {
        AuditRecord auditRecord = new AuditRecord();
        auditRecord.setId(UUID.randomUUID().toString());
        auditRecord.setServiceId(ServerConfig.getInstance().getServiceId());
        auditRecord.setAuditType(auditType);
        auditRecord.setTopic(recordProcessedResult.getRecord().getTopic());
        auditRecord.setPartition(recordProcessedResult.getRecord().getPartition());
        auditRecord.setOffset(recordProcessedResult.getRecord().getOffset());

        // Header values win; the ids carried on the result are only a fallback.
        Map<String, String> headers = recordProcessedResult.getRecord().getHeaders();
        String correlationId = headers != null ? headers.get(CORRELATION_ID_HEADER) : null;
        if (correlationId == null) {
            correlationId = recordProcessedResult.getCorrelationId();
        }
        String traceabilityId = headers != null ? headers.get(TRACEABILITY_ID_HEADER) : null;
        if (traceabilityId == null) {
            traceabilityId = recordProcessedResult.getTraceabilityId();
        }
        auditRecord.setCorrelationId(correlationId);
        auditRecord.setTraceabilityId(traceabilityId);

        auditRecord.setKey(recordProcessedResult.getKey());
        auditRecord.setTimestamp(recordProcessedResult.getTimestamp());
        auditRecord.setAuditStatus(recordProcessedResult.isProcessed() ? AuditRecord.AuditStatus.SUCCESS : AuditRecord.AuditStatus.FAILURE);
        return auditRecord;
    }

    /**
     * Writes a single audit record to the configured destination.
     *
     * <p>When {@code str} equals {@code "topic"} the record is serialized to JSON and sent via
     * {@code AuditProducerStartupHook.auditProducer} to topic {@code str2}, keyed by the
     * correlation id (UTF-8 bytes). Any other target delegates to
     * {@link SidecarAuditHelper#logResult}.
     *
     * @param auditRecord the record to write
     * @param str the audit target
     * @param str2 the audit topic name, used only when the target is {@code "topic"}
     */
    public void writeAuditLog(AuditRecord auditRecord, String str, String str2) {
        if (!AUDIT_TARGET_TOPIC.equals(str)) {
            SidecarAuditHelper.logResult(auditRecord);
            return;
        }
        // The correlation id may be absent; a null key is sent in that case instead of
        // failing with a NullPointerException and losing the audit entry.
        String correlationId = auditRecord.getCorrelationId();
        byte[] key = correlationId == null ? null : correlationId.getBytes(StandardCharsets.UTF_8);
        byte[] value = JsonMapper.toJson(auditRecord).getBytes(StandardCharsets.UTF_8);
        AuditProducerStartupHook.auditProducer.send(new ProducerRecord<>(str2, null, System.currentTimeMillis(), key, value, null), (recordMetadata, exc) -> {
            if (exc != null) {
                // Pass the exception as well so the stack trace is not lost.
                logger.error("Exception:" + exc, exc);
            } else if (logger.isTraceEnabled()) {
                logger.trace("Write to audit topic meta " + recordMetadata.topic() + " " + recordMetadata.partition() + " " + recordMetadata.offset());
            }
        });
    }

    /**
     * Processes the backend response for a batch of consumed records.
     *
     * <p>The response body is parsed as a JSON list with one entry per record; each entry is
     * converted to a {@link RecordProcessedResult}. For every entry that is not marked
     * processed, the record is forwarded to the dead-letter topic when dead-lettering is
     * enabled and {@code z} is {@code false}. When auditing is enabled, a reactive-consumer
     * audit entry is written for every entry.
     *
     * @param kafkaConsumerManager used to populate the headers of dead-letter records
     * @param sidecarProducer producer used to publish dead-letter records
     * @param kafkaConsumerConfig consumer configuration (dead-letter and audit settings)
     * @param str the response body as a JSON array string
     * @param i the status code of the response; only used in error messages
     * @param i2 the number of records that were sent; must equal the number of response entries
     * @param list audit records passed to the producer; they are written and then cleared once
     *     a dead-letter produce call completes
     * @param z when {@code true}, dead-letter forwarding is skipped
     *     (NOTE(review): exact meaning of this flag is not visible here - confirm with callers)
     * @throws RuntimeException if the body is {@code null} or its size differs from {@code i2}
     */
    public void processResponse(KafkaConsumerManager kafkaConsumerManager, SidecarProducer sidecarProducer, KafkaConsumerConfig kafkaConsumerConfig, String str, int i, int i2, List<AuditRecord> list, boolean z) {
        if (str == null) {
            logger.error("Response body is empty with success status code is " + i);
            throw new RuntimeException("Response Body is empty with success status code " + i);
        }
        long startMillis = System.currentTimeMillis();
        List<Map<String, Object>> results = JsonMapper.string2List(str);
        if (results.size() != i2) {
            String message = "The response size " + results.size() + " does not match the record size " + i2;
            logger.error(message);
            throw new RuntimeException(message);
        }
        ObjectMapper mapper = Config.getInstance().getMapper();
        for (Map<String, Object> entry : results) {
            RecordProcessedResult recordProcessedResult = mapper.convertValue(entry, RecordProcessedResult.class);
            if (kafkaConsumerConfig.isDeadLetterEnabled() && !recordProcessedResult.isProcessed() && !z) {
                sendToDeadLetterTopic(kafkaConsumerManager, sidecarProducer, kafkaConsumerConfig, mapper, recordProcessedResult, list);
            }
            if (kafkaConsumerConfig.isAuditEnabled()) {
                reactiveConsumerAuditLog(recordProcessedResult, kafkaConsumerConfig.getAuditTarget(), kafkaConsumerConfig.getAuditTopic());
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Response processing total time is " + (System.currentTimeMillis() - startMillis));
        }
    }

    /**
     * Publishes one failed record to its dead-letter topic. Any exception raised while building
     * or submitting the request is logged and recorded as a {@code PRODUCER}/{@code FAILURE}
     * audit entry so that processing can continue with the next record.
     */
    private void sendToDeadLetterTopic(KafkaConsumerManager kafkaConsumerManager, SidecarProducer sidecarProducer, KafkaConsumerConfig kafkaConsumerConfig, ObjectMapper mapper, RecordProcessedResult recordProcessedResult, List<AuditRecord> list) {
        try {
            String deadLetterTopic = deadLetterTopicFor(recordProcessedResult, kafkaConsumerConfig);
            logger.info("Sending correlation id ::: " + recordProcessedResult.getCorrelationId() + " traceabilityId ::: " + recordProcessedResult.getTraceabilityId() + " to DLQ topic ::: " + deadLetterTopic);

            ProduceRecord produceRecord = ProduceRecord.create(null, null, null, null, null);
            produceRecord.setKey(Optional.of(mapper.readTree(mapper.writeValueAsString(recordProcessedResult.getRecord().getKey()))));
            produceRecord.setValue(Optional.of(mapper.readTree(mapper.writeValueAsString(recordProcessedResult.getRecord().getValue()))));
            produceRecord.setCorrelationId(Optional.ofNullable(recordProcessedResult.getCorrelationId()));
            produceRecord.setTraceabilityId(Optional.ofNullable(recordProcessedResult.getTraceabilityId()));

            ProduceRequest produceRequest = ProduceRequest.create(null, null, null, null, null, null, null, null, null, null, null);
            produceRequest.setRecords(Arrays.asList(produceRecord));
            if (kafkaConsumerConfig.getKeyFormat() != null) {
                produceRequest.setKeyFormat(Optional.of(EmbeddedFormat.valueOf(kafkaConsumerConfig.getKeyFormat().toUpperCase())));
            }
            if (kafkaConsumerConfig.getValueFormat() != null) {
                produceRequest.setValueFormat(Optional.of(EmbeddedFormat.valueOf(kafkaConsumerConfig.getValueFormat().toUpperCase())));
            }

            sidecarProducer.produceWithSchema(deadLetterTopic, ServerConfig.getInstance().getServiceId(), Optional.empty(), produceRequest, kafkaConsumerManager.populateHeaders(recordProcessedResult), list).whenCompleteAsync((produceResponse, th) -> {
                if (th != null) {
                    // Surface asynchronous produce failures instead of silently dropping them.
                    logger.error("Failed to produce record to DLQ topic ::: " + deadLetterTopic + " for traceability id ::: " + recordProcessedResult.getTraceabilityId() + ", correlation id ::: " + recordProcessedResult.getCorrelationId(), th);
                }
                long auditStartMillis = System.currentTimeMillis();
                flushAuditRecords(list, kafkaConsumerConfig);
                if (logger.isDebugEnabled()) {
                    logger.debug("Writing audit log takes " + (System.currentTimeMillis() - auditStartMillis));
                }
            });
        } catch (Exception e) {
            logger.error("Could not process record for traceability id ::: " + recordProcessedResult.getTraceabilityId() + ", correlation id ::: " + recordProcessedResult.getCorrelationId() + " to produce record for DLQ, will skip and proceed for next record ", e);
            AuditRecord auditRecord = new AuditRecord();
            auditRecord.setTopic(deadLetterTopicFor(recordProcessedResult, kafkaConsumerConfig));
            auditRecord.setAuditType(AuditRecord.AuditType.PRODUCER);
            auditRecord.setAuditStatus(AuditRecord.AuditStatus.FAILURE);
            auditRecord.setServiceId(ServerConfig.getInstance().getServiceId());
            auditRecord.setStacktrace(e.getMessage());
            // Nothing was produced, so there is no real offset/partition to report.
            auditRecord.setOffset(0L);
            auditRecord.setPartition(0);
            auditRecord.setTraceabilityId(recordProcessedResult.getTraceabilityId());
            auditRecord.setCorrelationId(recordProcessedResult.getCorrelationId());
            auditRecord.setTimestamp(System.currentTimeMillis());
            auditRecord.setKey(recordProcessedResult.getKey());
            auditRecord.setId(UUID.randomUUID().toString());
            writeAuditLog(auditRecord, kafkaConsumerConfig.getAuditTarget(), kafkaConsumerConfig.getAuditTopic());
        }
    }

    /**
     * Writes and then clears all pending audit records while holding the list's monitor.
     * A {@code null} list is ignored.
     */
    private void flushAuditRecords(List<AuditRecord> auditRecords, KafkaConsumerConfig kafkaConsumerConfig) {
        // The null check must precede the synchronized statement: locking on null throws.
        if (auditRecords == null) {
            return;
        }
        synchronized (auditRecords) {
            if (auditRecords.isEmpty()) {
                return;
            }
            for (AuditRecord pending : auditRecords) {
                writeAuditLog(pending, kafkaConsumerConfig.getAuditTarget(), kafkaConsumerConfig.getAuditTopic());
            }
            auditRecords.clear();
        }
    }

    /**
     * Returns the dead-letter topic for the record: the record's own topic when it already
     * contains the configured dead-letter extension, otherwise the topic with the extension
     * appended.
     */
    private static String deadLetterTopicFor(RecordProcessedResult recordProcessedResult, KafkaConsumerConfig kafkaConsumerConfig) {
        String topic = recordProcessedResult.getRecord().getTopic();
        String extension = kafkaConsumerConfig.getDeadLetterTopicExt();
        return topic.contains(extension) ? topic : topic + extension;
    }
}
