package io.strimzi.kafka.bridge;

import io.strimzi.kafka.bridge.config.KafkaConfig;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:io/strimzi/kafka/bridge/KafkaBridgeConsumer.class */
public class KafkaBridgeConsumer<K, V> {
    private static final Logger LOGGER = LogManager.getLogger(KafkaBridgeConsumer.class);
    private final KafkaConfig kafkaConfig;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private Consumer<K, V> consumer;
    private final ConsumerRebalanceListener loggingPartitionsRebalance = new LoggingPartitionsRebalance();

    public KafkaBridgeConsumer(KafkaConfig kafkaConfig, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        this.kafkaConfig = kafkaConfig;
        this.keyDeserializer = deserializer;
        this.valueDeserializer = deserializer2;
    }

    public void close() {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    public void create(Properties properties, String str) {
        Properties properties2 = new Properties();
        properties2.putAll(this.kafkaConfig.getConfig());
        properties2.putAll(this.kafkaConfig.getConsumerConfig().getConfig());
        properties2.put("group.id", str);
        if (properties != null) {
            properties2.putAll(properties);
        }
        this.consumer = new KafkaConsumer(properties2, this.keyDeserializer, this.valueDeserializer);
    }

    public void subscribe(List<SinkTopicSubscription> list) {
        if (list == null) {
            throw new IllegalArgumentException("Topic subscriptions cannot be null");
        }
        if (list.isEmpty()) {
            unsubscribe();
            return;
        }
        LOGGER.info("Subscribe to topics {}", list);
        Set set = (Set) list.stream().map((v0) -> {
            return v0.getTopic();
        }).collect(Collectors.toSet());
        LOGGER.trace("Subscribe thread {}", Thread.currentThread());
        this.consumer.subscribe(set, this.loggingPartitionsRebalance);
    }

    public void unsubscribe() {
        LOGGER.info("Unsubscribe from topics");
        LOGGER.trace("Unsubscribe thread {}", Thread.currentThread());
        this.consumer.unsubscribe();
    }

    public Set<TopicPartition> listSubscriptions() {
        LOGGER.info("Listing subscribed topics");
        LOGGER.trace("ListSubscriptions thread {}", Thread.currentThread());
        return this.consumer.assignment();
    }

    public void subscribe(Pattern pattern) {
        LOGGER.info("Subscribe to topics with pattern {}", pattern);
        LOGGER.trace("Subscribe thread {}", Thread.currentThread());
        this.consumer.subscribe(pattern, this.loggingPartitionsRebalance);
    }

    public void assign(List<SinkTopicSubscription> list) {
        if (list == null) {
            throw new IllegalArgumentException("Topic subscriptions cannot be null");
        }
        LOGGER.info("Assigning to topics partitions {}", list);
        HashSet hashSet = new HashSet();
        for (SinkTopicSubscription sinkTopicSubscription : list) {
            hashSet.add(new TopicPartition(sinkTopicSubscription.getTopic(), sinkTopicSubscription.getPartition().intValue()));
        }
        if (hashSet.isEmpty()) {
            unsubscribe();
        } else {
            LOGGER.trace("Assign thread {}", Thread.currentThread());
            this.consumer.assign(hashSet);
        }
    }

    public ConsumerRecords<K, V> poll(long j) {
        LOGGER.trace("Poll thread {}", Thread.currentThread());
        return this.consumer.poll(Duration.ofMillis(j));
    }

    public Map<TopicPartition, OffsetAndMetadata> commit(Map<TopicPartition, OffsetAndMetadata> map) {
        LOGGER.trace("Commit thread {}", Thread.currentThread());
        this.consumer.commitSync(map);
        return map;
    }

    public void commitLastPolledOffsets() {
        LOGGER.trace("Commit thread {}", Thread.currentThread());
        this.consumer.commitSync();
    }

    public void seek(TopicPartition topicPartition, long j) {
        LOGGER.trace("Seek thread {}", Thread.currentThread());
        this.consumer.seek(topicPartition, j);
    }

    public void seekToBeginning(Set<TopicPartition> set) {
        LOGGER.trace("SeekToBeginning thread {}", Thread.currentThread());
        this.consumer.seekToBeginning(set);
    }

    public void seekToEnd(Set<TopicPartition> set) {
        LOGGER.trace("SeekToEnd thread {}", Thread.currentThread());
        this.consumer.seekToEnd(set);
    }
}
