package io.debezium.connector.cassandra;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.math3.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/CommitLogIdxProcessor.class */
public class CommitLogIdxProcessor extends AbstractProcessor {
    private static final String NAME = "Commit Log Processor";
    private final CassandraConnectorContext context;
    private final File cdcDir;
    private AbstractDirectoryWatcher watcher;
    private final CommitLogProcessorMetrics metrics;
    private boolean initial;
    private final boolean errorCommitLogReprocessEnabled;
    private final CommitLogTransfer commitLogTransfer;
    private final ExecutorService executorService;
    private final CommitLogSegmentReader commitLogReader;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) CommitLogIdxProcessor.class);
    static final Set<Pair<CommitLogIdxParser, Future<CommitLogProcessingResult>>> submittedProcessings = ConcurrentHashMap.newKeySet();

    public CommitLogIdxProcessor(CassandraConnectorContext cassandraConnectorContext, CommitLogProcessorMetrics commitLogProcessorMetrics, CommitLogSegmentReader commitLogSegmentReader, File file) {
        super(NAME, Duration.ZERO);
        this.initial = true;
        this.context = cassandraConnectorContext;
        this.commitLogTransfer = this.context.getCassandraConnectorConfig().getCommitLogTransfer();
        this.errorCommitLogReprocessEnabled = this.context.getCassandraConnectorConfig().errorCommitLogReprocessEnabled();
        this.cdcDir = file;
        this.executorService = Executors.newSingleThreadExecutor();
        this.metrics = commitLogProcessorMetrics;
        this.commitLogReader = commitLogSegmentReader;
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void initialize() {
        this.metrics.registerMetrics();
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void destroy() {
        this.metrics.unregisterMetrics();
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void stop() {
        try {
            this.executorService.shutdown();
            for (Pair<CommitLogIdxParser, Future<CommitLogProcessingResult>> pair : submittedProcessings) {
                try {
                    pair.getFirst().complete();
                    pair.getSecond().get();
                } catch (Exception e) {
                    LOGGER.warn("Waiting for submitted task to finish has failed.");
                }
            }
            super.stop();
        } catch (Exception e2) {
            throw new RuntimeException("Unable to close executor service in CommitLogProcessor in a timely manner");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static synchronized void removeProcessing(CommitLogIdxParser commitLogIdxParser) {
        Optional<Pair<CommitLogIdxParser, Future<CommitLogProcessingResult>>> findFirst = submittedProcessings.stream().filter(pair -> {
            return pair.getFirst() == commitLogIdxParser;
        }).findFirst();
        Set<Pair<CommitLogIdxParser, Future<CommitLogProcessingResult>>> set = submittedProcessings;
        Objects.requireNonNull(set);
        findFirst.map((v1) -> {
            return r1.remove(v1);
        });
    }

    public void submit(Path path) {
        CommitLogIdxParser commitLogIdxParser = new CommitLogIdxParser(new LogicalCommitLog(path.toFile()), this.metrics, this.context, this.commitLogReader);
        ExecutorService executorService = this.executorService;
        Objects.requireNonNull(commitLogIdxParser);
        submittedProcessings.add(new Pair<>(commitLogIdxParser, executorService.submit(commitLogIdxParser::process)));
        LOGGER.debug("Processing {} callables.", Integer.valueOf(submittedProcessings.size()));
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public boolean isRunning() {
        return (!super.isRunning() || this.executorService.isShutdown() || this.executorService.isTerminated()) ? false : true;
    }

    @Override // io.debezium.connector.cassandra.AbstractProcessor
    public void process() throws IOException, InterruptedException {
        if (this.watcher == null) {
            this.watcher = new AbstractDirectoryWatcher(this.cdcDir.toPath(), this.context.getCassandraConnectorConfig().cdcDirPollInterval(), Collections.singleton(StandardWatchEventKinds.ENTRY_CREATE)) { // from class: io.debezium.connector.cassandra.CommitLogIdxProcessor.1
                @Override // io.debezium.connector.cassandra.AbstractDirectoryWatcher
                void handleEvent(WatchEvent<?> watchEvent, Path path) {
                    if (CommitLogIdxProcessor.this.isRunning() && path.getFileName().toString().endsWith("_cdc.idx")) {
                        CommitLogIdxProcessor.this.submit(path);
                    }
                }
            };
        }
        if (this.initial) {
            LOGGER.info("Reading existing commit logs in {}", this.cdcDir);
            File[] indexes = CommitLogUtil.getIndexes(this.cdcDir);
            Arrays.sort(indexes, CommitLogUtil::compareCommitLogsIndexes);
            for (File file : indexes) {
                if (isRunning()) {
                    submit(file.toPath());
                }
            }
            if (this.errorCommitLogReprocessEnabled) {
                LOGGER.info("CommitLog Error Processing is enabled. Attempting to get all error commitLog files for re-processing.");
                this.commitLogTransfer.getErrorCommitLogFiles();
            }
            this.initial = false;
        }
        this.watcher.poll();
    }
}
