package com.networknt.mesh.kafka.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.networknt.client.Http2Client;
import com.networknt.config.Config;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.kafka.consumer.KafkaConsumerManager;
import com.networknt.kafka.entity.AuditRecord;
import com.networknt.kafka.entity.ConsumerOffsetCommitRequest;
import com.networknt.kafka.entity.ConsumerRecord;
import com.networknt.kafka.entity.EmbeddedFormat;
import com.networknt.kafka.entity.ProduceRecord;
import com.networknt.kafka.entity.ProduceRequest;
import com.networknt.kafka.entity.TopicPartitionOffsetMetadata;
import com.networknt.kafka.entity.TopicReplayMetadata;
import com.networknt.kafka.producer.SidecarProducer;
import com.networknt.mesh.kafka.WriteAuditLog;
import com.networknt.server.ServerConfig;
import com.networknt.utility.ObjectUtils;
import java.net.http.HttpClient;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/mesh/kafka/util/ActiveConsumerStreamsAppMessageHandle.class */
/**
 * Forwards a batch of replayed consumer records to a destination topic through the
 * {@link SidecarProducer} and then commits the offset of the last forwarded record.
 *
 * <p>Audit records collected while producing are written out via
 * {@link WriteAuditLog#writeAuditLog} once the produce call completes.
 *
 * <p>NOTE(review): {@link #firstBatch} is a mutable static flag shared by all instances and is
 * read/written without synchronization; callers appear to be expected to drive replay from a
 * single thread — confirm.
 */
public class ActiveConsumerStreamsAppMessageHandle extends WriteAuditLog {
    private static final Logger logger = LoggerFactory.getLogger(ActiveConsumerStreamsAppMessageHandle.class);
    static final KafkaConsumerConfig consumerConfig = (KafkaConsumerConfig) Config.getInstance().getJsonObjectConfig(KafkaConsumerConfig.CONFIG_NAME, KafkaConsumerConfig.class);
    private static Http2Client client = Http2Client.getInstance();
    private static HttpClient httpClient = HttpClient.newBuilder().build();

    /** Audit records filled in by the producer; flushed and cleared when a produce call completes. */
    public List<AuditRecord> auditRecords = new ArrayList<>();

    /**
     * When {@code true}, the next record examined must sit exactly at the replay start offset,
     * otherwise processing aborts. The flag is reset to {@code false} after that first check.
     */
    public static boolean firstBatch;

    /**
     * Parses a serialized batch of consumer records, forwards those whose offset lies in
     * {@code [startOffset, endOffset)} to the destination topic, and commits the last
     * forwarded offset for the replay consumer group.
     *
     * @param sidecarProducer      producer used to publish the selected records
     * @param str                  serialized batch, parsed with {@code ConvertToList.string2List}
     * @param j                    offset reported by the caller; {@code j + 1} is returned when
     *                             nothing in the batch is forwarded
     * @param topicReplayMetadata  replay settings (source topic/partition, offset window,
     *                             destination topic, consumer group)
     * @param kafkaConsumerManager manager used to commit the consumed offset
     * @param str2                 identifier passed through to {@code commitOffsets} together with
     *                             the consumer group
     * @return the offset following the last forwarded record, or {@code j + 1} when the batch is
     *         empty or contains no record inside the replay window
     * @throws RuntimeException if parsing, producing or committing fails, or if the first record
     *                          of the first batch does not match the configured start offset;
     *                          the original failure is attached as the cause
     */
    public long listenOnMessage(SidecarProducer sidecarProducer, String str, long j, TopicReplayMetadata topicReplayMetadata, KafkaConsumerManager kafkaConsumerManager, String str2) {
        final long startedAt = System.currentTimeMillis();
        final ObjectMapper mapper = Config.getInstance().getMapper();
        try {
            List<Map<String, Object>> recordMaps = ConvertToList.string2List(mapper, str);
            // Guard the size lookup so a null parse result is reported as an empty batch instead of an NPE.
            logger.info("Parsed received replay records at {} , batch size is  {}", LocalDateTime.now(), recordMaps == null ? 0 : recordMaps.size());
            ProduceRequest produceRequest = new ProduceRequest(Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), null);
            // Locale.ROOT keeps the enum lookup independent of the JVM default locale.
            produceRequest.setKeyFormat(Optional.of(EmbeddedFormat.valueOf(consumerConfig.getKeyFormat().toUpperCase(Locale.ROOT))));
            produceRequest.setValueFormat(Optional.of(EmbeddedFormat.valueOf(consumerConfig.getValueFormat().toUpperCase(Locale.ROOT))));
            if (recordMaps == null || recordMaps.isEmpty()) {
                return j + 1;
            }

            List<ProduceRecord> produceRecords = new ArrayList<>(recordMaps.size());
            long lastForwardedOffset = j;
            for (Map<String, Object> recordMap : recordMaps) {
                ConsumerRecord consumerRecord = mapper.convertValue(recordMap, ConsumerRecord.class);
                if (firstBatch && topicReplayMetadata.getStartOffset() != consumerRecord.getOffset()) {
                    throw new RuntimeException("For the first batch , start offset and first read offset do not match, possible error in input start offset, abort");
                }
                firstBatch = false;
                // Only records inside [startOffset, endOffset) are replayed.
                if (consumerRecord.getOffset() < topicReplayMetadata.getStartOffset() || consumerRecord.getOffset() >= topicReplayMetadata.getEndOffset()) {
                    continue;
                }
                Map<?, ?> headers = (Map<?, ?>) recordMap.get("headers");
                ProduceRecord produceRecord = ProduceRecord.create(null, null, null, null, null);
                try {
                    // Round-trip through JSON text to obtain tree nodes for key and value.
                    produceRecord.setKey(Optional.of(mapper.readTree(mapper.writeValueAsString(consumerRecord.getKey()))));
                    produceRecord.setValue(Optional.of(mapper.readTree(mapper.writeValueAsString(consumerRecord.getValue()))));
                } catch (JsonProcessingException e) {
                    throw new RuntimeException("ActiveConsumerStreamsAppMessageHandle exception while parsing read message ", e);
                }
                if (!ObjectUtils.isEmpty(headers)) {
                    if (!ObjectUtils.isEmpty(headers.get("X-Traceability-Id"))) {
                        produceRecord.setTraceabilityId(Optional.ofNullable((String) headers.get("X-Traceability-Id")));
                    }
                    if (!ObjectUtils.isEmpty(headers.get("X-Correlation-Id"))) {
                        produceRecord.setCorrelationId(Optional.ofNullable((String) headers.get("X-Correlation-Id")));
                    }
                }
                produceRecords.add(produceRecord);
                lastForwardedOffset = consumerRecord.getOffset();
            }

            produceRequest.setRecords(produceRecords);
            if (logger.isDebugEnabled()) {
                logger.debug("Forwarding message ::: {} to sidecar at ::: {}", mapper.writeValueAsString(produceRequest), LocalDateTime.now());
            }
            if (produceRecords.isEmpty()) {
                return j + 1;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Send a batch to the destination topic API, size {}", produceRecords.size());
            }
            try {
                sidecarProducer.produceWithSchema(topicReplayMetadata.getDestinationTopic(), ServerConfig.getInstance().getServiceId(), Optional.empty(), produceRequest, new RecordHeaders(), this.auditRecords).whenCompleteAsync((produceResponse, th) -> {
                    if (th != null) {
                        // Surface produce failures instead of dropping them silently.
                        logger.error("Error producing replay batch to the destination topic ", th);
                    }
                    long auditStartedAt = System.currentTimeMillis();
                    synchronized (this.auditRecords) {
                        if (!this.auditRecords.isEmpty()) {
                            this.auditRecords.forEach(auditRecord ->
                                    writeAuditLog(auditRecord, consumerConfig.getAuditTarget(), consumerConfig.getAuditTopic()));
                            this.auditRecords.clear();
                        }
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Writing audit log takes {}", System.currentTimeMillis() - auditStartedAt);
                        logger.debug("ProducerTopicPostHandler handleRequest total time is {}", System.currentTimeMillis() - startedAt);
                    }
                });
                if (logger.isDebugEnabled()) {
                    logger.debug("Forwarded transformed message to sidecar at ::: {}", LocalDateTime.now());
                }
                List<TopicPartitionOffsetMetadata> offsets = Arrays.asList(new TopicPartitionOffsetMetadata(topicReplayMetadata.getTopicName(), Integer.valueOf(topicReplayMetadata.getPartition()), Long.valueOf(lastForwardedOffset), null));
                Future<?> commitResult = kafkaConsumerManager.commitOffsets(topicReplayMetadata.getConsumerGroup(), str2, false, new ConsumerOffsetCommitRequest(offsets), (list, frameworkException) -> {
                    if (null != frameworkException) {
                        logger.error("Error committing offset, will force a restart ", (Throwable) frameworkException);
                        throw new RuntimeException(frameworkException.getMessage(), frameworkException);
                    }
                    offsets.forEach(committed ->
                            logger.info("Committed to topic = {} partition = {} offset = {}", committed.getTopic(), committed.getPartition(), committed.getOffset()));
                });
                if (logger.isDebugEnabled()) {
                    logger.debug("time taken to process one batch and get response from backend in MS {} ", System.currentTimeMillis() - startedAt);
                }
                // Block until the commit has finished so the returned offset is safe to resume from.
                commitResult.get();
                return lastForwardedOffset + 1;
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning thread can observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e.getMessage(), e);
            } catch (Exception e) {
                throw new RuntimeException(e.getMessage(), e);
            }
        } catch (Exception e2) {
            throw new RuntimeException("ActiveConsumerStreamsAppMessageHandle exception while processing read message " + e2.getMessage(), e2);
        }
    }
}
