package com.networknt.kafka.producer;

import com.fasterxml.jackson.databind.node.NullNode;
import com.google.protobuf.ByteString;
import com.networknt.config.Config;
import com.networknt.exception.FrameworkException;
import com.networknt.kafka.common.KafkaProducerConfig;
import com.networknt.kafka.entity.AuditRecord;
import com.networknt.kafka.entity.EmbeddedFormat;
import com.networknt.kafka.entity.PartitionOffset;
import com.networknt.kafka.entity.ProduceRecord;
import com.networknt.kafka.entity.ProduceRequest;
import com.networknt.kafka.entity.ProduceResponse;
import com.networknt.status.Status;
import com.networknt.utility.ObjectUtils;
import com.networknt.utility.UuidUtil;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.serializers.subject.TopicNameStrategy;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/kafka/producer/SidecarProducer.class */
public class SidecarProducer implements NativeLightProducer {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) SidecarProducer.class);
    /** Producer configuration, loaded once through {@link Config} under {@code KafkaProducerConfig.CONFIG_NAME}. */
    public static final KafkaProducerConfig config = (KafkaProducerConfig) Config.getInstance().getJsonObjectConfig(KafkaProducerConfig.CONFIG_NAME, KafkaProducerConfig.class);
    /**
     * Process-wide cache of schemas already resolved by {@code produceWithSchema}.
     * Keys are plain string concatenations built in that method: {@code topic + "k"|"v" + schemaId},
     * {@code topic + "k"|"v" + schemaVersion}, {@code subject + schemaVersion}, or {@code topic + "k"|"v"}.
     * NOTE(review): the field is public and non-final, so other classes may read or replace it — confirm before changing the key format.
     */
    public static Map<String, Optional<RegisteredSchema>> schemaCache = new ConcurrentHashMap();
    /** Status code raised by {@code getSchema} when the schema manager throws an {@link IllegalStateException}. */
    private static final String FAILED_TO_GET_SCHEMA = "ERR12208";
    /** Resolves schemas for a topic; assigned in {@link #open()}. */
    private SchemaManager schemaManager;
    /** Serializer used for formats whose {@code requiresSchema()} is true; assigned in {@link #open()}. */
    private SchemaRecordSerializer schemaRecordSerializer;
    /** Serializer used for formats that do not require a schema; assigned in {@link #open()}. */
    private NoSchemaRecordSerializer noSchemaRecordSerializer;
    /** Underlying Kafka producer; {@code null} until {@link #open()} has been called. */
    public Producer<byte[], byte[]> producer;

    /**
     * Creates the Kafka producer and the schema-registry backed collaborators
     * (schema manager plus the schema / no-schema record serializers), then
     * registers this module.
     *
     * <p>The schema registry client is configured from the producer properties:
     * {@code schema.registry.url} supplies the registry URL and
     * {@code schema.registry.cache} the client cache capacity (default 100).
     * The capacity may be given either as a number or as a numeric string;
     * previously a numeric value was silently ignored and the default used.
     *
     * @throws NumberFormatException if {@code schema.registry.cache} is a string that is not an integer
     */
    @Override // com.networknt.kafka.producer.LightProducer
    public void open() {
        if (logger.isTraceEnabled()) {
            logger.trace("config properties: {}", config.getProperties());
        }
        this.producer = new KafkaProducer<>(config.getProperties());

        // Private copy so the registry client and serializers never share the config's own map.
        Map<String, Object> registryConfig = new HashMap<>(config.getProperties());
        String registryUrl = (String) config.getProperties().get("schema.registry.url");

        int cacheCapacity = 100;
        Object configuredCapacity = config.getProperties().get("schema.registry.cache");
        if (configuredCapacity instanceof Number) {
            // Parsed configuration commonly yields Integer rather than String.
            cacheCapacity = ((Number) configuredCapacity).intValue();
        } else if (configuredCapacity instanceof String) {
            cacheCapacity = Integer.parseInt(((String) configuredCapacity).trim());
        }

        CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient(
                new RestService(Collections.singletonList(registryUrl)),
                cacheCapacity,
                Arrays.<SchemaProvider>asList(new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider()),
                registryConfig,
                (Map<String, String>) null);
        this.noSchemaRecordSerializer = new NoSchemaRecordSerializer(new HashMap<>());
        this.schemaRecordSerializer = new SchemaRecordSerializer(registryClient, registryConfig, registryConfig, registryConfig);
        this.schemaManager = new SchemaManagerImpl(registryClient, new TopicNameStrategy());
        registerModule();
    }

    /**
     * Exposes the underlying Kafka producer.
     *
     * @return the producer created by {@link #open()}, or {@code null} if {@code open()} has not run yet
     */
    @Override // com.networknt.kafka.producer.NativeLightProducer
    public Producer getProducer() {
        return producer;
    }

    /**
     * Closes the underlying Kafka producer. Does nothing when no producer has
     * been created.
     */
    @Override // com.networknt.kafka.producer.LightProducer
    public void close() {
        if (this.producer == null) {
            return;
        }
        this.producer.close();
    }

    /**
     * Resolves the key and value schemas for a produce request, serializes every
     * record in the request and sends the whole batch to Kafka.
     *
     * <p>Schemas are looked up in {@link #schemaCache} first and only fetched
     * through the schema manager on a miss when the requested format requires a
     * schema. The format actually used for serialization is the resolved
     * schema's format when a schema was found, otherwise the format named in the
     * request.
     *
     * @param str            topic to produce to
     * @param str2           service id, forwarded to the audit records
     * @param optional       partition used for records that do not name their own
     * @param produceRequest request carrying formats, schema coordinates and records
     * @param headers        headers copied onto every produced record
     * @param list           audit list that receives one entry per record when auditing is enabled
     * @param z              when {@code true} and the request supplies neither schema id nor
     *                       schema version, the schema is cached under a topic-only key
     * @return a future completing with one partition/offset (or error) entry per record
     *         plus the ids of the key and value schemas that were used
     */
    public final CompletableFuture<ProduceResponse> produceWithSchema(String str, String str2, Optional<Integer> optional, ProduceRequest produceRequest, Headers headers, List<AuditRecord> list, boolean z) {
        long serializationStart = System.currentTimeMillis();

        Optional<RegisteredSchema> keySchema = resolveSchema(str, produceRequest.getKeyFormat(), produceRequest.getKeySchemaSubject(), produceRequest.getKeySchemaId(), produceRequest.getKeySchemaVersion(), produceRequest.getKeySchema(), true, z);
        Optional<EmbeddedFormat> keyFormat = keySchema.isPresent() ? Optional.of(keySchema.get().getFormat()) : produceRequest.getKeyFormat();

        Optional<RegisteredSchema> valueSchema = resolveSchema(str, produceRequest.getValueFormat(), produceRequest.getValueSchemaSubject(), produceRequest.getValueSchemaId(), produceRequest.getValueSchemaVersion(), produceRequest.getValueSchema(), false, z);
        Optional<EmbeddedFormat> valueFormat = valueSchema.isPresent() ? Optional.of(valueSchema.get().getFormat()) : produceRequest.getValueFormat();

        List<SerializedKeyAndValue> serialized = serialize(keyFormat, valueFormat, str, optional, keySchema, valueSchema, produceRequest.getRecords());
        if (logger.isDebugEnabled()) {
            logger.debug("Serializing key and value with schema registry takes {}", System.currentTimeMillis() - serializationStart);
        }

        long produceStart = System.currentTimeMillis();
        List<CompletableFuture<ProduceResult>> results = doProduce(str, str2, serialized, headers, list);
        if (logger.isDebugEnabled()) {
            logger.debug("Producing the entire batch to Kafka takes {}", System.currentTimeMillis() - produceStart);
        }
        return produceResultsToResponse(keySchema, valueSchema, results);
    }

    /**
     * Resolves the schema for one side (key or value) of a produce request:
     * cache lookup first, then a fetch through {@link #getSchema} when the
     * format requires a schema, caching a successful fetch.
     *
     * <p>A {@code null} subject, schema id or schema version is treated as
     * "not supplied" on both the lookup and the store path (the store path
     * used to dereference them unguarded).
     *
     * @return the resolved schema, or empty when none is cached and either the
     *         format needs no schema or the fetch produced nothing
     */
    private Optional<RegisteredSchema> resolveSchema(String topic, Optional<EmbeddedFormat> format, Optional<String> subject, Optional<Integer> schemaId, Optional<Integer> schemaVersion, Optional<String> rawSchema, boolean isKey, boolean cacheByTopic) {
        Optional<String> safeSubject = subject == null ? Optional.empty() : subject;
        Optional<Integer> safeId = schemaId == null ? Optional.empty() : schemaId;
        Optional<Integer> safeVersion = schemaVersion == null ? Optional.empty() : schemaVersion;

        String cacheKey = schemaCacheKey(topic, safeSubject, safeId, safeVersion, isKey, cacheByTopic);
        if (cacheKey != null) {
            Optional<RegisteredSchema> cached = schemaCache.get(cacheKey);
            if (cached != null && cached.isPresent()) {
                return cached;
            }
        }

        if (!format.isPresent() || !format.get().requiresSchema()) {
            return Optional.empty();
        }

        Optional<RegisteredSchema> fetched = getSchema(topic, format, safeSubject, safeId, safeVersion, rawSchema, isKey);
        if (fetched.isPresent()) {
            if (cacheKey != null) {
                schemaCache.put(cacheKey, fetched);
            } else {
                // Without a cache key every request goes back to the schema registry.
                String side = isKey ? "key" : "value";
                logger.error("Could not put {} schema into the cache. It means that neither {}SchemaId nor {}SchemaVersion is supplied and Kafka Schema Registry will be overloaded.", side, side, side);
            }
        }
        return fetched;
    }

    /**
     * Builds the {@link #schemaCache} key for one side of a request, in order of
     * precedence: schema id, then schema version (qualified by subject when one
     * is given, otherwise by topic), then the topic alone when
     * {@code cacheByTopic} is set. The key layout is kept exactly as it has
     * always been because the cache is a public field.
     *
     * @return the cache key, or {@code null} when the request gives nothing to key on
     */
    private static String schemaCacheKey(String topic, Optional<String> subject, Optional<Integer> schemaId, Optional<Integer> schemaVersion, boolean isKey, boolean cacheByTopic) {
        String topicPrefix = topic + (isKey ? "k" : "v");
        if (schemaId.isPresent()) {
            return topicPrefix + schemaId.get();
        }
        if (schemaVersion.isPresent()) {
            return subject.isPresent() ? subject.get() + schemaVersion.get() : topicPrefix + schemaVersion.get();
        }
        return cacheByTopic ? topicPrefix : null;
    }

    /**
     * Convenience overload of the seven-argument {@code produceWithSchema} that
     * passes {@code false} for its trailing boolean flag.
     *
     * @param str            topic to produce to
     * @param str2           service id forwarded with the records
     * @param optional       partition used for records that do not name their own
     * @param produceRequest request carrying formats, schema coordinates and records
     * @param headers        headers copied onto every produced record
     * @param list           audit list handed through to the producing code
     * @return the future returned by the seven-argument overload
     */
    public final CompletableFuture<ProduceResponse> produceWithSchema(String str, String str2, Optional<Integer> optional, ProduceRequest produceRequest, Headers headers, List<AuditRecord> list) {
        final boolean topicOnlyCaching = false;
        return produceWithSchema(str, str2, optional, produceRequest, headers, list, topicOnlyCaching);
    }

    /**
     * Serializes the key and value of every record in the request.
     *
     * <p>For each side, a format whose {@code requiresSchema()} is true goes
     * through the schema serializer; anything else goes through the no-schema
     * serializer, using the request format when present and the configured
     * default format otherwise. The configured default is resolved lazily and
     * upper-cased with {@link Locale#ROOT}, so it is only read when actually
     * needed and does not depend on the JVM's default locale.
     *
     * @param optional  key format to use, if any
     * @param optional2 value format to use, if any
     * @param str       topic the records are destined for
     * @param optional3 partition used for records that do not name their own
     * @param optional4 resolved key schema, if any
     * @param optional5 resolved value schema, if any
     * @param list      records to serialize
     * @return one serialized entry per input record, in input order
     */
    private List<SerializedKeyAndValue> serialize(Optional<EmbeddedFormat> optional, Optional<EmbeddedFormat> optional2, String str, Optional<Integer> optional3, Optional<RegisteredSchema> optional4, Optional<RegisteredSchema> optional5, List<ProduceRecord> list) {
        boolean keyNeedsSchema = optional.isPresent() && optional.get().requiresSchema();
        boolean valueNeedsSchema = optional2.isPresent() && optional2.get().requiresSchema();

        List<SerializedKeyAndValue> serialized = new ArrayList<>(list.size());
        int index = 0; // position of the record in the request, passed to the serializers
        for (ProduceRecord produceRecord : list) {
            Optional<Integer> partition = produceRecord.getPartition().isPresent() ? produceRecord.getPartition() : optional3;

            Optional<ByteString> key;
            if (keyNeedsSchema) {
                key = this.schemaRecordSerializer.serialize(index, optional.get(), str, optional4, produceRecord.getKey().orElse(NullNode.getInstance()), true);
            } else {
                EmbeddedFormat keyFormat = optional.orElseGet(() -> EmbeddedFormat.valueOf(config.getKeyFormat().toUpperCase(Locale.ROOT)));
                key = this.noSchemaRecordSerializer.serialize(index, keyFormat, produceRecord.getKey().orElse(NullNode.getInstance()));
            }

            Optional<ByteString> value;
            if (valueNeedsSchema) {
                value = this.schemaRecordSerializer.serialize(index, optional2.get(), str, optional5, produceRecord.getValue().orElse(NullNode.getInstance()), false);
            } else {
                EmbeddedFormat valueFormat = optional2.orElseGet(() -> EmbeddedFormat.valueOf(config.getValueFormat().toUpperCase(Locale.ROOT)));
                value = this.noSchemaRecordSerializer.serialize(index, valueFormat, produceRecord.getValue().orElse(NullNode.getInstance()));
            }

            serialized.add(new SerializedKeyAndValue(partition, produceRecord.getTraceabilityId(), produceRecord.getCorrelationId(), key, value, produceRecord.getHeaders(), produceRecord.getTimestamp()));
            index++;
        }
        return serialized;
    }

    /**
     * Fetches a schema through the schema manager.
     *
     * @param str       topic the schema belongs to
     * @param optional  requested format
     * @param optional2 schema subject, if supplied
     * @param optional3 schema id, if supplied
     * @param optional4 schema version, if supplied
     * @param optional5 raw schema text, if supplied
     * @param z         {@code true} for the key schema, {@code false} for the value schema
     * @return the schema, or empty when the lookup failed with a runtime exception
     *         other than {@link IllegalStateException} (the failure is logged)
     * @throws FrameworkException with status {@code ERR12208} when the schema manager
     *                            throws an {@link IllegalStateException}
     */
    private Optional<RegisteredSchema> getSchema(String str, Optional<EmbeddedFormat> optional, Optional<String> optional2, Optional<Integer> optional3, Optional<Integer> optional4, Optional<String> optional5, boolean z) {
        try {
            return Optional.of(this.schemaManager.getSchema(str, optional, optional2, Optional.empty(), optional3, optional4, optional5, z));
        } catch (IllegalStateException e) {
            logger.error("IllegalStateException:", e);
            throw new FrameworkException(new Status(FAILED_TO_GET_SCHEMA, new Object[0]), e);
        } catch (RuntimeException e) {
            // Best effort: callers continue without a schema, but the cause must not vanish.
            logger.warn("Failed to get {} schema for topic {}; continuing without a schema", z ? "key" : "value", str, e);
            return Optional.empty();
        }
    }

    /**
     * Sends every serialized record to Kafka through {@code produce}.
     *
     * <p>A record without a correlation id gets a freshly generated one. The
     * record timestamp is used only when it is present and positive; otherwise
     * the current instant is used.
     *
     * @param str     topic to produce to
     * @param str2    service id forwarded to {@code produce}
     * @param list    serialized records
     * @param headers headers copied onto every record
     * @param list2   audit list forwarded to {@code produce}
     * @return one future per record, in input order
     */
    private List<CompletableFuture<ProduceResult>> doProduce(String str, String str2, List<SerializedKeyAndValue> list, Headers headers, List<AuditRecord> list2) {
        return list.stream().map(entry -> {
            Optional<String> correlationId = entry.getCorrelationId();
            if (!correlationId.isPresent()) {
                correlationId = Optional.of(UuidUtil.uuidToBase64(UuidUtil.getUUID()));
            }

            boolean hasUsableTimestamp = !ObjectUtils.isEmpty(entry.getTimestamp())
                    && entry.getTimestamp().isPresent()
                    && entry.getTimestamp().get().longValue() > 0;
            Instant timestamp = hasUsableTimestamp
                    ? Instant.ofEpochMilli(entry.getTimestamp().get().longValue())
                    : Instant.now();

            return produce(str, entry.getPartitionId(), entry.getTraceabilityId(), correlationId, str2, headers, list2, entry.getKey(), entry.getValue(), entry.getHeaders(), timestamp);
        }).collect(Collectors.toList());
    }

    /** Header carrying the traceability id; always set from the dedicated argument, never copied. */
    private static final String TRACEABILITY_ID_HEADER = "X-Traceability-Id";
    /** Header carrying the correlation id; always set from the dedicated argument, never copied. */
    private static final String CORRELATION_ID_HEADER = "X-Correlation-Id";

    /**
     * Sends a single record to Kafka.
     *
     * <p>Record headers are assembled in this order: the shared {@code headers},
     * the traceability id, the correlation id, then the per-record headers.
     * Traceability/correlation entries found in either header source are
     * skipped (case-insensitively) so the dedicated arguments always win.
     * The per-record header map is only read, never modified.
     *
     * <p>When auditing is enabled, an audit record is appended to {@code list}
     * (synchronizing on it) from the send callback. The returned future is
     * completed even if recording the audit entry fails.
     *
     * @param str       topic to produce to
     * @param optional  target partition; absent lets Kafka choose
     * @param optional2 traceability id, if any
     * @param optional3 correlation id, if any
     * @param str2      service id recorded in the audit entry
     * @param headers   shared headers to copy onto the record; may be {@code null}
     * @param list      audit list to append to when auditing is enabled
     * @param optional4 serialized key, if any
     * @param optional5 serialized value, if any
     * @param optional6 per-record headers, if any
     * @param instant   record timestamp
     * @return a future completing with the produce result, or exceptionally with the send error
     */
    public CompletableFuture<ProduceResult> produce(String str, Optional<Integer> optional, Optional<String> optional2, Optional<String> optional3, String str2, Headers headers, List<AuditRecord> list, Optional<ByteString> optional4, Optional<ByteString> optional5, Optional<Map<String, String>> optional6, Instant instant) {
        RecordHeaders recordHeaders = new RecordHeaders();
        if (headers != null) {
            headers.forEach(header -> {
                if (!isTracingHeader(header.key())) {
                    recordHeaders.add(header);
                }
            });
        }
        optional2.ifPresent(traceabilityId -> recordHeaders.add(TRACEABILITY_ID_HEADER, traceabilityId.getBytes(StandardCharsets.UTF_8)));
        optional3.ifPresent(correlationId -> recordHeaders.add(CORRELATION_ID_HEADER, correlationId.getBytes(StandardCharsets.UTF_8)));
        // Filter while copying instead of removeIf(): the caller's map may be immutable or reused.
        optional6.ifPresent(perRecord -> perRecord.forEach((name, value) -> {
            if (!isTracingHeader(name)) {
                recordHeaders.add(name, value == null ? null : value.getBytes(StandardCharsets.UTF_8));
            }
        }));

        if (optional2.isPresent()) {
            // The correlation id may legitimately be absent here; do not call get() on it.
            logger.info("Associate traceability Id {} with correlation Id {}", optional2.get(), optional3.orElse(null));
        }

        CompletableFuture<ProduceResult> completableFuture = new CompletableFuture<>();
        ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(
                str,
                optional.orElse(null),
                Long.valueOf(instant.toEpochMilli()),
                optional4.map(ByteString::toByteArray).orElse(null),
                optional5.map(ByteString::toByteArray).orElse(null),
                recordHeaders);
        this.producer.send(kafkaRecord, (recordMetadata, exc) -> {
            boolean succeeded = exc == null;
            try {
                if (config.isAuditEnabled()) {
                    synchronized (list) {
                        list.add(auditFromRecordMetadata(succeeded ? recordMetadata : null, str, exc, str2, optional4, optional2, optional3, succeeded));
                    }
                }
            } finally {
                // Complete in finally so an auditing failure can never leave the caller waiting forever.
                if (succeeded) {
                    completableFuture.complete(ProduceResult.fromRecordMetadata(recordMetadata));
                } else {
                    completableFuture.completeExceptionally(exc);
                }
            }
        });
        return completableFuture;
    }

    /** Returns whether {@code name} is the traceability or correlation header, ignoring case. */
    private static boolean isTracingHeader(String name) {
        return TRACEABILITY_ID_HEADER.equalsIgnoreCase(name) || CORRELATION_ID_HEADER.equalsIgnoreCase(name);
    }

    /**
     * Combines the per-record produce futures into a single response.
     *
     * <p>A successful record becomes a partition/offset entry; a failed record
     * becomes an entry carrying an error code and message instead. The error is
     * taken from the failure's cause when there is one (dependent stages wrap
     * the original exception) and from the failure itself otherwise, so a
     * throwable without a cause no longer triggers a {@link NullPointerException}.
     *
     * @param optional  key schema used, if any; its id is reported in the response
     * @param optional2 value schema used, if any; its id is reported in the response
     * @param list      one produce future per record
     * @return a future completing with the assembled response
     */
    private static CompletableFuture<ProduceResponse> produceResultsToResponse(Optional<RegisteredSchema> optional, Optional<RegisteredSchema> optional2, List<CompletableFuture<ProduceResult>> list) {
        List<CompletableFuture<PartitionOffset>> offsets = list.stream()
                .map(future -> future
                        .thenApply(result -> new PartitionOffset(Integer.valueOf(result.getPartitionId()), Long.valueOf(result.getOffset()), null, null))
                        .exceptionally(failure -> {
                            Throwable cause = failure.getCause() != null ? failure.getCause() : failure;
                            return new PartitionOffset(null, null, Integer.valueOf(errorCodeFromProducerException(cause)), cause.getMessage());
                        }))
                .collect(Collectors.toList());

        return CompletableFutures.allAsList(offsets).thenApply(offsetList -> new ProduceResponse(
                offsetList,
                optional.map(RegisteredSchema::getSchemaId).orElse(null),
                optional2.map(RegisteredSchema::getSchemaId).orElse(null)));
    }

    /**
     * Maps a producer failure to the error code reported in the response.
     * The checks run from most to least specific: authentication,
     * authorization, retriable, then any other Kafka exception.
     *
     * @param th the failure to classify
     * @return the matching {@code ProduceResponse} error code
     * @throws RuntimeException wrapping {@code th} when it is none of the known
     *                          Kafka exception types (the failure is logged first)
     */
    private static int errorCodeFromProducerException(Throwable th) {
        final int errorCode;
        if (th instanceof AuthenticationException) {
            errorCode = ProduceResponse.KAFKA_AUTHENTICATION_ERROR_CODE;
        } else if (th instanceof AuthorizationException) {
            errorCode = ProduceResponse.KAFKA_AUTHORIZATION_ERROR_CODE;
        } else if (th instanceof RetriableException) {
            errorCode = ProduceResponse.KAFKA_RETRIABLE_ERROR_ERROR_CODE;
        } else if (th instanceof KafkaException) {
            errorCode = ProduceResponse.KAFKA_ERROR_ERROR_CODE;
        } else {
            logger.error("Unexpected Producer Exception", th);
            throw new RuntimeException("Unexpected Producer Exception", th);
        }
        return errorCode;
    }

    /**
     * Builds the audit record for one produce attempt.
     *
     * <p>When {@code recordMetadata} is available its partition and offset are
     * recorded; otherwise the stack trace of {@code exc} is recorded, if an
     * exception was supplied. Passing neither is tolerated and simply leaves
     * those fields unset.
     *
     * @param recordMetadata metadata of the written record, or {@code null} on failure
     * @param str            topic the record was sent to
     * @param exc            send failure, or {@code null} on success
     * @param str2           service id to record
     * @param optional       serialized record key; stored decoded as UTF-8 when present
     * @param optional2      traceability id, if any
     * @param optional3      correlation id, if any
     * @param z              {@code true} marks the audit entry as a success, {@code false} as a failure
     * @return the populated audit record with a fresh random id and the current time as timestamp
     */
    protected AuditRecord auditFromRecordMetadata(RecordMetadata recordMetadata, String str, Exception exc, String str2, Optional<ByteString> optional, Optional<String> optional2, Optional<String> optional3, boolean z) {
        AuditRecord auditRecord = new AuditRecord();
        auditRecord.setTopic(str);
        auditRecord.setId(UUID.randomUUID().toString());
        auditRecord.setServiceId(str2);
        auditRecord.setAuditType(AuditRecord.AuditType.PRODUCER);

        if (recordMetadata != null) {
            auditRecord.setPartition(recordMetadata.partition());
            auditRecord.setOffset(recordMetadata.offset());
        } else if (exc != null) {
            // Render the trace into a string for the audit entry; nothing is printed to the console.
            StringWriter trace = new StringWriter();
            exc.printStackTrace(new PrintWriter(trace));
            auditRecord.setStacktrace(trace.toString());
        }

        optional3.ifPresent(correlationId -> auditRecord.setCorrelationId(correlationId));
        optional2.ifPresent(traceabilityId -> auditRecord.setTraceabilityId(traceabilityId));
        auditRecord.setAuditStatus(z ? AuditRecord.AuditStatus.SUCCESS : AuditRecord.AuditStatus.FAILURE);
        auditRecord.setTimestamp(Long.valueOf(System.currentTimeMillis()));
        optional.ifPresent(key -> auditRecord.setKey(key.toString(StandardCharsets.UTF_8)));
        return auditRecord;
    }
}
