package com.networknt.mesh.kafka.streams;

import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.kafka.entity.AuditRecord;
import com.networknt.kafka.entity.ConsumerSeekRequest;
import com.networknt.kafka.entity.TopicReplayMetadata;
import com.networknt.kafka.producer.NativeLightProducer;
import com.networknt.kafka.producer.SidecarProducer;
import com.networknt.mesh.kafka.ActiveConsumerStartupHook;
import com.networknt.mesh.kafka.ProducerStartupHook;
import com.networknt.mesh.kafka.util.ActiveConsumerCleanup;
import com.networknt.mesh.kafka.util.ActiveConsumerMessageHandle;
import com.networknt.mesh.kafka.util.ActiveConsumerStreamsAppMessageHandle;
import com.networknt.mesh.kafka.util.AuditRecordUtil;
import com.networknt.mesh.kafka.util.StreamsFactory;
import com.networknt.mesh.kafka.util.SubscribeTopic;
import com.networknt.service.SingletonServiceFactory;
import com.networknt.utility.ObjectUtils;
import com.networknt.utility.StringUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Optional;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/mesh/kafka/streams/MessageReplayProcessor.class */
public class MessageReplayProcessor implements Processor<String, TopicReplayMetadata, String, TopicReplayMetadata> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) MessageReplayProcessor.class);
    static final KafkaConsumerConfig consumerConfig = (KafkaConsumerConfig) Config.getInstance().getJsonObjectConfig(KafkaConsumerConfig.CONFIG_NAME, KafkaConsumerConfig.class);
    private ProcessorContext processorContext;
    private SubscribeTopic subscribeTopic;
    private ActiveConsumerMessageHandle activeConsumerMessageHandle;
    private ActiveConsumerStreamsAppMessageHandle activeConsumerStreamsAppMessageHandle;
    public SidecarProducer lightProducer;

    @Override // org.apache.kafka.streams.processor.api.Processor
    public void init(ProcessorContext<String, TopicReplayMetadata> processorContext) {
        this.processorContext = processorContext;
        this.activeConsumerMessageHandle = StreamsFactory.createActiveConsumerMessageHandle();
        this.activeConsumerStreamsAppMessageHandle = StreamsFactory.createActiveConsumerStreamsAppMessageHandle();
        if (ProducerStartupHook.producer != null) {
            this.lightProducer = (SidecarProducer) SingletonServiceFactory.getBean(NativeLightProducer.class);
        } else {
            logger.error("ProducerStartupHook is not configured and it is needed if DLQ is enabled");
            throw new RuntimeException("ProducerStartupHook is not loaded!");
        }
    }

    @Override // org.apache.kafka.streams.processor.api.Processor
    public void process(Record<String, TopicReplayMetadata> record) {
        boolean z;
        TopicReplayMetadata topicReplayMetadata = (TopicReplayMetadata) JsonMapper.objectMapper.convertValue(record.value(), TopicReplayMetadata.class);
        if (topicReplayMetadata.isDlqIndicator() && consumerConfig.getTopic().contains(topicReplayMetadata.getTopicName().split(consumerConfig.getDeadLetterTopicExt())[0])) {
            z = true;
            logger.debug("Consumer configured with topic {} , replay attempt message received for topic {}, decision is to continue", consumerConfig.getTopic(), topicReplayMetadata.getTopicName());
        } else if (!consumerConfig.getTopic().contains(topicReplayMetadata.getTopicName())) {
            logger.debug("Consumer configured with topic {} , replay attempt message received for topic {}, decision is to return", consumerConfig.getTopic(), topicReplayMetadata.getTopicName());
            return;
        } else {
            z = true;
            logger.debug("Consumer configured with topic {} , replay attempt message received for topic {}, decision is to continue", consumerConfig.getTopic(), topicReplayMetadata.getTopicName());
        }
        if (z) {
            Optional<RecordMetadata> recordMetadata = this.processorContext.recordMetadata();
            AuditRecordUtil.publishAuditRecord(this.processorContext, record, recordMetadata.get(), "", AuditRecord.AuditType.STREAMING_CONSUMER, AuditRecord.AuditStatus.SUCCESS);
            if (ObjectUtils.isEmpty(this.subscribeTopic) || StringUtils.isEmpty(this.subscribeTopic.getInstanceId()) || ObjectUtils.isEmpty(ActiveConsumerStartupHook.kafkaConsumerManager) || ObjectUtils.isEmpty(ActiveConsumerStartupHook.kafkaConsumerManager.getExistingConsumerInstance(topicReplayMetadata.getConsumerGroup(), this.subscribeTopic.getInstanceId()))) {
                this.subscribeTopic = StreamsFactory.createSubscribeTopic(topicReplayMetadata.getConsumerGroup());
                if (!this.subscribeTopic.subscribeToTopic(topicReplayMetadata)) {
                    logger.error("Can not subscribe to topic, reattempt. In case of reattempt error, check the payload or further restart the pod");
                    return;
                }
                try {
                    try {
                        ActiveConsumerStartupHook.kafkaConsumerManager.seek(topicReplayMetadata.getConsumerGroup(), this.subscribeTopic.getInstanceId(), new ConsumerSeekRequest(Arrays.asList(new ConsumerSeekRequest.PartitionOffset(topicReplayMetadata.getTopicName(), topicReplayMetadata.getPartition(), topicReplayMetadata.getStartOffset(), null)), new ArrayList()));
                        logger.info("Seeking to offset {} is successful ", Long.valueOf(topicReplayMetadata.getStartOffset()));
                        ActiveConsumerMessageHandle.firstBatch = true;
                        ActiveConsumerStreamsAppMessageHandle.firstBatch = true;
                        long startOffset = topicReplayMetadata.getStartOffset();
                        int i = 0;
                        while (startOffset < topicReplayMetadata.getEndOffset() && i < 3) {
                            try {
                                String activeReadRecordUtil = StreamsFactory.createConsumersGroupInstancesInstanceRecordsGetHandler().activeReadRecordUtil(topicReplayMetadata.getConsumerGroup(), this.subscribeTopic.getInstanceId(), null, null);
                                logger.debug("The Read Records are {}", activeReadRecordUtil);
                                if (StringUtils.isEmpty(activeReadRecordUtil) || activeReadRecordUtil.length() <= 2) {
                                    i++;
                                    logger.info("No message was retrieved from topic. Response code is {} , reattempt ", activeReadRecordUtil);
                                } else {
                                    startOffset = topicReplayMetadata.isStreamingApp() ? this.activeConsumerStreamsAppMessageHandle.listenOnMessage(this.lightProducer, activeReadRecordUtil, startOffset, topicReplayMetadata, ActiveConsumerStartupHook.kafkaConsumerManager, this.subscribeTopic.getInstanceId()) : this.activeConsumerMessageHandle.listenOnMessage(activeReadRecordUtil, startOffset, topicReplayMetadata, ActiveConsumerStartupHook.kafkaConsumerManager, this.subscribeTopic.getInstanceId(), this.lightProducer);
                                    i = 0;
                                }
                            } catch (Exception e) {
                                throw new RuntimeException("Read records exception");
                            }
                        }
                        ActiveConsumerCleanup.cleanUp(ActiveConsumerStartupHook.kafkaConsumerManager.getExistingConsumerInstance(topicReplayMetadata.getConsumerGroup(), this.subscribeTopic.getInstanceId()), topicReplayMetadata.getConsumerGroup(), this.subscribeTopic.getInstanceId());
                    } catch (Exception e2) {
                        throw new RuntimeException("Seeking to offset failed for topic " + topicReplayMetadata.getStartOffset());
                    }
                } catch (Exception e3) {
                    logger.error("Exception occurred while reading record and processing batch ::: ", (Throwable) e3);
                    AuditRecordUtil.publishAuditRecord(this.processorContext, record, recordMetadata.get(), e3.getMessage(), AuditRecord.AuditType.STREAMING_CONSUMER, AuditRecord.AuditStatus.FAILURE);
                    ActiveConsumerCleanup.cleanUp(ActiveConsumerStartupHook.kafkaConsumerManager.getExistingConsumerInstance(topicReplayMetadata.getConsumerGroup(), this.subscribeTopic.getInstanceId()), topicReplayMetadata.getConsumerGroup(), this.subscribeTopic.getInstanceId());
                }
            }
        }
    }

    @Override // org.apache.kafka.streams.processor.api.Processor
    public void close() {
    }
}
