package io.helidon.messaging.connectors.kafka;

import io.helidon.config.Config;
import java.lang.System;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/helidon/messaging/connectors/kafka/KafkaSubscriber.class */
public class KafkaSubscriber<K, V> implements Subscriber<Message<V>> {
    private static final System.Logger LOGGER = System.getLogger(KafkaSubscriber.class.getName());
    private static final String BACKPRESSURE_SIZE_KEY = "backpressure.size";
    private final long backpressure;
    private final Supplier<Producer<K, V>> producerSupplier;
    private final List<String> topics;
    private final AtomicLong backpressureCounter = new AtomicLong();
    private Subscription subscription;
    private Producer<K, V> kafkaProducer;

    /* loaded from: input_file:io/helidon/messaging/connectors/kafka/KafkaSubscriber$Builder.class */
    public static final class Builder<K, V> implements io.helidon.common.Builder<Builder<K, V>, KafkaSubscriber<K, V>> {
        private Supplier<Producer<K, V>> producerSupplier;
        private List<String> topics;
        private long backpressure = 5;

        private Builder() {
        }

        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        public KafkaSubscriber<K, V> m8build() {
            if (Objects.isNull(this.topics) || this.topics.isEmpty()) {
                throw new IllegalArgumentException("The topic is a required value");
            }
            if (Objects.isNull(this.producerSupplier)) {
                throw new IllegalArgumentException("The producerSupplier is a required value");
            }
            return new KafkaSubscriber<>(this.producerSupplier, this.topics, this.backpressure);
        }

        public Builder<K, V> config(Config config) {
            KafkaConfig create = KafkaConfig.create(config);
            producerSupplier(() -> {
                return new KafkaProducer(create.asMap());
            });
            topics(create.topics());
            config.get(KafkaSubscriber.BACKPRESSURE_SIZE_KEY).asLong().ifPresent((v1) -> {
                backpressure(v1);
            });
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Builder<K, V> config(KafkaConfig kafkaConfig) {
            producerSupplier(() -> {
                return new KafkaProducer(kafkaConfig.asMap());
            });
            topics(kafkaConfig.topics());
            kafkaConfig.get(KafkaSubscriber.BACKPRESSURE_SIZE_KEY).map(String::valueOf).map(Long::valueOf).ifPresent((v1) -> {
                backpressure(v1);
            });
            return this;
        }

        public Builder<K, V> producerSupplier(Supplier<Producer<K, V>> supplier) {
            this.producerSupplier = supplier;
            return this;
        }

        public Builder<K, V> backpressure(long j) {
            this.backpressure = j;
            return this;
        }

        public Builder<K, V> topics(List<String> list) {
            this.topics = list;
            return this;
        }
    }

    private KafkaSubscriber(Supplier<Producer<K, V>> supplier, List<String> list, long j) {
        this.backpressure = j;
        this.producerSupplier = supplier;
        this.topics = list;
    }

    public void onSubscribe(Subscription subscription) {
        try {
            if (this.subscription == null) {
                this.kafkaProducer = this.producerSupplier.get();
                this.subscription = subscription;
                this.subscription.request(this.backpressure);
            } else {
                subscription.cancel();
            }
        } catch (RuntimeException e) {
            LOGGER.log(System.Logger.Level.ERROR, "Cannot start the Kafka producer", e);
            subscription.cancel();
        }
    }

    public void onNext(Message<V> message) {
        ProducerRecord producerRecord;
        Objects.requireNonNull(message);
        ArrayList arrayList = new ArrayList(this.topics.size());
        for (String str : this.topics) {
            CompletableFuture completableFuture = new CompletableFuture();
            arrayList.add(completableFuture);
            if (message instanceof KafkaMessage) {
                KafkaMessage kafkaMessage = (KafkaMessage) message;
                producerRecord = new ProducerRecord(str, (Integer) null, (Long) null, kafkaMessage.getKey().orElse(null), kafkaMessage.getPayload(), kafkaMessage.getHeaders());
            } else {
                producerRecord = new ProducerRecord(str, message.getPayload());
            }
            this.kafkaProducer.send(producerRecord, (recordMetadata, exc) -> {
                if (exc == null) {
                    completableFuture.complete(null);
                    return;
                }
                this.subscription.cancel();
                LOGGER.log(System.Logger.Level.WARNING, "Error when sending kafka message to topic: " + str, exc);
                completableFuture.completeExceptionally(exc);
            });
        }
        CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).whenComplete((r5, th) -> {
            if (th == null) {
                message.ack().whenComplete((r8, th) -> {
                    if (this.backpressureCounter.getAndUpdate(j -> {
                        long j = j + 1;
                        if (j == this.backpressure) {
                            return 0L;
                        }
                        return j;
                    }) >= this.backpressure - 1) {
                        this.subscription.request(this.backpressure);
                    }
                });
            } else {
                message.nack(th);
            }
        });
    }

    public void onError(Throwable th) {
        Objects.requireNonNull(th);
        LOGGER.log(System.Logger.Level.ERROR, "The Kafka subscription has failed", th);
        this.kafkaProducer.close();
    }

    public void onComplete() {
        LOGGER.log(System.Logger.Level.DEBUG, () -> {
            return "Subscriber has finished";
        });
        this.kafkaProducer.close();
    }

    public static <K, V> Builder<K, V> builder() {
        return new Builder<>();
    }

    public static <K, V> KafkaSubscriber<K, V> create(Config config) {
        return builder().config(config).m8build();
    }
}
