package com.networknt.mesh.kafka.util;

import com.networknt.client.Http2Client;
import com.networknt.client.simplepool.SimpleConnectionHolder;
import com.networknt.config.Config;
import com.networknt.http.HttpStatus;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.kafka.consumer.KafkaConsumerManager;
import com.networknt.kafka.entity.AuditRecord;
import com.networknt.kafka.entity.ConsumerOffsetCommitRequest;
import com.networknt.kafka.entity.ConsumerRecord;
import com.networknt.kafka.entity.SidecarConsumerRecord;
import com.networknt.kafka.entity.TopicPartitionOffsetMetadata;
import com.networknt.kafka.entity.TopicReplayMetadata;
import com.networknt.kafka.producer.SidecarProducer;
import com.networknt.mesh.kafka.ActiveConsumerStartupHook;
import com.networknt.mesh.kafka.WriteAuditLog;
import com.networknt.utility.ObjectUtils;
import com.networknt.utility.StringUtils;
import io.undertow.UndertowOptions;
import io.undertow.client.ClientConnection;
import io.undertow.client.ClientRequest;
import io.undertow.client.ClientResponse;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import java.net.URI;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.OptionMap;

/* loaded from: input_file:com/networknt/mesh/kafka/util/ActiveConsumerMessageHandle.class */
/**
 * Forwards one batch of replayed Kafka records to the configured backend API and commits the
 * offset of the last forwarded record.
 *
 * <p>The backend host, path, timeout and connection-reset flag are read from
 * {@link KafkaConsumerConfig}. The HTTP connection is cached in the static {@link #connection}
 * field and only re-borrowed from {@link #client} when it is missing or no longer open.
 */
public class ActiveConsumerMessageHandle extends WriteAuditLog {
    private static final Logger logger = LoggerFactory.getLogger(ActiveConsumerMessageHandle.class);
    static final KafkaConsumerConfig consumerConfig = (KafkaConsumerConfig) Config.getInstance().getJsonObjectConfig(KafkaConsumerConfig.CONFIG_NAME, KafkaConsumerConfig.class);
    public static Http2Client client = Http2Client.getInstance();
    /** Cached backend connection; replaced in {@link #listenOnMessage} when null or closed. */
    public static ClientConnection connection;
    /** Audit records handed to {@code processResponse} for every processed batch. */
    public List<AuditRecord> auditRecords = new ArrayList<>();
    /**
     * When true, the first record seen must sit exactly at the replay start offset; the flag is
     * cleared as soon as one record has passed that check.
     */
    public static boolean firstBatch;

    /**
     * Parses a batch of records, forwards the ones inside the replay window to the backend API,
     * hands the backend response to {@code processResponse} and commits the last forwarded offset.
     *
     * @param str                  text that {@code ConvertToList.string2List} parses into a list of record maps
     * @param j                    fallback offset; {@code j + 1} is returned when no record is forwarded
     * @param topicReplayMetadata  replay window (start inclusive, end exclusive), topic, partition and consumer group
     * @param kafkaConsumerManager manager used to look up the consumer instance and commit offsets
     * @param str2                 consumer instance identifier passed to the manager's lookup and commit calls
     * @param sidecarProducer      producer passed through to {@code processResponse}
     * @return the offset following the last forwarded record, or {@code j + 1} if nothing was forwarded
     * @throws RuntimeException if parsing, the backend call (timeout or status &gt;= 400), response
     *                          processing or the offset commit fails; the original failure is the cause
     */
    public long listenOnMessage(String str, long j, TopicReplayMetadata topicReplayMetadata, KafkaConsumerManager kafkaConsumerManager, String str2, SidecarProducer sidecarProducer) {
        final long startedAtMs = System.currentTimeMillis();
        SimpleConnectionHolder.ConnectionToken connectionToken = null;
        try {
            List<Map<String, Object>> rawRecords = ConvertToList.string2List(Config.getInstance().getMapper(), str);
            // Null-safe size so that a null parse result takes the "empty batch" path below
            // instead of failing inside the log statement.
            logger.info("Parsed received replay records at {} , batch size is  {}", LocalDateTime.now(), rawRecords == null ? 0 : rawRecords.size());
            if (rawRecords == null || rawRecords.isEmpty()) {
                return j + 1;
            }

            List<SidecarConsumerRecord> batch = new ArrayList<>();
            long lastOffset = j;
            for (Map<String, Object> rawRecord : rawRecords) {
                ConsumerRecord consumerRecord = Config.getInstance().getMapper().convertValue(rawRecord, ConsumerRecord.class);
                long offset = consumerRecord.getOffset();
                if (firstBatch && topicReplayMetadata.getStartOffset() != offset) {
                    throw new RuntimeException("For the first batch , start offset and first read offset do not match, possible error in input start offset, abort");
                }
                firstBatch = false;
                // Only records in [startOffset, endOffset) are replayed.
                if (offset < topicReplayMetadata.getStartOffset() || offset >= topicReplayMetadata.getEndOffset()) {
                    continue;
                }
                batch.add(SidecarConsumerRecord.fromConsumerRecord(consumerRecord));
                lastOffset = offset;
            }
            if (batch.isEmpty()) {
                return j + 1;
            }

            if (connection == null || !connection.isOpen()) {
                // Assign the token before touching it so the finally block can always return it.
                connectionToken = borrowBackendConnection();
                connection = (ClientConnection) connectionToken.getRawConnection();
            }

            ClientResponse response = postBatch(batch);
            int responseCode = response.getResponseCode();
            String responseBody = response.getAttachment(Http2Client.RESPONSE_BODY);
            if (logger.isInfoEnabled()) {
                logger.info("Got successful response from the backend API");
            }

            if (null == kafkaConsumerManager.getExistingConsumerInstance(topicReplayMetadata.getConsumerGroup(), str2) || null == kafkaConsumerManager.getExistingConsumerInstance(topicReplayMetadata.getConsumerGroup(), str2).getId() || StringUtils.isEmpty(kafkaConsumerManager.getExistingConsumerInstance(topicReplayMetadata.getConsumerGroup(), str2).getId().getInstance())) {
                logger.info("Reinitiating the consumer subscription as consumer instance had died , increase instance time out MS or preferably reduce batch size ");
                new SubscribeTopic(topicReplayMetadata.getConsumerGroup()).subscribeToTopic(topicReplayMetadata);
            }
            if (logger.isDebugEnabled()) {
                logger.debug("statusCode = " + responseCode + " body  = " + responseBody);
            }

            // NOTE(review): this uses the manager held by ActiveConsumerStartupHook rather than the
            // kafkaConsumerManager argument - confirm both always refer to the same instance.
            processResponse(ActiveConsumerStartupHook.kafkaConsumerManager, sidecarProducer, consumerConfig, responseBody, responseCode, batch.size(), this.auditRecords, topicReplayMetadata.isLastRetry());

            List<TopicPartitionOffsetMetadata> offsetsToCommit = Arrays.asList(new TopicPartitionOffsetMetadata(topicReplayMetadata.getTopicName(), Integer.valueOf(topicReplayMetadata.getPartition()), Long.valueOf(lastOffset), null));
            Future<?> commitResult = kafkaConsumerManager.commitOffsets(topicReplayMetadata.getConsumerGroup(), str2, false, new ConsumerOffsetCommitRequest(offsetsToCommit), (list, frameworkException) -> {
                if (null != frameworkException) {
                    logger.error("Error committing offset, will force a restart ", (Throwable) frameworkException);
                    throw new RuntimeException(frameworkException.getMessage(), frameworkException);
                }
                for (TopicPartitionOffsetMetadata committed : offsetsToCommit) {
                    logger.info("Committed to topic = " + committed.getTopic() + " partition = " + committed.getPartition() + " offset = " + committed.getOffset());
                }
            });
            if (logger.isDebugEnabled()) {
                logger.debug("time taken to process one batch and get response from backend in MS {} ", System.currentTimeMillis() - startedAtMs);
            }
            // Block until the commit has completed before reporting the next offset.
            commitResult.get();
            return lastOffset + 1;
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException("ActiveConsumerMessageHandle exception while processing read message " + e.getMessage(), e);
        } catch (Exception e) {
            throw new RuntimeException("ActiveConsumerMessageHandle exception while processing read message " + e.getMessage(), e);
        } finally {
            // Return the borrowed token on every path (it is null when no borrow happened).
            // NOTE(review): the static connection stays cached after its token is restored -
            // confirm that reuse without a fresh borrow is intended.
            client.restore(connectionToken);
        }
    }

    /** Borrows a connection token for the configured backend host, enabling HTTP/2 over TLS for https hosts. */
    private static SimpleConnectionHolder.ConnectionToken borrowBackendConnection() throws Exception {
        String backendHost = consumerConfig.getBackendApiHost();
        if (backendHost.startsWith("https")) {
            return client.borrow(new URI(backendHost), Http2Client.WORKER, client.getDefaultXnioSsl(), Http2Client.BUFFER_POOL, OptionMap.create(UndertowOptions.ENABLE_HTTP2, true));
        }
        return client.borrow(new URI(backendHost), Http2Client.WORKER, Http2Client.BUFFER_POOL, OptionMap.EMPTY);
    }

    /** Builds the POST request for the configured backend path with the headers the backend call uses. */
    private static ClientRequest buildBackendRequest() {
        ClientRequest request = new ClientRequest().setMethod(Methods.POST).setPath(consumerConfig.getBackendApiPath());
        request.getRequestHeaders().put(Headers.CONTENT_TYPE, "application/json");
        request.getRequestHeaders().put(Headers.TRANSFER_ENCODING, "chunked");
        if (consumerConfig.isBackendConnectionReset()) {
            request.getRequestHeaders().put(Headers.CONNECTION, "close");
        }
        request.getRequestHeaders().put(Headers.HOST, "localhost");
        return request;
    }

    /** Sends the batch as JSON over the cached connection and returns the response, failing on timeout or status &gt;= 400. */
    private static ClientResponse postBatch(List<SidecarConsumerRecord> batch) throws Exception {
        AtomicReference<ClientResponse> responseRef = new AtomicReference<>();
        ClientRequest request = buildBackendRequest();
        if (logger.isInfoEnabled()) {
            logger.info("Send a batch to the backend API, size {}", batch.size());
        }
        CountDownLatch responseLatch = new CountDownLatch(1);
        connection.sendRequest(request, client.createClientCallback(responseRef, responseLatch, Config.getInstance().getMapper().writeValueAsString(batch)));
        responseLatch.await(consumerConfig.getInstanceTimeoutMs(), TimeUnit.MILLISECONDS);

        ClientResponse response = responseRef.get();
        if (response == null) {
            // No response was published before the wait ended.
            logger.error("Timeout exception with backendAPI call , will throw exception and silently exit the processing.");
            throw new RuntimeException("Timeout exception with backendAPI call");
        }
        if (response.getResponseCode() >= HttpStatus.BAD_REQUEST.value()) {
            logger.error("Received bad response from backendAPI call, status code {}, will throw exception and silently exit the processing.", response.getResponseCode());
            throw new RuntimeException("Received bad response from backendAPI call, status code " + response.getResponseCode());
        }
        return response;
    }
}
