package com.networknt.mesh.kafka.handler;

import com.damnhandy.uri.template.UriTemplate;
import com.networknt.client.Http2Client;
import com.networknt.client.simplepool.SimpleConnectionHolder;
import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.exception.FrameworkException;
import com.networknt.handler.LightHttpHandler;
import com.networknt.kafka.common.KafkaConsumerConfig;
import com.networknt.kafka.consumer.ConsumerReadCallback;
import com.networknt.kafka.consumer.KafkaConsumerReadTask;
import com.networknt.kafka.consumer.KafkaConsumerState;
import com.networknt.kafka.entity.AuditRecord;
import com.networknt.kafka.entity.ConsumerOffsetCommitRequest;
import com.networknt.kafka.entity.ConsumerRecord;
import com.networknt.kafka.entity.ConsumerSubscriptionRecord;
import com.networknt.kafka.entity.CreateConsumerInstanceRequest;
import com.networknt.kafka.entity.DeadLetterQueueReplayResponse;
import com.networknt.kafka.entity.SidecarConsumerRecord;
import com.networknt.kafka.entity.TopicPartitionOffsetMetadata;
import com.networknt.kafka.producer.NativeLightProducer;
import com.networknt.kafka.producer.SidecarProducer;
import com.networknt.mesh.kafka.ProducerStartupHook;
import com.networknt.mesh.kafka.ReactiveConsumerStartupHook;
import com.networknt.mesh.kafka.WriteAuditLog;
import com.networknt.monad.Failure;
import com.networknt.monad.Result;
import com.networknt.monad.Success;
import com.networknt.server.Server;
import com.networknt.service.SingletonServiceFactory;
import com.networknt.status.Status;
import com.networknt.utility.Constants;
import com.networknt.utility.NetUtils;
import com.networknt.utility.ObjectUtils;
import com.networknt.utility.StringUtils;
import io.undertow.UndertowOptions;
import io.undertow.client.ClientConnection;
import io.undertow.client.ClientRequest;
import io.undertow.client.ClientResponse;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.OptionMap;

/* loaded from: input_file:com/networknt/mesh/kafka/handler/DeadlettersQueueReactiveGetHandler.class */
public class DeadlettersQueueReactiveGetHandler extends WriteAuditLog implements LightHttpHandler {
    // Consumer instance id used for the replay consumer; assigned in handleRequest and subscribeTopic.
    String instanceId;
    // Consumer group id resolved from the "group" query parameter or the configured default.
    String groupId;
    // Producer handed to processResponse; only assigned when dead-letter handling is enabled.
    SidecarProducer lightProducer;
    private static final Logger logger = LoggerFactory.getLogger(DeadlettersQueueReactiveGetHandler.class);
    // Left non-final and public: code outside this file may reassign it.
    public static KafkaConsumerConfig config = (KafkaConsumerConfig) Config.getInstance().getJsonObjectConfig(KafkaConsumerConfig.CONFIG_NAME, KafkaConsumerConfig.class);
    // Status code reported when reading from the consumer fails unexpectedly.
    private static final String UNEXPECTED_CONSUMER_READ_EXCEPTION = KafkaConsumerReadTask.UNEXPECTED_CONSUMER_READ_EXCEPTION;
    // Status code reported when a requested topic is not one of the configured topics.
    private static final String INVALID_TOPIC_NAME = "ERR30001";
    // Name of the replay consumer instance, derived from getIP().
    private static final String REPLAY_DEFAULT_INSTANCE = "Reactive-Replay-" + getIP();
    // Left non-final and public: code outside this file may reassign it.
    public static Http2Client client = Http2Client.getInstance();
    // Values <= 0 are replaced with Long.MAX_VALUE by readRecords.
    long maxBytes = -1;
    // Value of the "lastretry" query parameter, forwarded to processResponse.
    private boolean lastRetry = false;
    // Holder the read callback writes into; the reference itself never changes, only its content.
    private final AtomicReference<Result<List<ConsumerRecord<Object, Object>>>> result = new AtomicReference<>();
    // Audit records passed to processResponse.
    public List<AuditRecord> auditRecords = new ArrayList<>();

    public DeadlettersQueueReactiveGetHandler() {
        if (config.isDeadLetterEnabled()) {
            if (ProducerStartupHook.producer == null) {
                logger.error("ProducerStartupHook is not configured and it is needed if DLQ is enabled");
                throw new RuntimeException("ProducerStartupHook is not loaded!");
            }
            this.lightProducer = (SidecarProducer) SingletonServiceFactory.getBean(NativeLightProducer.class);
        }
        if (logger.isDebugEnabled()) {
            logger.debug("DeadlettersQueueReactiveGetHandler constructed!");
        }
    }

    /**
     * Replays records from the dead-letter topics to the backend API.
     *
     * <p>Steps: resolve group / lastretry / timeout / topic from the query string, validate the
     * requested topics against the configured ones, subscribe the replay consumer to the
     * corresponding dead-letter topics, poll up to 20 times for records, and, when records were
     * read, POST them to the backend. A backend status below 400 leads to response processing and
     * an offset commit; a status of 400 or above, or any exception while talking to the backend,
     * leads to a rollback.
     *
     * <p>The borrowed backend connection is always returned to the pool in a {@code finally}
     * block, including on exception paths.
     *
     * <p>NOTE(review): group id, instance id, the lastretry flag and the read result are held in
     * instance fields, so concurrent invocations on one handler instance would share them.
     * Confirm how this handler is invoked.
     *
     * @param httpServerExchange the current exchange; recognised query parameters are the group
     *        parameter named by {@code Constants.GROUP}, {@code lastretry}, {@code timeout}
     *        (milliseconds) and {@code topic} (a single topic or a separator-delimited list)
     * @throws Exception if request parsing or the initial subscription fails before dispatch
     */
    @Override // io.undertow.server.HttpHandler
    public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
        Deque<String> groupParam = httpServerExchange.getQueryParameters().get(Constants.GROUP);
        this.groupId = groupParam == null ? config.getGroupId() : groupParam.getFirst();
        this.instanceId = REPLAY_DEFAULT_INSTANCE;

        // Recompute on every request: the flag lives in a field and must not leak a previous
        // request's "true" into a request that omits the parameter.
        Deque<String> lastRetryParam = httpServerExchange.getQueryParameters().get("lastretry");
        this.lastRetry = lastRetryParam != null && Boolean.parseBoolean(lastRetryParam.getFirst());

        Deque<String> timeoutParam = httpServerExchange.getQueryParameters().get("timeout");
        long timeoutMs = timeoutParam != null ? Long.parseLong(timeoutParam.getFirst()) : -1L;

        List<String> configuredTopics = parseTopics(config.getTopic());
        List<String> requestedTopics;
        Deque<String> topicParam = httpServerExchange.getQueryParameters().get("topic");
        if (topicParam == null) {
            requestedTopics = configuredTopics;
        } else {
            requestedTopics = parseTopics(topicParam.getFirst());
            // Only topics that are part of the configured set may be replayed.
            if (!configuredTopics.containsAll(requestedTopics)) {
                setExchangeStatus(httpServerExchange, INVALID_TOPIC_NAME);
                return;
            }
        }
        List<String> dlqTopics = requestedTopics.stream()
                .map(topicName -> topicName + config.getDeadLetterTopicExt())
                .collect(Collectors.toList());
        ConsumerSubscriptionRecord subscription = subscribeTopic(dlqTopics);
        httpServerExchange.dispatch();

        // Declared outside the try so the finally block can return it to the pool on every path.
        SimpleConnectionHolder.ConnectionToken token = null;
        try {
            if (config.getBackendApiHost().startsWith("https")) {
                token = client.borrow(new URI(config.getBackendApiHost()), Http2Client.WORKER, client.getDefaultXnioSsl(), Http2Client.BUFFER_POOL, OptionMap.create(UndertowOptions.ENABLE_HTTP2, true));
            } else {
                token = client.borrow(new URI(config.getBackendApiHost()), Http2Client.WORKER, Http2Client.BUFFER_POOL, OptionMap.EMPTY);
            }
            ClientConnection connection = (ClientConnection) token.getRawConnection();

            // Poll until a successful, non-empty read is observed or 20 attempts are used up.
            boolean gotRecords = false;
            AtomicReference<Result<List<ConsumerRecord<Object, Object>>>> polledRef = null;
            for (int attempt = 0; attempt < 20 && !gotRecords; attempt++) {
                polledRef = readRecords(httpServerExchange, this.groupId, this.instanceId, Duration.ofMillis(timeoutMs), subscription.getTopics(), KafkaConsumerState.class, SidecarConsumerRecord::fromConsumerRecord);
                Result<List<ConsumerRecord<Object, Object>>> attemptResult = ObjectUtils.isEmpty(polledRef) ? null : polledRef.get();
                if (ObjectUtils.isEmpty(attemptResult) || !attemptResult.isSuccess() || ObjectUtils.isEmpty(attemptResult.getResult())) {
                    Thread.sleep(config.getWaitPeriod());
                } else {
                    gotRecords = true;
                }
            }

            if (ObjectUtils.isEmpty(polledRef)) {
                sendNoRecordsResponse(httpServerExchange, dlqTopics);
                return;
            }
            // Take one snapshot: the read callback may replace the content at any time, so the
            // checks below must all look at the same value.
            Result<List<ConsumerRecord<Object, Object>>> polled = polledRef.get();
            if (ObjectUtils.isEmpty(polled)) {
                // No read completed at all. Report it explicitly instead of dereferencing null.
                logger.error("No consumer read completed for group = " + this.groupId + " instance = " + this.instanceId);
                setExchangeStatus(httpServerExchange, UNEXPECTED_CONSUMER_READ_EXCEPTION);
                httpServerExchange.endExchange();
                return;
            }
            if (!polled.isSuccess()) {
                setExchangeStatus(httpServerExchange, polled.getError());
                return;
            }
            List<ConsumerRecord<Object, Object>> records = polled.getResult();
            if (ObjectUtils.isEmpty(records) || records.isEmpty()) {
                sendNoRecordsResponse(httpServerExchange, dlqTopics);
                return;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("polled records size = " + records.size());
            }
            forwardBatchToBackend(httpServerExchange, connection, records, dlqTopics);
        } catch (Exception e) {
            logger.error("Exception:", e);
            setExchangeStatus(httpServerExchange, UNEXPECTED_CONSUMER_READ_EXCEPTION);
            httpServerExchange.endExchange();
        } finally {
            if (token != null) {
                client.restore(token);
            }
        }
    }

    /**
     * Splits a topic specification into topic names. When the separator is present, all
     * whitespace is removed before splitting; otherwise the value is returned as a single
     * element unchanged.
     */
    private static List<String> parseTopics(String topicSpec) {
        if (!topicSpec.contains(UriTemplate.DEFAULT_SEPARATOR)) {
            return Collections.singletonList(topicSpec);
        }
        return Arrays.asList(topicSpec.replaceAll("\\s+", "").split(UriTemplate.DEFAULT_SEPARATOR, -1));
    }

    /** Sends the 200 response that reports zero replayed records for the given topics. */
    private void sendNoRecordsResponse(HttpServerExchange httpServerExchange, List<String> dlqTopics) {
        httpServerExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json");
        httpServerExchange.setStatusCode(200);
        DeadLetterQueueReplayResponse replayResponse = new DeadLetterQueueReplayResponse();
        replayResponse.setGroup(this.groupId);
        replayResponse.setTopics(dlqTopics);
        replayResponse.setInstance(this.instanceId);
        replayResponse.setRecords(0L);
        replayResponse.setDescription("Dead letter queue process successful to end, no records processed");
        httpServerExchange.getResponseSender().send(JsonMapper.toJson(replayResponse));
    }

    /** Subscribes again when the replay consumer instance can no longer be found. */
    private void resubscribeIfConsumerExited(List<String> dlqTopics) {
        if (null == ReactiveConsumerStartupHook.kafkaConsumerManager.getExistingConsumerInstance(this.groupId, this.instanceId)
                || null == ReactiveConsumerStartupHook.kafkaConsumerManager.getExistingConsumerInstance(this.groupId, this.instanceId).getId()
                || StringUtils.isEmpty(ReactiveConsumerStartupHook.kafkaConsumerManager.getExistingConsumerInstance(this.groupId, this.instanceId).getId().getInstance())) {
            subscribeTopic(dlqTopics);
            logger.info("Resubscribed to topic as consumer had exited .");
        }
    }

    /**
     * Posts the polled records to the backend and commits or rolls back depending on the outcome.
     * Exceptions raised while rolling back propagate to the caller.
     */
    private void forwardBatchToBackend(HttpServerExchange httpServerExchange, ClientConnection connection, List<ConsumerRecord<Object, Object>> records, List<String> dlqTopics) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<ClientResponse> responseRef = new AtomicReference<>();
        try {
            ClientRequest request = new ClientRequest().setMethod(Methods.POST).setPath(config.getBackendApiPath());
            request.getRequestHeaders().put(Headers.CONTENT_TYPE, "application/json");
            request.getRequestHeaders().put(Headers.TRANSFER_ENCODING, "chunked");
            if (config.isBackendConnectionReset()) {
                request.getRequestHeaders().put(Headers.CONNECTION, "close");
            }
            request.getRequestHeaders().put(Headers.HOST, "localhost");
            if (logger.isInfoEnabled()) {
                logger.info("Send a batch to the backend API");
            }
            String payload = JsonMapper.toJson(records.stream().map(SidecarConsumerRecord::fromConsumerRecord).collect(Collectors.toList()));
            connection.sendRequest(request, Http2Client.getInstance().createClientCallback(responseRef, latch, payload));
            // Blocks until the client callback releases the latch.
            latch.await();
            int statusCode = responseRef.get().getResponseCode();
            String body = (String) responseRef.get().getAttachment(Http2Client.RESPONSE_BODY);
            resubscribeIfConsumerExited(dlqTopics);
            if (logger.isDebugEnabled()) {
                logger.debug("statusCode = " + statusCode + " body  = " + body);
            }
            if (statusCode >= 400) {
                logger.error("Rollback due to error response from backend with status code = " + statusCode + " body = " + body);
                ReactiveConsumerStartupHook.kafkaConsumerManager.rollback(records, this.groupId, this.instanceId);
                ReactiveConsumerStartupHook.kafkaConsumerManager.rollbackExchangeDefinition(httpServerExchange, this.groupId, this.instanceId, dlqTopics, records);
                return;
            }
            if (logger.isInfoEnabled()) {
                logger.info("Got successful response from the backend API");
            }
            processResponse(ReactiveConsumerStartupHook.kafkaConsumerManager, this.lightProducer, config, body, statusCode, records.size(), this.auditRecords, this.lastRetry);
            List<TopicPartitionOffsetMetadata> offsets = ReactiveConsumerStartupHook.topicPartitionOffsetMetadataUtility(records);
            ReactiveConsumerStartupHook.kafkaConsumerManager.commitOffsets(this.groupId, this.instanceId, false, new ConsumerOffsetCommitRequest(offsets), (committed, commitError) -> {
                if (null != commitError) {
                    logger.error("Error committing offset, will force a restart ", commitError);
                    // Keep the original failure as the cause so the stack trace is not lost.
                    throw new RuntimeException(commitError.getMessage(), commitError);
                }
                offsets.forEach(offset -> {
                    logger.info("Committed to topic = " + offset.getTopic() + " partition = " + offset.getPartition() + " offset = " + offset.getOffset());
                });
            });
            if (logger.isDebugEnabled()) {
                logger.debug("total dlq records processed:" + records.size());
            }
            ReactiveConsumerStartupHook.kafkaConsumerManager.successExchangeDefinition(httpServerExchange, this.groupId, this.instanceId, dlqTopics, records);
        } catch (Exception e) {
            logger.error("Rollback due to process response exception: ", e);
            resubscribeIfConsumerExited(dlqTopics);
            ReactiveConsumerStartupHook.kafkaConsumerManager.rollback(records, this.groupId, this.instanceId);
            ReactiveConsumerStartupHook.kafkaConsumerManager.rollbackExchangeDefinition(httpServerExchange, this.groupId, this.instanceId, dlqTopics, records);
        }
    }

    /**
     * Asks the consumer manager to read records for the given group and instance and returns the
     * handler's shared result holder.
     *
     * <p>The registered callback stores a failure when an exception is reported,
     * {@code Success.of(null)} when no records were delivered (a {@code null} or empty list), and
     * a success wrapping the records otherwise. The holder is returned whether or not the callback
     * has run, so its content may be {@code null} or a value stored by an earlier read.
     *
     * @param httpServerExchange not used by this method
     * @param group consumer group id
     * @param instance consumer instance id
     * @param timeout read timeout passed through to the consumer manager
     * @param topics not used by this method
     * @param consumerStateType consumer state type passed through to the consumer manager
     * @param toJsonWrapper not used by this method
     * @return the holder that the callback writes into; never a fresh instance
     */
    private AtomicReference<Result<List<ConsumerRecord<Object, Object>>>> readRecords(HttpServerExchange httpServerExchange, String group, String instance, Duration timeout, List<String> topics, Class<KafkaConsumerState> consumerStateType, Function<ConsumerRecord<Object, Object>, ?> toJsonWrapper) {
        // A non-positive limit means "no limit".
        this.maxBytes = this.maxBytes <= 0 ? Long.MAX_VALUE : this.maxBytes;
        try {
            ReactiveConsumerStartupHook.kafkaConsumerManager.readRecords(group, instance, consumerStateType, timeout, this.maxBytes, new ConsumerReadCallback<Object, Object>() {
                @Override // com.networknt.kafka.consumer.ConsumerReadCallback
                public void onCompletion(List<ConsumerRecord<Object, Object>> records, FrameworkException frameworkException) {
                    if (frameworkException != null) {
                        DeadlettersQueueReactiveGetHandler.logger.error("FrameworkException:", frameworkException);
                        DeadlettersQueueReactiveGetHandler.this.result.set(Failure.of(new Status(DeadlettersQueueReactiveGetHandler.UNEXPECTED_CONSUMER_READ_EXCEPTION, frameworkException.getMessage())));
                        return;
                    }
                    // Treat a missing list like an empty one instead of failing inside the callback.
                    if (records == null || records.isEmpty()) {
                        DeadlettersQueueReactiveGetHandler.this.result.set(Success.of(null));
                        return;
                    }
                    DeadlettersQueueReactiveGetHandler.this.result.set(Success.of(records));
                    if (DeadlettersQueueReactiveGetHandler.logger.isDebugEnabled()) {
                        DeadlettersQueueReactiveGetHandler.logger.debug("polled records size = " + records.size());
                    }
                }
            });
        } catch (Exception e) {
            // Best effort: the caller polls again, so the failure is only logged here.
            logger.info("readRecords from Kafka exception, please retry!!!", e);
        }
        return this.result;
    }

    /**
     * Subscribes the replay consumer to the given topics.
     *
     * If no consumer instance with the default replay name exists for the current group, one is
     * created first with the configured key and value formats, and the id returned by the
     * consumer manager becomes this handler's instance id.
     *
     * @param list topics to subscribe to
     * @return the subscription record that was passed to the consumer manager
     */
    public ConsumerSubscriptionRecord subscribeTopic(List<String> list) {
        final boolean replayConsumerMissing =
                ReactiveConsumerStartupHook.kafkaConsumerManager.getExistingConsumerInstance(this.groupId, REPLAY_DEFAULT_INSTANCE) == null;
        if (replayConsumerMissing) {
            CreateConsumerInstanceRequest createRequest = new CreateConsumerInstanceRequest(
                    REPLAY_DEFAULT_INSTANCE, null, config.getKeyFormat(), config.getValueFormat(), null, null, null, null);
            this.instanceId = ReactiveConsumerStartupHook.kafkaConsumerManager.createConsumer(this.groupId, createRequest.toConsumerInstanceConfig());
        }
        final ConsumerSubscriptionRecord subscription = new ConsumerSubscriptionRecord(list, null);
        ReactiveConsumerStartupHook.kafkaConsumerManager.subscribe(this.groupId, this.instanceId, subscription);
        return subscription;
    }

    /**
     * Resolves the address used in the replay consumer instance name: the value of the
     * environment variable named by {@code Server.STATUS_HOST_IP} when it is non-empty,
     * otherwise the address reported by {@code NetUtils.getLocalAddressByDatagram()}.
     */
    private static String getIP() {
        final String hostIpFromEnv = System.getenv(Server.STATUS_HOST_IP);
        if (StringUtils.isEmpty(hostIpFromEnv)) {
            return NetUtils.getLocalAddressByDatagram();
        }
        return hostIpFromEnv;
    }
}
