package io.helidon.messaging.connectors.kafka;

import io.helidon.common.context.Context;
import io.helidon.common.context.Contexts;
import io.helidon.common.reactive.EmittingPublisher;
import io.helidon.config.Config;
import java.lang.System;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:io/helidon/messaging/connectors/kafka/KafkaPublisher.class */
public class KafkaPublisher<K, V> implements Publisher<KafkaMessage<K, V>> {
    private static final System.Logger LOGGER = System.getLogger(KafkaPublisher.class.getName());

    // Configuration keys read by Builder.config(Config).
    private static final String POLL_TIMEOUT = "poll.timeout";
    private static final String PERIOD_EXECUTIONS = "period.executions";
    private static final String ENABLE_AUTOCOMMIT = "enable.auto.commit";
    private static final String ACK_TIMEOUT = "ack.timeout.millis";
    private static final String LIMIT_NO_ACK = "limit.no.ack";

    /** Executor that runs the periodic poll/emit task. */
    private final ScheduledExecutorService scheduler;
    /** Topic names to subscribe to; used only when {@link #topicPattern} is {@code null}. */
    private final List<String> topics;
    /** Topic pattern to subscribe to; takes precedence over {@link #topics} when not {@code null}. */
    private final Pattern topicPattern;
    /** Period, in milliseconds, between two executions of the scheduled task. */
    private final long periodExecutions;
    /** Maximum time, in milliseconds, a single Kafka poll may block. */
    private final long pollTimeout;
    /** When {@code true} emitted messages are not tracked for manual offset commit. */
    private final boolean autoCommit;
    /** Ack timeout handed to every created {@code KafkaConsumerMessage}. */
    private final long ackTimeout;
    /** Number of pending (uncommitted) messages at which the scheduled task fails the stream. */
    private final int limitNoAck;
    /** Factory of the Kafka consumer; invoked once per call to {@code start()}. */
    private final Supplier<Consumer<K, V>> consumerSupplier;
    /** Configuration passed to the nack handler factory. */
    private final Config config;
    /** Consumer created in {@code start()}; {@code null} until then. */
    private Consumer<K, V> kafkaConsumer;
    /** Set once by {@code cleanResourcesIfTerminated} when the consumer is being closed. */
    private volatile boolean stopped;
    /** Serializes the scheduled task and {@link #stop()}. */
    private final Lock taskLock = new ReentrantLock();
    /** Records polled from Kafka that have not been emitted downstream yet. */
    private final Queue<ConsumerRecord<K, V>> backPressureBuffer = new LinkedList<>();
    /** Emitted messages waiting for their offset to be committed, grouped by partition. */
    private final Map<TopicPartition, List<KafkaConsumerMessage<K, V>>> pendingCommits = new HashMap<>();
    /** Rebalance listener passed to the consumer subscription; awaited by {@code waitForPartitionAssigment}. */
    private final PartitionsAssignedLatch partitionsAssignedLatch = new PartitionsAssignedLatch();
    /** Outstanding downstream demand, saturating at {@link Long#MAX_VALUE}. */
    private final AtomicLong requests = new AtomicLong();
    /** Publisher through which messages, completion and failures reach the subscriber. */
    private final EmittingPublisher<KafkaMessage<K, V>> emitter = EmittingPublisher.create();

    /* loaded from: input_file:io/helidon/messaging/connectors/kafka/KafkaPublisher$Builder.class */
    /**
     * Fluent builder of {@link KafkaPublisher}.
     *
     * <p>A topic list or topic pattern, the auto-commit flag, a scheduler and a consumer
     * supplier are mandatory; everything else has a default.
     *
     * @param <K> Kafka record key type
     * @param <V> Kafka record value type
     */
    public static final class Builder<K, V> implements io.helidon.common.Builder<Builder<K, V>, KafkaPublisher<K, V>> {
        // Mandatory values, validated in m6build().
        private List<String> topics;
        private Pattern topicPattern;
        private Boolean autoCommit;
        private ScheduledExecutorService scheduler;
        private Supplier<Consumer<K, V>> consumerSupplier;

        // Optional values.
        private Config config;
        private long pollTimeout = 50;
        private long periodExecutions = 100;
        private long ackTimeout = Long.MAX_VALUE;
        private int limitNoAck = Integer.MAX_VALUE;

        private Builder() {
        }

        /**
         * Populates this builder from configuration: installs a supplier creating a
         * {@link KafkaConsumer} from the Kafka properties, sets the topics and, when present,
         * the topic pattern, poll timeout, execution period, auto-commit flag, ack timeout
         * and no-ack limit.
         *
         * @param config channel configuration
         * @return this builder
         */
        public Builder<K, V> config(Config config) {
            this.config = config;
            KafkaConfig kafkaConfig = KafkaConfig.create(config);
            consumerSupplier(() -> new KafkaConsumer<>(kafkaConfig.asMap()));
            topics(kafkaConfig.topics());
            kafkaConfig.topicPattern().ifPresent(this::topicPattern);
            config.get(KafkaPublisher.POLL_TIMEOUT).asLong().ifPresent(this::pollTimeout);
            config.get(KafkaPublisher.PERIOD_EXECUTIONS).asLong().ifPresent(this::periodExecutions);
            config.get(KafkaPublisher.ENABLE_AUTOCOMMIT).asBoolean().ifPresent(this::autoCommit);
            config.get(KafkaPublisher.ACK_TIMEOUT).asLong().ifPresent(this::ackTimeout);
            config.get(KafkaPublisher.LIMIT_NO_ACK).asInt().ifPresent(this::limitNoAck);
            return this;
        }

        /**
         * Sets the factory used to create the Kafka consumer.
         *
         * @param consumerSupplier consumer factory
         * @return this builder
         */
        public Builder<K, V> consumerSupplier(Supplier<Consumer<K, V>> consumerSupplier) {
            this.consumerSupplier = consumerSupplier;
            return this;
        }

        /**
         * Sets the names of the topics to subscribe to.
         *
         * @param topics topic names
         * @return this builder
         */
        public Builder<K, V> topics(List<String> topics) {
            this.topics = topics;
            return this;
        }

        /**
         * Sets the pattern of topics to subscribe to; when set it is used instead of the topic list.
         *
         * @param topicPattern topic name pattern
         * @return this builder
         */
        public Builder<K, V> topicPattern(Pattern topicPattern) {
            this.topicPattern = topicPattern;
            return this;
        }

        /**
         * Sets the executor that runs the periodic poll/emit task.
         *
         * @param scheduler scheduled executor
         * @return this builder
         */
        public Builder<K, V> scheduler(ScheduledExecutorService scheduler) {
            this.scheduler = scheduler;
            return this;
        }

        /**
         * Sets the period between two executions of the scheduled task (default 100).
         *
         * @param periodExecutions period in milliseconds
         * @return this builder
         */
        public Builder<K, V> periodExecutions(long periodExecutions) {
            this.periodExecutions = periodExecutions;
            return this;
        }

        /**
         * Sets the maximum blocking time of a single Kafka poll (default 50).
         *
         * @param pollTimeout timeout in milliseconds
         * @return this builder
         */
        public Builder<K, V> pollTimeout(long pollTimeout) {
            this.pollTimeout = pollTimeout;
            return this;
        }

        /**
         * Sets whether offsets are committed automatically; this value is mandatory.
         *
         * @param autoCommit {@code true} when Kafka auto-commit is enabled
         * @return this builder
         */
        public Builder<K, V> autoCommit(boolean autoCommit) {
            this.autoCommit = autoCommit;
            return this;
        }

        /**
         * Sets the ack timeout passed to every created message (default {@link Long#MAX_VALUE}).
         *
         * @param ackTimeout ack timeout
         * @return this builder
         */
        public Builder<K, V> ackTimeout(long ackTimeout) {
            this.ackTimeout = ackTimeout;
            return this;
        }

        /**
         * Sets the number of pending acks at which the publisher fails
         * (default {@link Integer#MAX_VALUE}).
         *
         * @param limitNoAck pending ack limit
         * @return this builder
         */
        public Builder<K, V> limitNoAck(int limitNoAck) {
            this.limitNoAck = limitNoAck;
            return this;
        }

        /**
         * Validates the mandatory values and creates the publisher. An absent configuration
         * is replaced with {@link Config#empty()}.
         *
         * <p>Decompiler note: renamed from {@code build} (merged with bridge method).
         *
         * @return a new publisher
         * @throws IllegalArgumentException when topics/pattern, autoCommit, scheduler or the
         *                                  consumer supplier is missing
         */
        public KafkaPublisher<K, V> m6build() {
            if (this.topicPattern == null && (this.topics == null || this.topics.isEmpty())) {
                throw new IllegalArgumentException("The topic is a required value");
            }
            if (this.autoCommit == null) {
                String message = String.format(
                        "The autoCommit is a required value and be equals to KafkaProperty %s",
                        KafkaPublisher.ENABLE_AUTOCOMMIT);
                throw new IllegalArgumentException(message);
            }
            if (this.scheduler == null) {
                throw new IllegalArgumentException("The scheduler is a required value");
            }
            if (this.consumerSupplier == null) {
                throw new IllegalArgumentException("The kafkaConsumerSupplier is a required value");
            }
            if (this.config == null) {
                this.config = Config.empty();
            }
            return new KafkaPublisher<>(
                    this.scheduler,
                    this.consumerSupplier,
                    this.topics,
                    this.topicPattern,
                    this.pollTimeout,
                    this.periodExecutions,
                    this.autoCommit,
                    this.ackTimeout,
                    this.limitNoAck,
                    this.config);
        }
    }

    /**
     * Creates the publisher and wires downstream demand into the {@code requests} counter.
     *
     * @param scheduler        executor running the periodic task
     * @param consumerSupplier factory of the Kafka consumer
     * @param topics           topic names to subscribe to
     * @param topicPattern     topic pattern to subscribe to, may be {@code null}
     * @param pollTimeout      Kafka poll timeout in milliseconds
     * @param periodExecutions period of the scheduled task in milliseconds
     * @param autoCommit       whether offsets are committed automatically
     * @param ackTimeout       ack timeout handed to created messages
     * @param limitNoAck       pending ack limit
     * @param config           configuration handed to the nack handler factory
     */
    private KafkaPublisher(ScheduledExecutorService scheduler,
                           Supplier<Consumer<K, V>> consumerSupplier,
                           List<String> topics,
                           Pattern topicPattern,
                           long pollTimeout,
                           long periodExecutions,
                           boolean autoCommit,
                           long ackTimeout,
                           int limitNoAck,
                           Config config) {
        this.scheduler = scheduler;
        this.consumerSupplier = consumerSupplier;
        this.topics = topics;
        this.topicPattern = topicPattern;
        this.pollTimeout = pollTimeout;
        this.periodExecutions = periodExecutions;
        this.autoCommit = autoCommit;
        this.ackTimeout = ackTimeout;
        this.limitNoAck = limitNoAck;
        this.config = config;
        this.emitter.onRequest((requested, unused) -> {
            // Saturating add: never let the accumulated demand overflow past Long.MAX_VALUE.
            this.requests.updateAndGet(current ->
                    Long.MAX_VALUE - current > requested ? requested + current : Long.MAX_VALUE);
        });
    }

    /**
     * Creates the Kafka consumer, subscribes it (by pattern when one is set, otherwise by
     * topic list) and schedules the periodic poll/emit task at a fixed rate of
     * {@code periodExecutions} milliseconds with no initial delay.
     *
     * <p>If creation, subscription or scheduling throws a {@link RuntimeException}, the
     * partition latch is released and the failure is signalled through the emitter on the
     * scheduler.
     */
    private void start() {
        LOGGER.log(System.Logger.Level.DEBUG, () -> String.format("%s Start to consume", this.topics));
        try {
            this.kafkaConsumer = this.consumerSupplier.get();
            if (this.topicPattern != null) {
                this.kafkaConsumer.subscribe(this.topicPattern, this.partitionsAssignedLatch);
            } else {
                this.kafkaConsumer.subscribe(this.topics, this.partitionsAssignedLatch);
            }
            // NOTE(review): declared without type arguments exactly as before; add <K, V> if
            // KafkaNackHandler is generic — TODO confirm.
            KafkaNackHandler nackHandler = KafkaNackHandler.create(this.emitter, this.config);
            this.scheduler.scheduleAtFixedRate(
                    () -> runCycle(nackHandler), 0L, this.periodExecutions, TimeUnit.MILLISECONDS);
        } catch (RuntimeException e) {
            this.partitionsAssignedLatch.countDown();
            this.scheduler.execute(() -> this.emitter.fail(e));
        }
    }

    /**
     * One execution of the scheduled task, run under {@code taskLock}.
     *
     * <p>While the stream is alive it either polls Kafka (buffer empty) or emits buffered
     * records (buffer not empty); afterwards it releases resources if the emitter has
     * terminated and, for manual commit mode, commits acknowledged offsets. Any
     * {@link Exception} is logged and fails the emitter.
     */
    private void runCycle(KafkaNackHandler nackHandler) {
        // Acquire before the try block so that unlock() is only ever reached while holding the lock.
        this.taskLock.lock();
        try {
            boolean alive = !this.scheduler.isShutdown()
                    && !this.emitter.isCompleted()
                    && !this.emitter.isFailed();
            if (alive) {
                int currentNoAck = currentNoAck();
                if (currentNoAck >= this.limitNoAck) {
                    throw new IllegalStateException(String.format(
                            "Current pending %s acks has overflown the limit of %s ",
                            currentNoAck, this.limitNoAck));
                }
                if (this.backPressureBuffer.isEmpty()) {
                    pollIntoBuffer();
                } else {
                    emitRequested(nackHandler);
                }
            }
            cleanResourcesIfTerminated(this.emitter.isCompleted() || this.emitter.isFailed());
            if (!this.stopped && !this.autoCommit) {
                processACK();
            }
        } catch (Exception e) {
            LOGGER.log(System.Logger.Level.ERROR, "KafkaPublisher " + String.valueOf(this.topics) + " failed", e);
            this.emitter.fail(e);
        } finally {
            this.taskLock.unlock();
        }
    }

    /** Polls Kafka once and appends every returned record to the back-pressure buffer. */
    private void pollIntoBuffer() {
        try {
            ConsumerRecords<K, V> polled = this.kafkaConsumer.poll(Duration.ofMillis(this.pollTimeout));
            polled.forEach(this.backPressureBuffer::add);
            if (!this.backPressureBuffer.isEmpty()) {
                LOGGER.log(System.Logger.Level.DEBUG,
                        () -> String.format("%s Poll: %s", this.topics, this.backPressureBuffer));
            }
        } catch (WakeupException ignored) {
            // stop() calls wakeup() to interrupt a blocking poll; this is an expected signal, not a failure.
            LOGGER.log(System.Logger.Level.DEBUG,
                    () -> String.format("%s It was requested to stop polling from channel", this.topics));
        }
    }

    /**
     * Emits as many buffered records as the downstream has requested. In manual commit
     * mode each emitted message is registered in {@code pendingCommits}; in auto-commit mode
     * its commit future is completed immediately.
     */
    private void emitRequested(KafkaNackHandler nackHandler) {
        long requested = this.requests.get();
        long toEmit = Math.min(requested, this.backPressureBuffer.size());
        if (toEmit > 0) {
            LOGGER.log(System.Logger.Level.DEBUG, () -> String.format(
                    "%s %s messages to emit. %s in buffer and %s requested",
                    this.topics, toEmit, this.backPressureBuffer.size(), requested));
        }
        for (long emitted = 0; emitted < toEmit; emitted++) {
            ConsumerRecord<K, V> consumerRecord = this.backPressureBuffer.poll();
            CompletableFuture<Void> kafkaCommit = new CompletableFuture<>();
            KafkaConsumerMessage<K, V> message =
                    new KafkaConsumerMessage<>(consumerRecord, kafkaCommit, nackHandler, this.ackTimeout);
            if (this.autoCommit) {
                kafkaCommit.complete(null);
            } else {
                TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
                this.pendingCommits.computeIfAbsent(partition, tp -> new LinkedList<>()).add(message);
            }
            this.requests.decrementAndGet();
            runInNewContext(() -> this.emitter.emit(message));
        }
    }

    /**
     * Counts the messages currently held in {@code pendingCommits} across all partitions.
     *
     * @return total number of pending messages, {@code 0} when there are none
     */
    private int currentNoAck() {
        int total = 0;
        for (List<KafkaConsumerMessage<K, V>> partitionMessages : this.pendingCommits.values()) {
            total += partitionMessages.size();
        }
        return total;
    }

    /**
     * Commits the offsets of acknowledged messages.
     *
     * <p>For every partition the leading run of acknowledged messages is removed from
     * {@code pendingCommits}; the offset following the last of them is committed
     * synchronously. The commit future of each removed message is completed normally on
     * success, or exceptionally with the {@link RuntimeException} thrown by the commit.
     * Nothing is committed when no message was acknowledged.
     */
    private void processACK() {
        Map<TopicPartition, OffsetAndMetadata> offsets = new LinkedHashMap<>();
        List<KafkaConsumerMessage<K, V>> acknowledged = new LinkedList<>();
        for (Map.Entry<TopicPartition, List<KafkaConsumerMessage<K, V>>> entry : this.pendingCommits.entrySet()) {
            Iterator<KafkaConsumerMessage<K, V>> pending = entry.getValue().iterator();
            KafkaConsumerMessage<K, V> lastAcknowledged = null;
            while (pending.hasNext()) {
                KafkaConsumerMessage<K, V> candidate = pending.next();
                if (!candidate.isAck()) {
                    // Stop at the first unacknowledged message: later offsets must not be committed past it.
                    break;
                }
                acknowledged.add(candidate);
                lastAcknowledged = candidate;
                pending.remove();
            }
            if (lastAcknowledged != null) {
                // The committed position is the offset of the next record to consume.
                OffsetAndMetadata offset =
                        new OffsetAndMetadata(lastAcknowledged.getOffset().get().longValue() + 1);
                LOGGER.log(System.Logger.Level.DEBUG,
                        () -> String.format("%s Will commit %s %s", this.topics, entry.getKey(), offset));
                offsets.put(entry.getKey(), offset);
            }
        }
        if (acknowledged.isEmpty()) {
            return;
        }
        LOGGER.log(System.Logger.Level.DEBUG, () -> String.format("%s Offsets %s", this.topics, offsets));
        try {
            this.kafkaConsumer.commitSync(offsets);
            acknowledged.forEach(message -> message.kafkaCommit().complete(null));
        } catch (RuntimeException e) {
            LOGGER.log(System.Logger.Level.ERROR, "Unable to commit in Kafka " + String.valueOf(offsets), e);
            acknowledged.forEach(message -> message.kafkaCommit().completeExceptionally(e));
        }
    }

    /**
     * Stops consuming: wakes up the Kafka consumer, then, under {@code taskLock}, releases
     * resources and completes the emitter. A {@link RuntimeException} raised while doing so
     * fails the emitter instead. Does nothing beyond logging when no consumer was created.
     */
    public void stop() {
        LOGGER.log(System.Logger.Level.DEBUG, () -> String.format("%s Requested to stop", this.topics));
        if (this.kafkaConsumer == null) {
            return;
        }
        // Interrupts a poll that may currently be blocking inside the scheduled task.
        this.kafkaConsumer.wakeup();
        // Acquire before the try block so that unlock() is only ever reached while holding the lock.
        this.taskLock.lock();
        try {
            cleanResourcesIfTerminated(true);
            LOGGER.log(System.Logger.Level.DEBUG, () -> String.format(
                    "%s Buffered events that were not processed %s", this.topics, this.backPressureBuffer));
            this.emitter.complete();
        } catch (RuntimeException e) {
            this.emitter.fail(e);
        } finally {
            this.taskLock.unlock();
        }
        LOGGER.log(System.Logger.Level.DEBUG, () -> "Closed");
    }

    /**
     * Releases resources the first time it is called with {@code true}: marks the publisher
     * as stopped, fails the commit future of every message still in {@code pendingCommits}
     * with a {@link TimeoutException} and closes the Kafka consumer. Later calls, and calls
     * with {@code false}, do nothing.
     *
     * @param terminated whether the stream has terminated
     */
    private void cleanResourcesIfTerminated(boolean terminated) {
        if (this.stopped || !terminated) {
            return;
        }
        this.stopped = true;
        // Report the number of pending messages; pendingCommits.size() would only be the partition count.
        LOGGER.log(System.Logger.Level.DEBUG,
                () -> String.format("%s Pending ACKs: %s", this.topics, currentNoAck()));
        for (List<KafkaConsumerMessage<K, V>> partitionMessages : this.pendingCommits.values()) {
            for (KafkaConsumerMessage<K, V> message : partitionMessages) {
                message.kafkaCommit().completeExceptionally(new TimeoutException(
                        String.valueOf(this.topics) + " Aborted because KafkaPublisher is terminated"));
            }
        }
        this.kafkaConsumer.close();
    }

    /**
     * Runs the given action in a freshly built context whose id is
     * {@code kafka-message-<random UUID>:}. When a current context exists it becomes the
     * parent of the new one.
     *
     * @param runnable action to execute inside the new context
     */
    protected void runInNewContext(Runnable runnable) {
        Context.Builder contextBuilder =
                Context.builder().id(String.format("kafka-message-%s:", UUID.randomUUID()));
        // Chain on the typed Optional directly; a raw Optional local would not accept the method reference.
        Contexts.context().ifPresent(contextBuilder::parent);
        Contexts.runInContext(contextBuilder.build(), runnable);
    }

    /**
     * Subscribes the given Reactive Streams subscriber to the internal emitter (adapted to a
     * Flow subscriber) and then starts consuming from Kafka.
     *
     * @param subscriber downstream subscriber
     */
    @Override
    public void subscribe(Subscriber<? super KafkaMessage<K, V>> subscriber) {
        this.emitter.subscribe(FlowAdapters.toFlowSubscriber(subscriber));
        start();
    }

    /**
     * Blocks until the partition latch is released or the timeout elapses.
     *
     * @param timeout  maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @throws InterruptedException when the waiting thread is interrupted
     * @throws TimeoutException     when the latch was not released in time
     */
    void waitForPartitionAssigment(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
        boolean released = this.partitionsAssignedLatch.await(timeout, timeUnit);
        if (released) {
            return;
        }
        throw new TimeoutException("Timeout for subscription reached");
    }

    /**
     * Returns the topic list this publisher was built with.
     *
     * @return the configured topic names (the same list instance, not a copy)
     */
    List<String> topics() {
        return topics;
    }

    /**
     * Creates an empty builder.
     *
     * @param <K> Kafka record key type
     * @param <V> Kafka record value type
     * @return a new builder
     */
    public static <K, V> Builder<K, V> builder() {
        Builder<K, V> fresh = new Builder<K, V>();
        return fresh;
    }

    /**
     * Creates a publisher configured entirely from the given configuration.
     *
     * @param config channel configuration
     * @param <K>    Kafka record key type
     * @param <V>    Kafka record value type
     * @return a new publisher
     * @throws IllegalArgumentException when a mandatory builder value is still missing
     *                                  after applying the configuration
     */
    public static <K, V> KafkaPublisher<K, V> create(Config config) {
        // Explicit type witness: an unqualified builder() at the head of a call chain would be
        // inferred as Builder<Object, Object>, whose result is not a KafkaPublisher<K, V>.
        return KafkaPublisher.<K, V>builder().config(config).m6build();
    }
}
