package io.debezium.connector.cassandra;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.spi.topic.TopicNamingStrategy;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/KafkaRecordEmitter.class */
public class KafkaRecordEmitter implements Emitter {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) KafkaRecordEmitter.class);
    private static final int RECORD_LOG_COUNT = 10000;
    private final KafkaProducer<byte[], byte[]> producer;
    private final TopicNamingStrategy<KeyspaceTable> topicNamingStrategy;
    private final OffsetWriter offsetWriter;
    private final Set<String> erroneousCommitLogs;
    private final CommitLogTransfer commitLogTransfer;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final AtomicLong emitCount = new AtomicLong();

    public KafkaRecordEmitter(CassandraConnectorConfig cassandraConnectorConfig, KafkaProducer<byte[], byte[]> kafkaProducer, OffsetWriter offsetWriter, Converter converter, Converter converter2, Set<String> set, CommitLogTransfer commitLogTransfer) {
        this.producer = kafkaProducer;
        this.topicNamingStrategy = cassandraConnectorConfig.getTopicNamingStrategy(CommonConnectorConfig.TOPIC_NAMING_STRATEGY);
        this.offsetWriter = offsetWriter;
        this.erroneousCommitLogs = set;
        this.commitLogTransfer = commitLogTransfer;
        this.keyConverter = converter;
        this.valueConverter = converter2;
    }

    @Override // io.debezium.connector.cassandra.Emitter
    public void emit(Record record) {
        try {
            ProducerRecord<byte[], byte[]> producerRecord = toProducerRecord(record);
            this.producer.send(producerRecord, (recordMetadata, exc) -> {
                callback(record, exc);
            });
            LOGGER.trace("Sent to topic {}: {}", producerRecord.topic(), record);
        } catch (Exception e) {
            if (record.getSource().snapshot || this.commitLogTransfer.getClass().getName().equals(CassandraConnectorConfig.DEFAULT_COMMIT_LOG_TRANSFER_CLASS)) {
                throw new DebeziumException(String.format("Failed to send record %s", record), e);
            }
            LOGGER.error("Failed to send the record {}. Error: ", record, e);
            this.erroneousCommitLogs.add(record.getSource().offsetPosition.fileName);
        }
    }

    protected ProducerRecord<byte[], byte[]> toProducerRecord(Record record) {
        String dataChangeTopic = this.topicNamingStrategy.dataChangeTopic(record.getSource().keyspaceTable);
        return new ProducerRecord<>(dataChangeTopic, this.keyConverter.fromConnectData(dataChangeTopic, record.getKeySchema(), record.buildKey()), this.valueConverter.fromConnectData(dataChangeTopic, record.getValueSchema(), record.buildValue()));
    }

    private void callback(Record record, Exception exc) {
        if (exc != null) {
            LOGGER.error("Failed to emit record {}", record, exc);
            return;
        }
        long incrementAndGet = this.emitCount.incrementAndGet();
        if (incrementAndGet % 10000 == 0) {
            LOGGER.debug("Emitted {} records to Kafka Broker", Long.valueOf(incrementAndGet));
            this.emitCount.addAndGet(-incrementAndGet);
        }
        if (hasOffset(record)) {
            markOffset(record);
        }
    }

    private boolean hasOffset(Record record) {
        return (record.getSource().snapshot || this.commitLogTransfer.getClass().getName().equals(CassandraConnectorConfig.DEFAULT_COMMIT_LOG_TRANSFER_CLASS)) ? record.shouldMarkOffset() : record.shouldMarkOffset() && !this.erroneousCommitLogs.contains(record.getSource().offsetPosition.fileName);
    }

    private void markOffset(Record record) {
        SourceInfo source = record.getSource();
        String name = source.keyspaceTable.name();
        String serialize = source.offsetPosition.serialize();
        boolean z = source.snapshot;
        this.offsetWriter.markOffset(name, serialize, z);
        if (z) {
            LOGGER.debug("Mark snapshot offset for table '{}'", name);
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.producer.close();
    }
}
