package com.networknt.mesh.kafka.handler;

import com.damnhandy.uri.template.UriTemplate;
import com.networknt.client.Http2Client;
import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.exception.FrameworkException;
import com.networknt.handler.LightHttpHandler;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.kafka.consumer.ConsumerReadCallback;
import com.networknt.kafka.consumer.KafkaConsumerState;
import com.networknt.kafka.entity.ConsumerRecord;
import com.networknt.kafka.entity.ConsumerSubscriptionRecord;
import com.networknt.kafka.entity.CreateConsumerInstanceRequest;
import com.networknt.kafka.entity.EmbeddedFormat;
import com.networknt.kafka.entity.SidecarConsumerRecord;
import com.networknt.mesh.kafka.ActiveConsumerStartupHook;
import com.networknt.utility.Constants;
import io.undertow.client.ClientConnection;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/mesh/kafka/handler/DeadlettersQueueActiveGetHandler.class */
/**
 * HTTP handler that reads records from dead-letter topics through
 * {@code ActiveConsumerStartupHook.kafkaConsumerManager} and returns them to the caller
 * as a JSON array.
 *
 * <p>Supported query parameters (all optional):
 * <ul>
 *   <li>{@code Constants.GROUP} - consumer group id; falls back to {@code config.getGroupId()}.</li>
 *   <li>{@code timeout} - read timeout in milliseconds; {@code -1} is passed on when absent.</li>
 *   <li>{@code topic} - one topic, or several joined by {@code UriTemplate.DEFAULT_SEPARATOR};
 *       falls back to {@code config.getTopic()}. {@code config.getDeadLetterTopicExt()} is
 *       appended to every topic name before subscribing.</li>
 * </ul>
 */
public class DeadlettersQueueActiveGetHandler implements LightHttpHandler {
    /**
     * Byte limit handed to the consumer manager for a single read. A value {@code <= 0}
     * is translated to {@link Long#MAX_VALUE} at read time.
     */
    long maxBytes = -1;
    public static ClientConnection connection;
    private static final Logger logger = LoggerFactory.getLogger(DeadlettersQueueActiveGetHandler.class);
    public static KafkaConsumerConfig config = (KafkaConsumerConfig) Config.getInstance().getJsonObjectConfig(KafkaConsumerConfig.CONFIG_NAME, KafkaConsumerConfig.class);
    public static Http2Client client = Http2Client.getInstance();
    /** Instance name used to look up, and if necessary create, the replay consumer. */
    private static final String REPLAY_DEFAULT_INSTANCE = "Active-Replay-1289990";

    public DeadlettersQueueActiveGetHandler() {
        logger.debug("DeadlettersQueueActiveGetHandler constructed!");
    }

    /**
     * Resolves the request parameters, makes sure a replay consumer instance exists for the
     * group, subscribes it to the dead-letter topics and starts a read whose result is
     * written to the exchange by the callback in {@code readRecords}.
     *
     * @param httpServerExchange the current exchange
     * @throws NumberFormatException if the {@code timeout} query parameter is not a valid long
     * @throws Exception as declared by the handler contract
     */
    @Override // io.undertow.server.HttpHandler
    public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
        // Resolve and validate every request input first, so that a malformed request
        // fails before any consumer instance is created or subscribed.
        Deque<String> groupParam = httpServerExchange.getQueryParameters().get(Constants.GROUP);
        String groupId = groupParam == null ? config.getGroupId() : groupParam.getFirst();

        Deque<String> timeoutParam = httpServerExchange.getQueryParameters().get("timeout");
        long timeoutMillis = timeoutParam == null ? -1L : Long.parseLong(timeoutParam.getFirst());

        Deque<String> topicParam = httpServerExchange.getQueryParameters().get("topic");
        String topic = topicParam == null ? config.getTopic() : topicParam.getFirst();
        ConsumerSubscriptionRecord subscription = deadLetterSubscription(topic);

        // Reuse the replay consumer when it already exists; otherwise create it and use
        // the instance id returned by the manager.
        KafkaConsumerState<?, ?, ?, ?> existingInstance = ActiveConsumerStartupHook.kafkaConsumerManager.getExistingConsumerInstance(groupId, REPLAY_DEFAULT_INSTANCE);
        String instanceId = REPLAY_DEFAULT_INSTANCE;
        if (existingInstance == null) {
            CreateConsumerInstanceRequest createRequest = new CreateConsumerInstanceRequest(REPLAY_DEFAULT_INSTANCE, null, EmbeddedFormat.STRING.name(), EmbeddedFormat.STRING.name(), null, null, null, null);
            instanceId = ActiveConsumerStartupHook.kafkaConsumerManager.createConsumer(groupId, createRequest.toConsumerInstanceConfig());
        }

        ActiveConsumerStartupHook.kafkaConsumerManager.subscribe(groupId, instanceId, subscription);
        httpServerExchange.dispatch();
        readRecords(httpServerExchange, groupId, instanceId, Duration.ofMillis(timeoutMillis), subscription.getTopics(), KafkaConsumerState.class, SidecarConsumerRecord::fromConsumerRecord);
    }

    /**
     * Builds the subscription for the dead-letter counterpart(s) of {@code topic}.
     *
     * <p>When {@code topic} contains the separator, all whitespace is removed and the value
     * is split on the separator (empty segments are kept); otherwise the value is used
     * verbatim. The configured dead-letter extension is appended to each name.
     */
    private static ConsumerSubscriptionRecord deadLetterSubscription(String topic) {
        String deadLetterExt = config.getDeadLetterTopicExt();
        // NOTE(review): UriTemplate.DEFAULT_SEPARATOR is presumably "," and this reference
        // looks like a decompiler constant substitution - confirm before replacing it.
        if (!topic.contains(UriTemplate.DEFAULT_SEPARATOR)) {
            return new ConsumerSubscriptionRecord(Collections.singletonList(topic + deadLetterExt), null);
        }
        List<String> deadLetterTopics = Arrays.stream(topic.replaceAll("\\s+", "").split(UriTemplate.DEFAULT_SEPARATOR, -1))
                .map(name -> name + deadLetterExt)
                .collect(Collectors.toList());
        return new ConsumerSubscriptionRecord(deadLetterTopics, null);
    }

    /**
     * Asks the consumer manager to read records and completes the exchange from the callback.
     *
     * <p>On failure the exchange status is set from the {@link FrameworkException}. On success
     * the current offsets are committed and the converted records are sent as JSON with
     * status 200.
     *
     * @param httpServerExchange exchange to complete
     * @param str consumer group id
     * @param str2 consumer instance id
     * @param duration read timeout handed to the consumer manager
     * @param list subscribed topics; currently not used by this method
     * @param cls consumer state type handed to the consumer manager
     * @param function converts each polled record into its response representation
     */
    private <ClientKeyT, ClientValueT> void readRecords(final HttpServerExchange httpServerExchange, final String str, final String str2, Duration duration, List<String> list, Class<KafkaConsumerState> cls, final Function<ConsumerRecord<ClientKeyT, ClientValueT>, ?> function) {
        // Computed locally instead of being written back to the field: the handler
        // instance serves concurrent requests and must not mutate shared state per call.
        final long effectiveMaxBytes = this.maxBytes <= 0 ? Long.MAX_VALUE : this.maxBytes;
        ActiveConsumerStartupHook.kafkaConsumerManager.readRecords(str, str2, cls, duration, effectiveMaxBytes, new ConsumerReadCallback<ClientKeyT, ClientValueT>() {
            @Override // com.networknt.kafka.consumer.ConsumerReadCallback
            public void onCompletion(List<ConsumerRecord<ClientKeyT, ClientValueT>> records, FrameworkException frameworkException) {
                if (frameworkException != null) {
                    logger.debug("FrameworkException:", frameworkException);
                    DeadlettersQueueActiveGetHandler.this.setExchangeStatus(httpServerExchange, frameworkException.getStatus());
                    return;
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("polled records size = {}", records.size());
                    logger.debug("total dlq records processed:{}", records.size());
                }
                // Offsets are committed before the response body is sent.
                ActiveConsumerStartupHook.kafkaConsumerManager.commitCurrentOffsets(str, str2);
                httpServerExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json");
                httpServerExchange.setStatusCode(200);
                List<Object> payload = records.stream().map(function).collect(Collectors.toList());
                httpServerExchange.getResponseSender().send(JsonMapper.toJson(payload));
            }
        });
    }
}
