package com.networknt.kafka.producer;

import com.networknt.config.Config;
import com.networknt.httpstring.HttpStringConstants;
import com.networknt.kafka.common.FlinkKafkaProducer;
import com.networknt.kafka.common.KafkaProducerConfig;
import com.networknt.kafka.common.TransactionalKafkaException;
import com.networknt.server.ServerConfig;
import io.undertow.server.HttpServerExchange;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/kafka/producer/TransactionalProducer.class */
public class TransactionalProducer implements Runnable, QueuedLightProducer {
    /** Value written to the {@code caller_id} record header; assigned in the static initializer. */
    static String callerId;
    /** Topic name taken from {@code config.getTopic()} in the static initializer. */
    static final String topic;
    /** Transaction currently owned by the {@link #run()} loop, or {@code null} before the first one starts. */
    private KafkaTransactionState currentTransaction;
    /** Send callback installed by {@link #open()}; stays {@code null} until {@code open()} is called. */
    private transient Callback callback;
    // NOTE(review): asyncException and transactionTimeout are not read or written anywhere in this class.
    private volatile transient Exception asyncException;
    private volatile long transactionTimeout;
    private static final Logger logger = LoggerFactory.getLogger(TransactionalProducer.class);
    /** Producer configuration loaded from the {@code kafka-producer} config entry. */
    static final KafkaProducerConfig config = (KafkaProducerConfig) Config.getInstance().getJsonObjectConfig("kafka-producer", KafkaProducerConfig.class);
    /** Records waiting to be sent; exposed through {@link #getTxQueue()} and drained by {@link #run()}. */
    private BlockingQueue<ProducerRecord<byte[], byte[]>> txQueue = new LinkedBlockingQueue<>();
    /** Transactional ids handed back when a producer is recycled. */
    private final BlockingDeque<String> availableTransactionalIds = new LinkedBlockingDeque<>();
    /** Number of records sent whose completion callback has not fired yet. */
    private final AtomicLong pendingRecords = new AtomicLong();
    /** Set to {@code true} to make the {@link #run()} loop exit. */
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/networknt/kafka/producer/TransactionalProducer$KafkaTransactionState.class */
    /**
     * Handle for one Kafka transaction: the producer that owns it together with the
     * transactional id, producer id and epoch that identify it.
     *
     * <p>Equality and hashing use only the three identifying values; the producer
     * reference does not take part.
     */
    public static class KafkaTransactionState {
        private final transient FlinkKafkaProducer<byte[], byte[]> producer;
        final String transactionalId;
        final long producerId;
        final short epoch;

        /**
         * Builds a state whose producer id and epoch are read from the given producer.
         *
         * @param str transactional id
         * @param flinkKafkaProducer producer that owns the transaction
         */
        KafkaTransactionState(String str, FlinkKafkaProducer<byte[], byte[]> flinkKafkaProducer) {
            this(str, flinkKafkaProducer.getProducerId(), flinkKafkaProducer.getEpoch(), flinkKafkaProducer);
        }

        /**
         * Builds a state with no transactional id and {@code -1} for both producer id and epoch.
         *
         * @param flinkKafkaProducer producer to wrap
         */
        KafkaTransactionState(FlinkKafkaProducer<byte[], byte[]> flinkKafkaProducer) {
            this(null, -1L, (short) -1, flinkKafkaProducer);
        }

        /**
         * Builds a state from explicit values.
         *
         * @param str transactional id, may be {@code null}
         * @param j producer id
         * @param s producer epoch
         * @param flinkKafkaProducer producer that owns the transaction
         */
        KafkaTransactionState(String str, long j, short s, FlinkKafkaProducer<byte[], byte[]> flinkKafkaProducer) {
            this.producer = flinkKafkaProducer;
            this.transactionalId = str;
            this.producerId = j;
            this.epoch = s;
        }

        @Override
        public String toString() {
            String simpleName = getClass().getSimpleName();
            return String.format("%s [transactionalId=%s, producerId=%s, epoch=%s]", simpleName, transactionalId, producerId, epoch);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            KafkaTransactionState other = (KafkaTransactionState) obj;
            if (producerId != other.producerId || epoch != other.epoch) {
                return false;
            }
            if (transactionalId == null) {
                return other.transactionalId == null;
            }
            return transactionalId.equals(other.transactionalId);
        }

        @Override
        public int hashCode() {
            int result = transactionalId == null ? 0 : transactionalId.hashCode();
            result = 31 * result + Long.hashCode(producerId);
            result = 31 * result + epoch;
            return result;
        }
    }

    /* loaded from: input_file:com/networknt/kafka/producer/TransactionalProducer$NextTransactionalIdHint.class */
    /**
     * Mutable pair of values: the last parallelism and the next free transactional id.
     * Both fields are public and start at zero when the no-argument constructor is used.
     */
    public static class NextTransactionalIdHint {
        public int lastParallelism;
        public long nextFreeTransactionalId;

        /** Creates a hint with both values set to zero. */
        public NextTransactionalIdHint() {
            this(0, 0L);
        }

        /**
         * Creates a hint with explicit values.
         *
         * @param i value stored in {@link #lastParallelism}
         * @param j value stored in {@link #nextFreeTransactionalId}
         */
        public NextTransactionalIdHint(int i, long j) {
            lastParallelism = i;
            nextFreeTransactionalId = j;
        }
    }

    /* loaded from: input_file:com/networknt/kafka/producer/TransactionalProducer$TransactionHolder.class */
    /**
     * Pairs a transaction handle with the time at which the transaction started.
     *
     * @param <T> type of the wrapped handle
     */
    public static final class TransactionHolder<T> {
        private final T handle;
        private final long transactionStartTime;

        /**
         * @param kafkaTransactionState handle to wrap, may be {@code null}
         * @param j transaction start time, compared against {@link Clock#millis()}
         */
        public TransactionHolder(T kafkaTransactionState, long j) {
            this.handle = kafkaTransactionState;
            this.transactionStartTime = j;
        }

        /**
         * Returns the difference between {@code clock.millis()} and the start time.
         *
         * @param clock clock supplying the current time
         * @return elapsed milliseconds
         */
        long elapsedTime(Clock clock) {
            long now = clock.millis();
            return now - transactionStartTime;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TransactionHolder<?> other = (TransactionHolder<?>) obj;
            if (transactionStartTime != other.transactionStartTime) {
                return false;
            }
            if (handle == null) {
                return other.handle == null;
            }
            return handle.equals(other.handle);
        }

        @Override
        public int hashCode() {
            int handleHash = handle == null ? 0 : handle.hashCode();
            return 31 * handleHash + Long.hashCode(transactionStartTime);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("TransactionHolder{handle=");
            text.append(handle);
            text.append(", transactionStartTime=").append(transactionStartTime);
            text.append("}");
            return text.toString();
        }
    }

    /**
     * Returns the queue that {@link #run()} drains records from.
     *
     * @return the live queue instance held by this producer (not a copy)
     */
    @Override // com.networknt.kafka.producer.QueuedLightProducer
    public BlockingQueue<ProducerRecord<byte[], byte[]>> getTxQueue() {
        return txQueue;
    }

    /**
     * Creates the producer. Only logs an informational message; the send callback is
     * installed separately by {@link #open()}.
     */
    public TransactionalProducer() {
        logger.info("The TransactionalProducer is created");
    }

    /**
     * Producer loop. Until {@code stopped} is set, each iteration:
     * <ol>
     *   <li>begins a new transaction,</li>
     *   <li>polls the queue (up to 5000 records, waiting up to 1000 ms per poll) until at
     *       least one record arrives or the producer is stopped,</li>
     *   <li>sends the drained records and commits the transaction.</li>
     * </ol>
     *
     * <p>Error handling:
     * <ul>
     *   <li>{@link ProducerFencedException}, {@link OutOfOrderSequenceException} and
     *       {@link AuthorizationException} are treated as unrecoverable: {@link #close()} is
     *       attempted and the loop is stopped even if closing fails.</li>
     *   <li>{@link TransactionalKafkaException} and other {@link KafkaException}s abort the
     *       current transaction and the loop continues.</li>
     *   <li>Interruption aborts the current transaction, restores the interrupt flag and
     *       ends the loop.</li>
     * </ul>
     *
     * @throws RuntimeException if a {@link ConfigException} is raised; the original exception is the cause
     */
    @Override // java.lang.Runnable
    public void run() {
        while (!this.stopped.get()) {
            try {
                this.currentTransaction = beginTransaction();
                ArrayList<ProducerRecord<byte[], byte[]>> batch = new ArrayList<>();
                while (!this.stopped.get()) {
                    int drained = drain(this.txQueue, batch, 5000, 1000L, TimeUnit.MILLISECONDS);
                    if (drained > 0) {
                        if (logger.isTraceEnabled()) {
                            logger.trace("drained transactions = " + drained);
                        }
                        // The batch is empty until the first successful drain, so it holds exactly the drained records.
                        for (ProducerRecord<byte[], byte[]> record : batch) {
                            invoke(this.currentTransaction, topic, record);
                        }
                        break;
                    }
                }
                long producerId = this.currentTransaction.producer.getProducerId();
                short epoch = this.currentTransaction.producer.getEpoch();
                if (logger.isDebugEnabled()) {
                    logger.debug("producerId = " + producerId + " epoch = " + epoch);
                }
                commit(this.currentTransaction);
                // The committed producer has been closed; forget it so a later failure does not abort it again.
                this.currentTransaction = null;
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                logger.error("One of the three exceptions", e);
                try {
                    close();
                } catch (Exception closeFailure) {
                    logger.error("Failed to close the producer after an unrecoverable Kafka error", closeFailure);
                } finally {
                    // close() may fail before it marks the producer as stopped; never spin on a fatal error.
                    this.stopped.set(true);
                }
            } catch (TransactionalKafkaException e3) {
                logger.error("TransactionalKafkaException", e3);
                abortCurrentTransaction();
            } catch (InterruptedException e4) {
                logger.error("InterruptedException", e4);
                try {
                    // Abort before re-asserting the interrupt so the abort itself is not interrupted.
                    abortCurrentTransaction();
                } catch (RuntimeException abortFailure) {
                    logger.error("Failed to abort the open transaction after interruption", abortFailure);
                }
                Thread.currentThread().interrupt();
                return;
            } catch (KafkaException e5) {
                logger.error("KafkaException", e5);
                abortCurrentTransaction();
                if (e5 instanceof ConfigException) {
                    throw new RuntimeException("Kafka is down!", e5);
                }
            }
        }
    }

    /** Aborts the transaction held in {@code currentTransaction}, if any, and clears the field first. */
    private void abortCurrentTransaction() {
        KafkaTransactionState failed = this.currentTransaction;
        this.currentTransaction = null;
        abort(failed);
    }

    /**
     * Moves up to {@code i} elements from the queue into the collection, waiting until the
     * timeout elapses for elements that are not yet available.
     *
     * <p>Everything already queued is taken without blocking. If fewer than {@code i}
     * elements have been collected, the method blocks on a single element for the time
     * remaining until the deadline and then repeats. It returns as soon as the limit is
     * reached or a timed poll returns nothing.
     *
     * @param blockingQueue source queue
     * @param collection destination that receives the drained elements
     * @param i maximum number of elements to transfer
     * @param j maximum total time to wait, in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @param <E> element type
     * @return number of elements added to {@code collection}
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public static <E> int drain(BlockingQueue<E> blockingQueue, Collection<? super E> collection, int i, long j, TimeUnit timeUnit) throws InterruptedException {
        final long deadline = System.nanoTime() + timeUnit.toNanos(j);
        int transferred = 0;
        while (transferred < i) {
            transferred += blockingQueue.drainTo(collection, i - transferred);
            if (transferred >= i) {
                break;
            }
            long remainingNanos = deadline - System.nanoTime();
            E next = blockingQueue.poll(remainingNanos, TimeUnit.NANOSECONDS);
            if (next == null) {
                // Timed out with nothing more to take.
                break;
            }
            collection.add(next);
            transferred++;
        }
        return transferred;
    }

    /**
     * Sends one record through the producer of the given transaction and counts it as
     * pending until the send callback fires.
     *
     * @param kafkaTransactionState transaction whose producer performs the send
     * @param str topic name; not used by this method, the record carries its own destination
     * @param producerRecord record to send
     * @throws TransactionalKafkaException declared for callers; not thrown directly by this method
     */
    public void invoke(KafkaTransactionState kafkaTransactionState, String str, ProducerRecord<byte[], byte[]> producerRecord) throws TransactionalKafkaException {
        // Count first so the callback can never observe a decrement before the increment.
        pendingRecords.incrementAndGet();
        FlinkKafkaProducer<byte[], byte[]> sender = kafkaTransactionState.producer;
        sender.send(producerRecord, callback);
    }

    /**
     * Installs the send callback used by {@link #invoke}. The callback logs a failed send
     * and, whether or not the send succeeded, acknowledges the record so the pending
     * counter is decremented.
     */
    @Override // com.networknt.kafka.producer.QueuedLightProducer, com.networknt.kafka.producer.LightProducer
    public void open() {
        this.callback = (recordMetadata, exc) -> {
            if (exc != null) {
                logger.error("Error while sending record to Kafka: " + exc.getMessage(), exc);
            }
            acknowledgeMessage();
        };
    }

    /**
     * Flushes the transaction's producer before a commit by delegating to {@code flush}.
     *
     * @param kafkaTransactionState transaction to flush
     * @throws TransactionalKafkaException declared for callers; not thrown directly by this method
     */
    protected void preCommit(KafkaTransactionState kafkaTransactionState) throws TransactionalKafkaException {
        flush(kafkaTransactionState);
    }

    /**
     * Commits the transaction on its producer and then recycles (closes) that producer.
     *
     * @param kafkaTransactionState transaction to commit
     */
    protected void commit(KafkaTransactionState kafkaTransactionState) {
        FlinkKafkaProducer<byte[], byte[]> owner = kafkaTransactionState.producer;
        owner.commitTransaction();
        recycleTransactionalProducer(owner);
    }

    /**
     * Commits a transaction left over from an earlier producer. A new producer is created
     * for the transaction's transactional id, the transaction is resumed using the recorded
     * producer id and epoch, and then committed.
     *
     * <p>The recovery producer is always closed, including when resuming or committing
     * fails. {@link InvalidTxnStateException} and {@link ProducerFencedException} are
     * logged as warnings and not rethrown.
     *
     * @param kafkaTransactionState identifying values of the transaction to recover
     */
    protected void recoverAndCommit(KafkaTransactionState kafkaTransactionState) {
        try (FlinkKafkaProducer<byte[], byte[]> recoveryProducer = initTransactionalProducer(kafkaTransactionState.transactionalId)) {
            recoveryProducer.resumeTransaction(kafkaTransactionState.producerId, kafkaTransactionState.epoch);
            recoveryProducer.commitTransaction();
        } catch (InvalidTxnStateException | ProducerFencedException e) {
            logger.warn("Encountered error {} while recovering transaction {}. Presumably this transaction has been already committed before", e, kafkaTransactionState);
        }
    }

    /**
     * Aborts the transaction on its producer and recycles (closes) that producer.
     * Does nothing when no transaction is given.
     *
     * @param kafkaTransactionState transaction to abort, may be {@code null}
     */
    protected void abort(KafkaTransactionState kafkaTransactionState) {
        if (kafkaTransactionState == null) {
            return;
        }
        FlinkKafkaProducer<byte[], byte[]> owner = kafkaTransactionState.producer;
        owner.abortTransaction();
        recycleTransactionalProducer(owner);
    }

    /**
     * Records the producer's transactional id as available again and closes the producer.
     *
     * <p>An id is stored at most once: nothing in this class removes entries from the
     * deque, so appending the same id on every commit or abort would make it grow without
     * bound. The producer is closed even if the bookkeeping fails.
     *
     * @param flinkKafkaProducer producer whose transaction has been committed or aborted
     */
    private void recycleTransactionalProducer(FlinkKafkaProducer<byte[], byte[]> flinkKafkaProducer) {
        try {
            String transactionalId = flinkKafkaProducer.getTransactionalId();
            // The deque rejects null elements, so a missing id is skipped rather than stored.
            if (transactionalId != null && !this.availableTransactionalIds.contains(transactionalId)) {
                this.availableTransactionalIds.add(transactionalId);
            }
        } finally {
            flinkKafkaProducer.close();
        }
    }

    /**
     * Recovers a transaction left over from an earlier producer by creating a new producer
     * for the same transactional id and calling {@code initTransactions()} on it. The
     * recovery producer is closed afterwards, whether or not initialization succeeded.
     *
     * @param kafkaTransactionState identifying values of the transaction to recover
     */
    protected void recoverAndAbort(KafkaTransactionState kafkaTransactionState) {
        try (FlinkKafkaProducer<byte[], byte[]> recoveryProducer = initTransactionalProducer(kafkaTransactionState.transactionalId)) {
            recoveryProducer.initTransactions();
        }
    }

    /** Marks one sent record as completed by decrementing the pending-record counter. */
    private void acknowledgeMessage() {
        pendingRecords.decrementAndGet();
    }

    /**
     * Flushes and commits the current transaction, if there is one, and marks the producer
     * as stopped so that {@link #run()} exits.
     *
     * <p>The stopped flag is set even when flushing or committing throws. Without that, a
     * failed close would leave the run loop spinning on the same error.
     *
     * @throws IllegalStateException if records are still pending after the flush
     */
    @Override // com.networknt.kafka.producer.LightProducer
    public void close() {
        try {
            KafkaTransactionState currentTransaction = currentTransaction();
            if (currentTransaction != null) {
                flush(currentTransaction);
                commit(currentTransaction);
            }
        } finally {
            this.stopped.set(true);
        }
    }

    /**
     * Flushes the transaction's producer, if it has one, and verifies that no sent record
     * is still waiting for its callback.
     *
     * @param kafkaTransactionState transaction whose producer is flushed
     * @throws IllegalStateException if the pending-record counter is not zero after the flush
     */
    private void flush(KafkaTransactionState kafkaTransactionState) {
        FlinkKafkaProducer<byte[], byte[]> owner = kafkaTransactionState.producer;
        if (owner != null) {
            owner.flush();
        }
        long stillPending = pendingRecords.get();
        if (stillPending == 0) {
            return;
        }
        throw new IllegalStateException("Pending record count must be zero at this point: " + stillPending);
    }

    /**
     * Returns the transaction most recently started by {@link #run()}.
     *
     * @return the current transaction state, or {@code null} if none has been started
     */
    public KafkaTransactionState currentTransaction() {
        return currentTransaction;
    }

    /**
     * Creates a transactional producer, begins a transaction on it and returns a state
     * object describing that transaction.
     *
     * <p>If beginning the transaction fails, the newly created producer is closed before
     * the exception propagates, so a failed attempt does not leak a producer.
     *
     * @return state of the newly begun transaction
     * @throws TransactionalKafkaException propagated from producer creation
     */
    public KafkaTransactionState beginTransaction() throws TransactionalKafkaException {
        FlinkKafkaProducer<byte[], byte[]> createTransactionalProducer = createTransactionalProducer();
        boolean begun = false;
        try {
            createTransactionalProducer.beginTransaction();
            KafkaTransactionState state = new KafkaTransactionState(createTransactionalProducer.getTransactionalId(), createTransactionalProducer);
            begun = true;
            return state;
        } finally {
            if (!begun) {
                try {
                    createTransactionalProducer.close();
                } catch (RuntimeException closeFailure) {
                    // Keep the original failure; the close problem is only logged.
                    logger.warn("Failed to close producer after beginTransaction failure", closeFailure);
                }
            }
        }
    }

    /**
     * Creates a producer for the {@code transactional.id} found in the configuration
     * properties and initializes transactions on it.
     *
     * <p>If {@code initTransactions()} fails, the producer is closed before the exception
     * propagates, so repeated failures do not accumulate open producers.
     *
     * @return a producer on which {@code initTransactions()} has completed
     * @throws TransactionalKafkaException declared for callers; not thrown directly by this method
     */
    private FlinkKafkaProducer<byte[], byte[]> createTransactionalProducer() throws TransactionalKafkaException {
        FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer = initTransactionalProducer((String) config.getProperties().get("transactional.id"));
        boolean initialized = false;
        try {
            initTransactionalProducer.initTransactions();
            initialized = true;
            return initTransactionalProducer;
        } finally {
            if (!initialized) {
                try {
                    initTransactionalProducer.close();
                } catch (RuntimeException closeFailure) {
                    // Keep the original failure; the close problem is only logged.
                    logger.warn("Failed to close producer after initTransactions failure", closeFailure);
                }
            }
        }
    }

    /**
     * Stores the given id under {@code transactional.id} in the shared configuration
     * properties and creates a producer from those properties.
     *
     * <p>NOTE(review): this writes to the properties of the static {@code config} object,
     * so the id remains set for every producer created afterwards.
     *
     * @param str transactional id to configure
     * @return a newly created producer
     */
    private FlinkKafkaProducer<byte[], byte[]> initTransactionalProducer(String str) {
        config.getProperties().put("transactional.id", str);
        return initProducer();
    }

    /**
     * Creates a producer from the current configuration properties and logs that it has
     * been started.
     *
     * @return the newly created producer
     */
    private FlinkKafkaProducer<byte[], byte[]> initProducer() {
        FlinkKafkaProducer<byte[], byte[]> created = new FlinkKafkaProducer<>(config.getProperties());
        logger.info("Starting FlinkKafkaProducer");
        return created;
    }

    /**
     * Copies selected HTTP request headers onto the Kafka record as UTF-8 bytes:
     * {@code authorization}, the correlation id (as {@code X-Correlation-Id}) and the
     * traceability id (as {@code X-Traceability-Id}). When caller-id injection is enabled
     * in the configuration, a {@code caller_id} header is added as well.
     *
     * <p>A request header that is absent is skipped. Previously only the traceability id
     * was null-checked, so a request without an authorization or correlation-id header
     * caused a {@link NullPointerException}.
     *
     * @param producerRecord record whose headers are extended
     * @param httpServerExchange exchange supplying the request headers
     */
    @Override // com.networknt.kafka.producer.QueuedLightProducer
    public void propagateHeaders(ProducerRecord producerRecord, HttpServerExchange httpServerExchange) {
        Headers headers = producerRecord.headers();
        addHeaderIfPresent(headers, "authorization", httpServerExchange.getRequestHeaders().getFirst("authorization"));
        addHeaderIfPresent(headers, "X-Correlation-Id", httpServerExchange.getRequestHeaders().getFirst(HttpStringConstants.CORRELATION_ID));
        addHeaderIfPresent(headers, "X-Traceability-Id", httpServerExchange.getRequestHeaders().getFirst(HttpStringConstants.TRACEABILITY_ID));
        if (config.isInjectCallerId()) {
            // callerId is a mutable static; guard against it having been set to null.
            addHeaderIfPresent(headers, "caller_id", callerId);
        }
    }

    /** Adds {@code value} to {@code headers} under {@code key} as UTF-8 bytes unless it is {@code null}. */
    private static void addHeaderIfPresent(Headers headers, String key, String value) {
        if (value != null) {
            headers.add(key, value.getBytes(StandardCharsets.UTF_8));
        }
    }

    /**
     * Parses the first four characters of the given string as a decimal integer.
     *
     * @param str string whose first four characters are decimal digits
     * @return the parsed number
     * @throws StringIndexOutOfBoundsException if {@code str} has fewer than four characters
     * @throws NumberFormatException if the first four characters are not a valid integer
     */
    public static int addressToPartition(String str) {
        String prefix = str.substring(0, 4);
        return Integer.parseInt(prefix);
    }

    // Resolves the caller id and topic once at class-load time. The caller id defaults to
    // "unknown" and is replaced by the server's service id only when caller-id injection is
    // enabled and a ServerConfig instance is available.
    static {
        callerId = "unknown";
        if (config.isInjectCallerId()) {
            ServerConfig serverConfig = ServerConfig.getInstance();
            if (serverConfig != null) {
                callerId = serverConfig.getServiceId();
            }
        }
        topic = config.getTopic();
    }
}
