package io.helidon.messaging.connectors.kafka;

import io.helidon.common.configurable.ScheduledThreadPoolSupplier;
import io.helidon.config.Config;
import io.helidon.config.mp.MpConfig;
import io.helidon.messaging.Stoppable;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.BeforeDestroyed;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.lang.System;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorAttribute;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorAttributes;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;

@ApplicationScoped
@Connector(KafkaConnector.CONNECTOR_NAME)
@ConnectorAttributes({@ConnectorAttribute(name = "bootstrap.servers", description = "A list of comma separated host:port pairs to use for establishing the initial connection to the Kafka cluster.", mandatory = true, direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, type = "string"), @ConnectorAttribute(name = "key.deserializer", description = "Fully qualified name of key deserializer class that implements the org.apache.kafka.common.serialization.Deserializer interface.", mandatory = true, direction = ConnectorAttribute.Direction.INCOMING, type = "string"), @ConnectorAttribute(name = "value.deserializer", description = "Fully qualified name of value deserializer class that implements the org.apache.kafka.common.serialization.Deserializer interface.", mandatory = true, direction = ConnectorAttribute.Direction.INCOMING, type = "string"), @ConnectorAttribute(name = "key.serializer", description = "Fully qualified name of key serializer class that implements the org.apache.kafka.common.serialization.Serializer interface.", mandatory = true, direction = ConnectorAttribute.Direction.OUTGOING, type = "string"), @ConnectorAttribute(name = "value.serializer", description = "Fully qualified name of value serializer class that implements the org.apache.kafka.common.serialization.Serializer interface.", mandatory = true, direction = ConnectorAttribute.Direction.OUTGOING, type = "string"), @ConnectorAttribute(name = "topic", description = "Comma separated names of the topics to consume from.", mandatory = true, direction = ConnectorAttribute.Direction.INCOMING, type = "string"), @ConnectorAttribute(name = "topic", description = "Comma separated names of the topics to produce to.", mandatory = true, direction = ConnectorAttribute.Direction.OUTGOING, type = "string"), @ConnectorAttribute(name = "nack-dlq", description = "\"Dead Letter Queue\" topic name to send NACKED messages to, other connection properties are going to be derived from consumer config.", direction = ConnectorAttribute.Direction.INCOMING, type = "string"), @ConnectorAttribute(name = "nack-dlq.topic", description = "\"Dead Letter Queue\" topic name to send NACKED messages to.", direction = ConnectorAttribute.Direction.INCOMING, type = "string"), @ConnectorAttribute(name = "nack-dlq.bootstrap.servers", description = "A list of comma separated host:port pairs to use for establishing the initial connection to the Kafka cluster with the \"Dead Letter Queue\".", direction = ConnectorAttribute.Direction.INCOMING, type = "string"), @ConnectorAttribute(name = "nack-dlq.key.serializer", description = "Fully qualified name of key deserializer class that implements the org.apache.kafka.common.serialization.Serializer interface.", direction = ConnectorAttribute.Direction.INCOMING, type = "string"), @ConnectorAttribute(name = "nack-dlq.value.serializer", description = "Fully qualified name of value deserializer class that implements the org.apache.kafka.common.serialization.Serializer interface.", direction = ConnectorAttribute.Direction.INCOMING, type = "string"), @ConnectorAttribute(name = "auto.offset.reset", description = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server. Valid Values: [latest, earliest, none].", direction = ConnectorAttribute.Direction.INCOMING, type = "string"), @ConnectorAttribute(name = "poll.timeout", description = "The maximum time to block polling loop in milliseconds.", direction = ConnectorAttribute.Direction.INCOMING, defaultValue = "50", type = "long"), @ConnectorAttribute(name = "period.executions", description = "Period between successive executions of polling loop in milliseconds.", direction = ConnectorAttribute.Direction.INCOMING, defaultValue = "100", type = "long"), @ConnectorAttribute(name = "batch.size", description = "Producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition.", direction = ConnectorAttribute.Direction.OUTGOING, defaultValue = "16384", type = "int"), @ConnectorAttribute(name = "acks", description = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. Valid Values: [all, -1, 0, 1].", direction = ConnectorAttribute.Direction.OUTGOING, defaultValue = "1", type = "string"), @ConnectorAttribute(name = "buffer.memory", description = "The total bytes of memory the producer can use to buffer records waiting to be sent to the server.", direction = ConnectorAttribute.Direction.OUTGOING, defaultValue = "33554432", type = "long"), @ConnectorAttribute(name = "compression.type", description = "The compression type for all data generated by the producer. The default is none (i.e. no compression). Valid Values: [none, gzip, snappy, lz4, zstd].", direction = ConnectorAttribute.Direction.OUTGOING, defaultValue = "none", type = "string")})
/* loaded from: input_file:io/helidon/messaging/connectors/kafka/KafkaConnector.class */
public class KafkaConnector implements IncomingConnectorFactory, OutgoingConnectorFactory, Stoppable {
    private static final System.Logger LOGGER = System.getLogger(KafkaConnector.class.getName());
    static final String CONNECTOR_NAME = "helidon-kafka";
    private final ScheduledExecutorService scheduler;
    private final Queue<KafkaPublisher<?, ?>> resources = new LinkedList();

    @Inject
    KafkaConnector(Config config) {
        this.scheduler = ScheduledThreadPoolSupplier.builder().threadNamePrefix("kafka-").config(config).build().get();
    }

    void terminate(@Observes @BeforeDestroyed(ApplicationScoped.class) Object obj) {
        stop();
    }

    Queue<KafkaPublisher<?, ?>> resources() {
        return this.resources;
    }

    public PublisherBuilder<? extends Message<?>> getPublisherBuilder(org.eclipse.microprofile.config.Config config) {
        KafkaPublisher<?, ?> m6build = KafkaPublisher.builder().config(MpConfig.toHelidonConfig(config)).scheduler(this.scheduler).m6build();
        LOGGER.log(System.Logger.Level.DEBUG, () -> {
            return String.format("Resource %s added", m6build);
        });
        this.resources.add(m6build);
        return ReactiveStreams.fromPublisher(m6build);
    }

    public SubscriberBuilder<? extends Message<?>, Void> getSubscriberBuilder(org.eclipse.microprofile.config.Config config) {
        return ReactiveStreams.fromSubscriber(KafkaSubscriber.create(MpConfig.toHelidonConfig(config)));
    }

    public static KafkaConnector create(Config config) {
        return new KafkaConnector(config);
    }

    public static KafkaConnector create() {
        return new KafkaConnector(Config.empty());
    }

    public void stop() {
        LOGGER.log(System.Logger.Level.DEBUG, () -> {
            return "Terminating KafkaConnector...";
        });
        this.scheduler.shutdown();
        LinkedList linkedList = new LinkedList();
        while (true) {
            KafkaPublisher<?, ?> poll = this.resources.poll();
            if (poll == null) {
                break;
            }
            try {
                poll.stop();
            } catch (Exception e) {
                linkedList.add(e);
            }
        }
        if (linkedList.isEmpty()) {
            LOGGER.log(System.Logger.Level.DEBUG, "KafkaConnector terminated successfuly");
        } else {
            linkedList.forEach(exc -> {
                LOGGER.log(System.Logger.Level.ERROR, "An error happened closing resource", exc);
            });
        }
    }

    public static KafkaConfigBuilder configBuilder() {
        return new KafkaConfigBuilder();
    }
}
