package com.networknt.mesh.kafka;

import com.damnhandy.uri.template.UriTemplate;
import com.networknt.client.Http2Client;
import com.networknt.client.simplepool.SimpleConnectionHolder;
import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.exception.FrameworkException;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.kafka.consumer.ConsumerReadCallback;
import com.networknt.kafka.consumer.KafkaConsumerManager;
import com.networknt.kafka.consumer.KafkaConsumerState;
import com.networknt.kafka.entity.AuditRecord;
import com.networknt.kafka.entity.ConsumerOffsetCommitRequest;
import com.networknt.kafka.entity.ConsumerRecord;
import com.networknt.kafka.entity.ConsumerSubscriptionRecord;
import com.networknt.kafka.entity.CreateConsumerInstanceRequest;
import com.networknt.kafka.entity.SidecarConsumerRecord;
import com.networknt.kafka.entity.TopicPartitionOffsetMetadata;
import com.networknt.kafka.producer.NativeLightProducer;
import com.networknt.kafka.producer.SidecarProducer;
import com.networknt.mesh.kafka.handler.SidecarHealthHandler;
import com.networknt.mesh.kafka.util.KafkaConsumerManagerFactory;
import com.networknt.server.Server;
import com.networknt.server.StartupHookProvider;
import com.networknt.service.SingletonServiceFactory;
import com.networknt.utility.ObjectUtils;
import com.networknt.utility.StringUtils;
import io.undertow.UndertowOptions;
import io.undertow.client.ClientConnection;
import io.undertow.client.ClientRequest;
import io.undertow.client.ClientResponse;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.OptionMap;

/* loaded from: input_file:com/networknt/mesh/kafka/ReactiveConsumerStartupHook.class */
public class ReactiveConsumerStartupHook extends WriteAuditLog implements StartupHookProvider {
    public static KafkaConsumerManager kafkaConsumerManager;
    public List<AuditRecord> auditRecords = new ArrayList();
    long timeoutMs = -1;
    long maxBytes = -1;
    String instanceId;
    String groupId;
    public SidecarProducer lightProducer;
    private static Logger logger = LoggerFactory.getLogger((Class<?>) ReactiveConsumerStartupHook.class);
    public static KafkaConsumerConfig config = (KafkaConsumerConfig) Config.getInstance().getJsonObjectConfig(KafkaConsumerConfig.CONFIG_NAME, KafkaConsumerConfig.class);
    public static boolean healthy = true;
    public static Http2Client client = Http2Client.getInstance();
    private static ExecutorService executor = Executors.newSingleThreadExecutor();
    public static boolean done = false;
    public static boolean readyForNextBatch = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/networknt/mesh/kafka/ReactiveConsumerStartupHook$ConsumerTask.class */
    public class ConsumerTask implements Runnable {
        ConsumerTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (ReactiveConsumerStartupHook.logger.isDebugEnabled()) {
                ReactiveConsumerStartupHook.logger.debug("done is {} and healthy is {} server ready is {}", Boolean.valueOf(ReactiveConsumerStartupHook.done), Boolean.valueOf(ReactiveConsumerStartupHook.healthy), Boolean.valueOf(Server.ready));
            }
            boolean z = false;
            while (true) {
                if (!z && SidecarHealthHandler.backendHealth().equals("OK")) {
                    z = true;
                }
                if (z && Server.ready) {
                    break;
                }
                ReactiveConsumerStartupHook.logger.info("Could not connect to the backend API or Server is not ready, wait for 1 second and try again.");
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            while (!ReactiveConsumerStartupHook.done) {
                ReactiveConsumerStartupHook.readyForNextBatch = false;
                ReactiveConsumerStartupHook.this.readRecords(KafkaConsumerState.class, SidecarConsumerRecord::fromConsumerRecord);
                while (!ReactiveConsumerStartupHook.readyForNextBatch) {
                    try {
                        Thread.sleep(ReactiveConsumerStartupHook.config.getWaitPeriod());
                    } catch (InterruptedException e2) {
                        ReactiveConsumerStartupHook.logger.error("InterruptedException", (Throwable) e2);
                    }
                }
            }
        }
    }

    @Override // com.networknt.server.StartupHookProvider
    public void onStartup() {
        logger.info("ReactiveConsumerStartupHook begins");
        if (config.isDeadLetterEnabled()) {
            if (ProducerStartupHook.producer == null) {
                logger.error("ProducerStartupHook is not configured and it is needed if DLQ is enabled");
                throw new RuntimeException("ProducerStartupHook is not loaded!");
            }
            this.lightProducer = (SidecarProducer) SingletonServiceFactory.getBean(NativeLightProducer.class);
        }
        kafkaConsumerManager = KafkaConsumerManagerFactory.createKafkaConsumerManager(config);
        this.groupId = (String) config.getProperties().get("group.id");
        subscribeTopic();
        runConsumer();
        logger.info("ReactiveConsumerStartupHook ends");
    }

    private void runConsumer() {
        executor.execute(new ConsumerTask());
        executor.shutdown();
    }

    public <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> void readRecords(Class<KafkaConsumerState> cls, final Function<ConsumerRecord<ClientKeyT, ClientValueT>, ?> function) {
        this.maxBytes = this.maxBytes <= 0 ? Long.MAX_VALUE : this.maxBytes;
        Duration ofMillis = Duration.ofMillis(this.timeoutMs);
        try {
            if (null == kafkaConsumerManager.getExistingConsumerInstance(this.groupId, this.instanceId) || null == kafkaConsumerManager.getExistingConsumerInstance(this.groupId, this.instanceId).getId() || StringUtils.isEmpty(kafkaConsumerManager.getExistingConsumerInstance(this.groupId, this.instanceId).getId().getInstance())) {
                healthy = false;
                logger.error("Consumer instance not found, marking health as false for group id: ", this.groupId);
            }
            kafkaConsumerManager.readRecords(this.groupId, this.instanceId, cls, ofMillis, this.maxBytes, new ConsumerReadCallback<ClientKeyT, ClientValueT>() { // from class: com.networknt.mesh.kafka.ReactiveConsumerStartupHook.1
                @Override // com.networknt.kafka.consumer.ConsumerReadCallback
                public void onCompletion(List<ConsumerRecord<ClientKeyT, ClientValueT>> list, FrameworkException frameworkException) {
                    if (frameworkException != null) {
                        ReactiveConsumerStartupHook.logger.error("FrameworkException: ", (Throwable) frameworkException);
                        ReactiveConsumerStartupHook.healthy = false;
                        return;
                    }
                    if (!ReactiveConsumerStartupHook.healthy) {
                        ReactiveConsumerStartupHook.healthy = true;
                    }
                    if (list.size() <= 0) {
                        if (ReactiveConsumerStartupHook.logger.isTraceEnabled()) {
                            ReactiveConsumerStartupHook.logger.trace("Polled nothing from the Kafka cluster");
                        }
                        ReactiveConsumerStartupHook.readyForNextBatch = true;
                        return;
                    }
                    if (ReactiveConsumerStartupHook.logger.isDebugEnabled()) {
                        ReactiveConsumerStartupHook.logger.debug("polled records size = " + list.size());
                    }
                    AtomicReference<ClientResponse> atomicReference = new AtomicReference<>();
                    SimpleConnectionHolder simpleConnectionHolder = null;
                    try {
                        try {
                            SimpleConnectionHolder.ConnectionToken borrow = ReactiveConsumerStartupHook.config.getBackendApiHost().startsWith("https") ? ReactiveConsumerStartupHook.client.borrow(new URI(ReactiveConsumerStartupHook.config.getBackendApiHost()), Http2Client.WORKER, ReactiveConsumerStartupHook.client.getDefaultXnioSsl(), Http2Client.BUFFER_POOL, OptionMap.create(UndertowOptions.ENABLE_HTTP2, true)) : ReactiveConsumerStartupHook.client.borrow(new URI(ReactiveConsumerStartupHook.config.getBackendApiHost()), Http2Client.WORKER, Http2Client.BUFFER_POOL, OptionMap.EMPTY);
                            ClientConnection clientConnection = (ClientConnection) borrow.getRawConnection();
                            borrow.holder();
                            ClientRequest path = new ClientRequest().setMethod(Methods.POST).setPath(ReactiveConsumerStartupHook.config.getBackendApiPath());
                            path.getRequestHeaders().put(Headers.CONTENT_TYPE, "application/json");
                            path.getRequestHeaders().put(Headers.TRANSFER_ENCODING, "chunked");
                            if (ReactiveConsumerStartupHook.config.isBackendConnectionReset()) {
                                path.getRequestHeaders().put(Headers.CONNECTION, "close");
                            }
                            path.getRequestHeaders().put(Headers.HOST, "localhost");
                            if (ReactiveConsumerStartupHook.logger.isInfoEnabled()) {
                                ReactiveConsumerStartupHook.logger.info("Send a batch to the backend API of size {}", Integer.valueOf(list.size()));
                            }
                            CountDownLatch countDownLatch = new CountDownLatch(1);
                            clientConnection.sendRequest(path, ReactiveConsumerStartupHook.client.createClientCallback(atomicReference, countDownLatch, JsonMapper.toJson(list.stream().map(function).collect(Collectors.toList()))));
                            countDownLatch.await(ReactiveConsumerStartupHook.config.getInstanceTimeoutMs(), TimeUnit.MILLISECONDS);
                            if (ObjectUtils.isEmpty(atomicReference) || ObjectUtils.isEmpty(atomicReference.get())) {
                                throw new TimeoutException("Rest Call to backend latch timeout");
                            }
                            int responseCode = atomicReference.get().getResponseCode();
                            String str = (String) atomicReference.get().getAttachment(Http2Client.RESPONSE_BODY);
                            if (null == ReactiveConsumerStartupHook.kafkaConsumerManager.getExistingConsumerInstance(ReactiveConsumerStartupHook.this.groupId, ReactiveConsumerStartupHook.this.instanceId) || null == ReactiveConsumerStartupHook.kafkaConsumerManager.getExistingConsumerInstance(ReactiveConsumerStartupHook.this.groupId, ReactiveConsumerStartupHook.this.instanceId).getId() || StringUtils.isEmpty(ReactiveConsumerStartupHook.kafkaConsumerManager.getExistingConsumerInstance(ReactiveConsumerStartupHook.this.groupId, ReactiveConsumerStartupHook.this.instanceId).getId().getInstance())) {
                                ReactiveConsumerStartupHook.logger.info("Marking health status false as consumer had exited , increase instance time out MS or preferably reduce batch size");
                                ReactiveConsumerStartupHook.healthy = false;
                                ReactiveConsumerStartupHook.readyForNextBatch = false;
                                ReactiveConsumerStartupHook.client.restore(borrow);
                                return;
                            }
                            if (ReactiveConsumerStartupHook.logger.isDebugEnabled()) {
                                ReactiveConsumerStartupHook.logger.debug("statusCode = " + responseCode + " body  = " + str);
                            }
                            if (responseCode >= 400) {
                                ReactiveConsumerStartupHook.logger.error("Rollback due to error response from backend with status code = " + responseCode + " body = " + str);
                                ReactiveConsumerStartupHook.kafkaConsumerManager.rollback(list, ReactiveConsumerStartupHook.this.groupId, ReactiveConsumerStartupHook.this.instanceId);
                                ReactiveConsumerStartupHook.readyForNextBatch = true;
                            } else {
                                if (ReactiveConsumerStartupHook.logger.isInfoEnabled()) {
                                    ReactiveConsumerStartupHook.logger.info("Got successful response from the backend API");
                                }
                                ReactiveConsumerStartupHook.this.processResponse(ReactiveConsumerStartupHook.kafkaConsumerManager, ReactiveConsumerStartupHook.this.lightProducer, ReactiveConsumerStartupHook.config, str, responseCode, list.size(), ReactiveConsumerStartupHook.this.auditRecords, false);
                                List<TopicPartitionOffsetMetadata> list2 = ReactiveConsumerStartupHook.topicPartitionOffsetMetadataUtility(list);
                                ReactiveConsumerStartupHook.kafkaConsumerManager.commitOffsets(ReactiveConsumerStartupHook.this.groupId, ReactiveConsumerStartupHook.this.instanceId, false, new ConsumerOffsetCommitRequest(list2), (list3, frameworkException2) -> {
                                    if (null != frameworkException2) {
                                        ReactiveConsumerStartupHook.logger.error("Error committing offset, will force a restart ", (Throwable) frameworkException2);
                                        throw new RuntimeException(frameworkException2.getMessage());
                                    }
                                    list2.forEach(topicPartitionOffsetMetadata -> {
                                        ReactiveConsumerStartupHook.logger.info("Committed to topic = " + topicPartitionOffsetMetadata.getTopic() + " partition = " + topicPartitionOffsetMetadata.getPartition() + " offset = " + topicPartitionOffsetMetadata.getOffset());
                                    });
                                });
                                ReactiveConsumerStartupHook.readyForNextBatch = true;
                            }
                            ReactiveConsumerStartupHook.client.restore(borrow);
                        } catch (Exception e) {
                            ReactiveConsumerStartupHook.logger.error("Process response exception: ", (Throwable) e);
                            if (0 != 0) {
                                simpleConnectionHolder.safeClose(System.currentTimeMillis());
                            }
                            ReactiveConsumerStartupHook.logger.info("Marking health status false as consumer had exited , increase instance time out MS or preferably reduce batch size");
                            ReactiveConsumerStartupHook.healthy = false;
                            ReactiveConsumerStartupHook.readyForNextBatch = false;
                            ReactiveConsumerStartupHook.client.restore(null);
                        }
                    } catch (Throwable th) {
                        ReactiveConsumerStartupHook.client.restore(null);
                        throw th;
                    }
                }
            });
        } catch (Exception e) {
            logger.info("Exception while reading messages", (Throwable) e);
            healthy = false;
            readyForNextBatch = false;
        }
    }

    public void subscribeTopic() {
        this.instanceId = kafkaConsumerManager.createConsumer(this.groupId, new CreateConsumerInstanceRequest(null, null, config.getKeyFormat(), config.getValueFormat(), null, null, null, null).toConsumerInstanceConfig());
        String topic = config.getTopic();
        kafkaConsumerManager.subscribe(this.groupId, this.instanceId, topic.contains(UriTemplate.DEFAULT_SEPARATOR) ? new ConsumerSubscriptionRecord(Arrays.asList(topic.replaceAll("\\s+", "").split(UriTemplate.DEFAULT_SEPARATOR, -1)), null) : new ConsumerSubscriptionRecord(Collections.singletonList(config.getTopic()), null));
    }

    public static <ClientKeyT, ClientValueT> List<TopicPartitionOffsetMetadata> topicPartitionOffsetMetadataUtility(List<ConsumerRecord<ClientKeyT, ClientValueT>> list) {
        HashMap hashMap = new HashMap();
        for (ConsumerRecord<ClientKeyT, ClientValueT> consumerRecord : list) {
            String topic = consumerRecord.getTopic();
            int partition = consumerRecord.getPartition();
            long offset = consumerRecord.getOffset();
            TopicPartitionOffsetMetadata topicPartitionOffsetMetadata = (TopicPartitionOffsetMetadata) hashMap.get(topic + ":" + partition);
            if (topicPartitionOffsetMetadata == null) {
                hashMap.put(topic + ":" + partition, new TopicPartitionOffsetMetadata(topic, Integer.valueOf(partition), Long.valueOf(offset), null));
            } else if (topicPartitionOffsetMetadata.getOffset().longValue() < offset) {
                hashMap.put(topic + ":" + partition, new TopicPartitionOffsetMetadata(topic, Integer.valueOf(partition), Long.valueOf(offset), null));
            }
        }
        return (List) hashMap.values().stream().collect(Collectors.toList());
    }
}
