package com.networknt.mesh.kafka.handler;

import com.networknt.config.JsonMapper;
import com.networknt.exception.FrameworkException;
import com.networknt.handler.LightHttpHandler;
import com.networknt.kafka.consumer.ConsumerReadCallback;
import com.networknt.kafka.consumer.KafkaConsumerState;
import com.networknt.kafka.entity.ConsumerRecord;
import com.networknt.kafka.entity.SidecarConsumerRecord;
import com.networknt.mesh.kafka.ActiveConsumerStartupHook;
import com.networknt.utility.Constants;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import java.time.Duration;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/mesh/kafka/handler/ConsumersGroupInstancesInstanceRecordsGetHandler.class */
public class ConsumersGroupInstancesInstanceRecordsGetHandler implements LightHttpHandler {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) ConsumersGroupInstancesInstanceDeleteHandler.class);

    public ConsumersGroupInstancesInstanceRecordsGetHandler() {
        if (logger.isDebugEnabled()) {
            logger.debug("ConsumersGroupInstancesInstanceRecordsGetHandler constructed!");
        }
    }

    @Override // io.undertow.server.HttpHandler
    public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
        httpServerExchange.dispatch();
        try {
            String activeReadRecordUtil = activeReadRecordUtil(httpServerExchange.getPathParameters().get(Constants.GROUP).getFirst(), httpServerExchange.getPathParameters().get("instance").getFirst(), httpServerExchange.getQueryParameters().get("timeout"), httpServerExchange.getQueryParameters().get("maxBytes"));
            httpServerExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json");
            httpServerExchange.setStatusCode(200);
            httpServerExchange.getResponseSender().send(activeReadRecordUtil);
        } catch (FrameworkException e) {
            if (logger.isDebugEnabled()) {
                logger.debug("FrameworkException: ", (Throwable) e);
            }
            setExchangeStatus(httpServerExchange, e.getStatus());
        } catch (Exception e2) {
            if (logger.isDebugEnabled()) {
                logger.debug("Exception: ", (Throwable) e2);
            }
            setExchangeStatus(httpServerExchange, e2.getMessage(), new Object[0]);
        }
    }

    public String activeReadRecordUtil(String str, String str2, Deque<String> deque, Deque<String> deque2) {
        long j = -1;
        if (deque != null) {
            j = Long.valueOf(deque.getFirst()).longValue();
        }
        long j2 = -1;
        if (deque2 != null) {
            j2 = Long.valueOf(deque2.getFirst()).longValue();
        }
        try {
            return readRecords(str, str2, Duration.ofMillis(j), j2, KafkaConsumerState.class, SidecarConsumerRecord::fromConsumerRecord).get();
        } catch (FrameworkException e) {
            throw new FrameworkException(e.getStatus());
        } catch (InterruptedException e2) {
            throw new RuntimeException(e2.getMessage());
        } catch (ExecutionException e3) {
            throw new RuntimeException(e3.getMessage());
        }
    }

    private <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> CompletableFuture<String> readRecords(String str, String str2, Duration duration, long j, Class<KafkaConsumerState> cls, final Function<ConsumerRecord<ClientKeyT, ClientValueT>, ?> function) {
        final CompletableFuture<String> completableFuture = new CompletableFuture<>();
        ActiveConsumerStartupHook.kafkaConsumerManager.readRecords(str, str2, cls, duration, j <= 0 ? Long.MAX_VALUE : j, new ConsumerReadCallback<ClientKeyT, ClientValueT>() { // from class: com.networknt.mesh.kafka.handler.ConsumersGroupInstancesInstanceRecordsGetHandler.1
            @Override // com.networknt.kafka.consumer.ConsumerReadCallback
            public void onCompletion(List<ConsumerRecord<ClientKeyT, ClientValueT>> list, FrameworkException frameworkException) {
                if (frameworkException != null) {
                    throw new FrameworkException(frameworkException.getStatus());
                }
                completableFuture.complete(JsonMapper.toJson(list.stream().map(function).collect(Collectors.toList())));
            }
        });
        return completableFuture;
    }
}
