package com.networknt.mesh.kafka.handler;

import com.networknt.body.BodyHandler;
import com.networknt.config.Config;
import com.networknt.config.JsonMapper;
import com.networknt.handler.LightHttpHandler;
import com.networknt.kafka.common.KafkaProducerConfig;
import com.networknt.kafka.entity.AuditRecord;
import com.networknt.kafka.entity.EmbeddedFormat;
import com.networknt.kafka.entity.ProduceRequest;
import com.networknt.kafka.producer.NativeLightProducer;
import com.networknt.kafka.producer.SidecarProducer;
import com.networknt.mesh.kafka.ProducerStartupHook;
import com.networknt.mesh.kafka.WriteAuditLog;
import com.networknt.server.ServerConfig;
import com.networknt.service.SingletonServiceFactory;
import com.networknt.utility.Constants;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.Headers;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/mesh/kafka/handler/ProducersTopicPostHandler.class */
/**
 * HTTP handler that converts the posted request body into a {@link ProduceRequest} and hands it
 * to the {@link SidecarProducer} for the topic named in the request.
 *
 * <p>The handler answers with {@code ERR12216} when {@link ProducerStartupHook#producer} is not
 * set. Otherwise the response is written from the producer's completion callback, after any
 * collected audit records have been written out.
 *
 * <p>NOTE(review): {@link #auditRecords} is a single list held by the handler instance and passed
 * to every produce call. The callback iterates it under {@code synchronized}; whether the producer
 * guards its own writes to the list the same way is not visible here — confirm before relying on
 * this under concurrent requests.
 */
public class ProducersTopicPostHandler extends WriteAuditLog implements LightHttpHandler {
    private static final Logger logger = LoggerFactory.getLogger(ProducersTopicPostHandler.class);
    private static final String STATUS_ACCEPTED = "SUC10202";
    private static final String PRODUCER_NOT_ENABLED = "ERR12216";
    /** JSON body returned with HTTP 500 when the produce future completes exceptionally. */
    private static final String PRODUCE_FAILED_BODY = "{\"error\":\"Failed to produce to the Kafka topic\"}";

    /** Value of the caller-id record header; stays {@code "unknown"} unless a service id is available. */
    private String callerId;
    SidecarProducer lightProducer;
    private KafkaProducerConfig config;
    /** Audit records handed to the producer and written out when a produce call completes. */
    public List<AuditRecord> auditRecords = new ArrayList<>();

    /**
     * Looks up the producer bean and its configuration when the producer startup hook has created
     * a producer. When caller-id injection is enabled, the server's service id is used as caller id.
     */
    public ProducersTopicPostHandler() {
        this.callerId = "unknown";
        if (ProducerStartupHook.producer == null) {
            // Producer disabled: leave lightProducer/config unset; handleRequest rejects requests.
            return;
        }
        this.lightProducer = (SidecarProducer) SingletonServiceFactory.getBean(NativeLightProducer.class);
        this.config = SidecarProducer.config;
        if (!this.config.isInjectCallerId()) {
            return;
        }
        ServerConfig serverConfig = ServerConfig.getInstance();
        if (serverConfig == null) {
            return;
        }
        String serviceId = serverConfig.getServiceId();
        if (serviceId != null) {
            // Keep the "unknown" default rather than storing null, which would break populateHeaders.
            this.callerId = serviceId;
        }
    }

    /**
     * Produces the request body to the topic given by the {@code topic} request parameter.
     *
     * <p>Key and value formats missing from the request are filled in from the producer
     * configuration. On success the producer's response is returned as JSON; if the produce
     * future completes exceptionally, the failure is logged and an HTTP 500 with a JSON error
     * body is returned instead.
     *
     * @param httpServerExchange the exchange carrying the request and receiving the response
     * @throws Exception if request preparation fails before the produce call is issued
     */
    @Override // io.undertow.server.HttpHandler
    public void handleRequest(HttpServerExchange httpServerExchange) throws Exception {
        if (ProducerStartupHook.producer == null) {
            setExchangeStatus(httpServerExchange, PRODUCER_NOT_ENABLED, new Object[0]);
            return;
        }
        final String topic = httpServerExchange.getQueryParameters().get("topic").getFirst();
        final long startTime = System.currentTimeMillis();
        logger.info("ProducerTopicPostHandler handleRequest start with topic {}", topic);

        // Mark the exchange as dispatched so it stays open after this method returns;
        // the response is sent later from the completion callback below.
        httpServerExchange.dispatch();

        ProduceRequest produceRequest = Config.getInstance().getMapper()
                .convertValue((Map<?, ?>) httpServerExchange.getAttachment(BodyHandler.REQUEST_BODY), ProduceRequest.class);
        if (produceRequest.getKeyFormat().isEmpty() && this.config.getKeyFormat() != null) {
            produceRequest.setKeyFormat(Optional.of(parseFormat(this.config.getKeyFormat())));
        }
        if (produceRequest.getValueFormat().isEmpty() && this.config.getValueFormat() != null) {
            produceRequest.setValueFormat(Optional.of(parseFormat(this.config.getValueFormat())));
        }

        this.lightProducer
                .produceWithSchema(topic, ServerConfig.getInstance().getServiceId(), Optional.empty(), produceRequest,
                        populateHeaders(httpServerExchange, this.config, topic), this.auditRecords)
                .whenCompleteAsync((produceResponse, th) -> {
                    long auditStart = System.currentTimeMillis();
                    flushAuditRecords();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Writing audit log takes {}", System.currentTimeMillis() - auditStart);
                        logger.debug("ProducerTopicPostHandler handleRequest total time is {}", System.currentTimeMillis() - startTime);
                    }
                    httpServerExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "application/json");
                    if (th != null) {
                        // On exceptional completion the response argument is null; report the
                        // failure instead of serialising null into a successful response.
                        logger.error("Failed to produce to topic {}", topic, th);
                        httpServerExchange.setStatusCode(500);
                        httpServerExchange.getResponseSender().send(PRODUCE_FAILED_BODY);
                        return;
                    }
                    httpServerExchange.getResponseSender().send(JsonMapper.toJson(produceResponse));
                });
    }

    /**
     * Builds the Kafka record headers for a produce call: the request's authorization header when
     * present, and the caller id when caller-id injection is enabled in the given configuration.
     *
     * @param httpServerExchange the exchange whose request headers are read
     * @param kafkaProducerConfig the producer configuration consulted for caller-id injection
     * @param str the topic name; not used by this implementation
     * @return the populated record headers, possibly empty
     */
    public org.apache.kafka.common.header.Headers populateHeaders(HttpServerExchange httpServerExchange, KafkaProducerConfig kafkaProducerConfig, String str) {
        RecordHeaders recordHeaders = new RecordHeaders();
        String authorization = httpServerExchange.getRequestHeaders().getFirst(Constants.AUTHORIZATION_STRING);
        if (authorization != null) {
            recordHeaders.add(Constants.AUTHORIZATION_STRING, authorization.getBytes(StandardCharsets.UTF_8));
        }
        if (kafkaProducerConfig.isInjectCallerId()) {
            recordHeaders.add(Constants.CALLER_ID_STRING, this.callerId.getBytes(StandardCharsets.UTF_8));
        }
        return recordHeaders;
    }

    /** Writes every pending audit record to the configured audit target, then empties the list. */
    private void flushAuditRecords() {
        synchronized (this.auditRecords) {
            if (this.auditRecords.isEmpty()) {
                return;
            }
            for (AuditRecord auditRecord : this.auditRecords) {
                writeAuditLog(auditRecord, this.config.getAuditTarget(), this.config.getAuditTopic());
            }
            this.auditRecords.clear();
        }
    }

    /** Maps a configured format name to its enum constant, upper-casing it independently of the default locale. */
    private static EmbeddedFormat parseFormat(String configuredFormat) {
        return EmbeddedFormat.valueOf(configuredFormat.toUpperCase(Locale.ROOT));
    }
}
