package io.debezium.connector.cassandra;

import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.CommitLogProcessingResult;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/CommitLogIdxParser.class */
public class CommitLogIdxParser {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) CommitLogIdxParser.class);
    private final CommitLogSegmentReader commitLogReader;
    private final List<ChangeEventQueue<Event>> queues;
    private final CommitLogProcessorMetrics metrics;
    private final CommitLogTransfer commitLogTransfer;
    private final Set<String> erroneousCommitLogs;
    private boolean completePrematurely = false;
    private final LogicalCommitLog commitLog;
    private final int pollingInterval;
    private final boolean realTimeProcessingEnabled;
    private Integer offset;

    public CommitLogIdxParser(LogicalCommitLog logicalCommitLog, CommitLogProcessorMetrics commitLogProcessorMetrics, CassandraConnectorContext cassandraConnectorContext, CommitLogSegmentReader commitLogSegmentReader) {
        this.queues = cassandraConnectorContext.getQueues();
        this.metrics = commitLogProcessorMetrics;
        this.commitLog = logicalCommitLog;
        this.commitLogReader = commitLogSegmentReader;
        this.commitLogTransfer = cassandraConnectorContext.getCassandraConnectorConfig().getCommitLogTransfer();
        this.erroneousCommitLogs = cassandraConnectorContext.getErroneousCommitLogs();
        this.pollingInterval = cassandraConnectorContext.getCassandraConnectorConfig().getCommitLogMarkedCompletePollInterval();
        this.realTimeProcessingEnabled = cassandraConnectorContext.getCassandraConnectorConfig().isCommitLogRealTimeProcessingEnabled();
    }

    public void complete() {
        this.completePrematurely = true;
    }

    private CommitLogProcessingResult parse() {
        Integer num;
        try {
            parseIndexFile(this.commitLog);
            while (!this.commitLog.completed) {
                LOGGER.debug("Polling for completeness of idx file for: {}", this.commitLog);
                if (this.completePrematurely) {
                    LOGGER.warn("{} completed prematurely", this.commitLog);
                    return new CommitLogProcessingResult(this.commitLog, CommitLogProcessingResult.Result.COMPLETED_PREMATURELY);
                }
                if (this.realTimeProcessingEnabled) {
                    if (this.offset == null) {
                        LOGGER.debug("Start to read the partial file : {}", this.commitLog);
                        num = 0;
                    } else if (this.offset.intValue() < this.commitLog.offsetOfEndOfLastWrittenCDCMutation) {
                        LOGGER.debug("Resume to read the partial file: {}", this.commitLog);
                        num = this.offset;
                    } else {
                        LOGGER.debug("No movement in offset in idx file: {}", this.commitLog);
                        num = null;
                    }
                    if (num != null) {
                        this.metrics.setCommitLogPosition(num.intValue());
                        processCommitLog(this.commitLog, num.intValue());
                        this.offset = Integer.valueOf(this.commitLog.offsetOfEndOfLastWrittenCDCMutation);
                    }
                }
                LOGGER.debug("Sleep for idx file to be complete");
                Thread.sleep(this.pollingInterval);
                parseIndexFile(this.commitLog);
            }
            LOGGER.info("Completed idx file for: {}", this.commitLog);
            int intValue = this.offset == null ? 0 : this.offset.intValue();
            this.metrics.setCommitLogPosition(intValue);
            processCommitLog(this.commitLog, intValue);
            return new CommitLogProcessingResult(this.commitLog);
        } catch (Exception e) {
            LOGGER.error("Processing of {} errored out", this.commitLog, e);
            return new CommitLogProcessingResult(this.commitLog, CommitLogProcessingResult.Result.ERROR, e);
        }
    }

    public CommitLogProcessingResult process() {
        if (!this.commitLog.exists()) {
            LOGGER.warn("Commit log " + String.valueOf(this.commitLog) + " does not exist!");
            return new CommitLogProcessingResult(this.commitLog, CommitLogProcessingResult.Result.DOES_NOT_EXIST);
        }
        LOGGER.info("Processing commit log {}", this.commitLog.log.toString());
        this.metrics.setCommitLogFilename(this.commitLog.log.toString());
        this.metrics.setCommitLogPosition(0L);
        CommitLogProcessingResult parse = parse();
        if (parse.result == CommitLogProcessingResult.Result.OK || parse.result == CommitLogProcessingResult.Result.ERROR) {
            enqueueEOFEvent();
        }
        CommitLogIdxProcessor.removeProcessing(this);
        LOGGER.debug("Processing {} callables.", Integer.valueOf(CommitLogIdxProcessor.submittedProcessings.size()));
        return parse;
    }

    private void enqueueEOFEvent() {
        try {
            this.queues.get(Math.abs(this.commitLog.log.getName().hashCode() % this.queues.size())).enqueue(new EOFEvent(this.commitLog.log));
        } catch (InterruptedException e) {
            throw new CassandraConnectorTaskException(String.format("Enqueuing has been interrupted while enqueuing EOF Event for file %s", this.commitLog.log.getName()), e);
        }
    }

    private void processCommitLog(LogicalCommitLog logicalCommitLog, int i) {
        try {
            LOGGER.debug("Starting to read commit log segments {} on position {}", logicalCommitLog, Integer.valueOf(i));
            this.commitLogReader.readCommitLogSegment(logicalCommitLog.log, logicalCommitLog.commitLogSegmentId, i);
            LOGGER.debug("Finished reading commit log segments {} on position {}", logicalCommitLog, Integer.valueOf(i));
        } catch (Exception e) {
            if (this.commitLogTransfer.getClass().getName().equals(CassandraConnectorConfig.DEFAULT_COMMIT_LOG_TRANSFER_CLASS)) {
                throw new DebeziumException(String.format("Error occurred while processing commit log %s", logicalCommitLog.log), e);
            }
            LOGGER.error("Error occurred while processing commit log " + String.valueOf(logicalCommitLog.log), (Throwable) e);
            this.erroneousCommitLogs.add(logicalCommitLog.log.getName());
            enqueueEOFEvent();
        }
    }

    private void parseIndexFile(LogicalCommitLog logicalCommitLog) throws DebeziumException {
        try {
            logicalCommitLog.parseCommitLogIndex();
        } catch (DebeziumException e) {
            this.erroneousCommitLogs.add(logicalCommitLog.log.getName());
            throw e;
        }
    }
}
