package org.apache.camel.component.kafka.consumer.support.streaming;

import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.camel.component.kafka.KafkaConsumer;
import org.apache.camel.component.kafka.consumer.CommitManager;
import org.apache.camel.component.kafka.consumer.errorhandler.KafkaConsumerListener;
import org.apache.camel.component.kafka.consumer.support.AbstractKafkaRecordProcessorFacade;
import org.apache.camel.component.kafka.consumer.support.ProcessingResult;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/camel-kafka-4.4.2.jar:org/apache/camel/component/kafka/consumer/support/streaming/KafkaRecordStreamingProcessorFacade.class */
public class KafkaRecordStreamingProcessorFacade extends AbstractKafkaRecordProcessorFacade {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaRecordStreamingProcessorFacade.class);
    private final KafkaRecordStreamingProcessor kafkaRecordProcessor;

    public KafkaRecordStreamingProcessorFacade(KafkaConsumer kafkaConsumer, String str, CommitManager commitManager, KafkaConsumerListener kafkaConsumerListener) {
        super(kafkaConsumer, str, commitManager, kafkaConsumerListener);
        this.kafkaRecordProcessor = buildKafkaRecordProcessor(commitManager);
    }

    private KafkaRecordStreamingProcessor buildKafkaRecordProcessor(CommitManager commitManager) {
        return new KafkaRecordStreamingProcessor(this.camelKafkaConsumer.getEndpoint().getConfiguration(), this.camelKafkaConsumer.getProcessor(), commitManager);
    }

    private ProcessingResult processRecord(TopicPartition topicPartition, boolean z, boolean z2, KafkaRecordStreamingProcessor kafkaRecordStreamingProcessor, ConsumerRecord<Object, Object> consumerRecord) {
        logRecord(consumerRecord);
        return kafkaRecordStreamingProcessor.processExchange(this.camelKafkaConsumer, topicPartition, z, z2, consumerRecord);
    }

    @Override // org.apache.camel.component.kafka.consumer.support.KafkaRecordProcessorFacade
    public ProcessingResult processPolledRecords(ConsumerRecords<Object, Object> consumerRecords) {
        logRecords(consumerRecords);
        ProcessingResult newUnprocessed = ProcessingResult.newUnprocessed();
        Set<TopicPartition> partitions = consumerRecords.partitions();
        Iterator<TopicPartition> it = partitions.iterator();
        LOG.debug("Poll received records on {} partitions", Integer.valueOf(partitions.size()));
        while (it.hasNext() && !isStopping()) {
            TopicPartition next = it.next();
            LOG.debug("Processing records on partition {}", Integer.valueOf(next.partition()));
            List<ConsumerRecord<Object, Object>> records = consumerRecords.records(next);
            Iterator<ConsumerRecord<Object, Object>> it2 = records.iterator();
            logRecordsInPartition(records, next);
            while (!newUnprocessed.isBreakOnErrorHit() && it2.hasNext() && !isStopping()) {
                ConsumerRecord<Object, Object> next2 = it2.next();
                LOG.debug("Processing record on partition {} with offset {}", Integer.valueOf(next2.partition()), Long.valueOf(next2.offset()));
                newUnprocessed = processRecord(next, it.hasNext(), it2.hasNext(), this.kafkaRecordProcessor, next2);
                LOG.debug("Processed record on partition {} with offset {}", Integer.valueOf(next2.partition()), Long.valueOf(next2.offset()));
                if (this.consumerListener != null && !this.consumerListener.afterProcess(newUnprocessed)) {
                    this.commitManager.commit(next);
                    return newUnprocessed;
                }
            }
            if (!newUnprocessed.isBreakOnErrorHit()) {
                LOG.debug("Committing offset on successful execution");
                this.commitManager.commit(next);
            }
        }
        return newUnprocessed;
    }
}
