package loghub.receivers;

import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import loghub.BuilderClass;
import loghub.ConnectionContext;
import loghub.Helpers;
import loghub.configuration.Properties;
import loghub.receivers.Receiver;
import lombok.Generated;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

@Blocking
@BuilderClass(Builder.class)
/* loaded from: input_file:loghub/receivers/Kafka.class */
public class Kafka extends Receiver<Kafka, Builder> {
    private Consumer<Long, byte[]> consumer;
    private final String[] brokers;
    private final int port;
    private final String topic;
    private final String group;
    private final String keyDeserializer;

    /* loaded from: input_file:loghub/receivers/Kafka$Builder.class */
    public static class Builder extends Receiver.Builder<Kafka, Builder> {
        private String topic;
        private String[] brokers = {"localhost"};
        private int port = 9092;
        private String group = "loghub";
        private String keyDeserializer = ByteArrayDeserializer.class.getName();

        /* renamed from: build, reason: merged with bridge method [inline-methods] */
        public Kafka m0build() {
            return new Kafka(this);
        }

        @Generated
        public void setBrokers(String[] strArr) {
            this.brokers = strArr;
        }

        @Generated
        public void setPort(int i) {
            this.port = i;
        }

        @Generated
        public void setTopic(String str) {
            this.topic = str;
        }

        @Generated
        public void setGroup(String str) {
            this.group = str;
        }

        @Generated
        public void setKeyDeserializer(String str) {
            this.keyDeserializer = str;
        }
    }

    /* loaded from: input_file:loghub/receivers/Kafka$KafkaContext.class */
    public static class KafkaContext extends ConnectionContext<Object> {
        public final String topic;

        KafkaContext(String str) {
            this.topic = str;
        }

        public Object getLocalAddress() {
            return null;
        }

        public Object getRemoteAddress() {
            return null;
        }
    }

    public static Builder getBuilder() {
        return new Builder();
    }

    protected Kafka(Builder builder) {
        super(builder);
        this.brokers = (String[]) Arrays.copyOf(builder.brokers, builder.brokers.length);
        this.port = builder.port;
        this.topic = builder.topic;
        this.group = builder.group;
        this.keyDeserializer = builder.keyDeserializer;
    }

    public String getReceiverName() {
        return String.format("Kafka/%s/%s", this.topic, Integer.valueOf(hashCode()));
    }

    public boolean configure(Properties properties) {
        java.util.Properties properties2 = new java.util.Properties();
        properties2.put("bootstrap.servers", (String) Arrays.stream(Helpers.stringsToUri(this.brokers, this.port, "http", this.logger)).map(uri -> {
            return uri.getHost() + ":" + uri.getPort();
        }).collect(Collectors.joining(",")));
        properties2.put(ConsumerConfig.GROUP_ID_CONFIG, this.group);
        properties2.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDeserializer);
        properties2.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        this.consumer = new KafkaConsumer(properties2);
        return super.configure(properties);
    }

    public void run() {
        this.consumer.subscribe(Collections.singletonList(this.topic));
        boolean z = false;
        while (!isInterrupted()) {
            ConsumerRecords<Long, byte[]> poll = this.consumer.poll(100L);
            if (poll.count() != 0) {
                Iterator<ConsumerRecord<Long, byte[]>> it = poll.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    ConsumerRecord<Long, byte[]> next = it.next();
                    KafkaContext kafkaContext = new KafkaContext(next.topic());
                    Optional map = Optional.empty().map(obj -> {
                        if (next.timestampType() == TimestampType.CREATE_TIME) {
                            return new Date(next.timestamp());
                        }
                        return null;
                    });
                    Optional map2 = Optional.empty().map(obj2 -> {
                        Header[] array = next.headers().toArray();
                        if (array.length <= 0) {
                            return null;
                        }
                        HashMap hashMap = new HashMap(array.length);
                        Arrays.stream(array).forEach(header -> {
                            hashMap.put(header.key(), header.value());
                        });
                        return hashMap;
                    });
                    decodeStream(kafkaContext, next.value()).forEach(event -> {
                        Objects.requireNonNull(event);
                        map.ifPresent(event::setTimestamp);
                        map2.ifPresent(map3 -> {
                            event.put("headers", map3);
                        });
                        send(event);
                    });
                    if (isInterrupted()) {
                        this.consumer.commitSync(Collections.singletonMap(new TopicPartition(next.topic(), next.partition()), new OffsetAndMetadata(next.offset())));
                        z = true;
                        break;
                    }
                }
                if (z) {
                    break;
                } else {
                    this.consumer.commitAsync();
                }
            }
        }
        this.consumer.close();
        close();
    }

    public String[] getBrokers() {
        return (String[]) Arrays.copyOf(this.brokers, this.brokers.length);
    }

    @Generated
    public int getPort() {
        return this.port;
    }

    @Generated
    public String getTopic() {
        return this.topic;
    }

    @Generated
    public String getGroup() {
        return this.group;
    }

    @Generated
    public String getKeyDeserializer() {
        return this.keyDeserializer;
    }
}
