package io.debezium.connector.cassandra;

import io.debezium.connector.cassandra.exceptions.CassandraConnectorConfigException;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.util.Threads;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/cassandra/FileOffsetWriter.class */
public class FileOffsetWriter implements OffsetWriter {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) FileOffsetWriter.class);
    public static final String SNAPSHOT_OFFSET_FILE = "snapshot_offset.properties";
    public static final String COMMITLOG_OFFSET_FILE = "commitlog_offset.properties";
    private final Properties snapshotProps = new Properties();
    private final Properties commitLogProps = new Properties();
    private final File snapshotOffsetFile;
    private final File commitLogOffsetFile;
    private final FileLock snapshotOffsetFileLock;
    private final FileLock commitLogOffsetFileLock;
    private final OffsetFlushPolicy offsetFlushPolicy;
    private final ExecutorService executorService;

    public FileOffsetWriter(CassandraConnectorConfig cassandraConnectorConfig) throws IOException {
        if (cassandraConnectorConfig.offsetBackingStoreDir() == null) {
            throw new CassandraConnectorConfigException("Offset file directory must be configured at the start");
        }
        if (cassandraConnectorConfig.offsetFlushIntervalMs().isZero()) {
            this.offsetFlushPolicy = OffsetFlushPolicy.always();
        } else {
            this.offsetFlushPolicy = OffsetFlushPolicy.periodic(cassandraConnectorConfig.offsetFlushIntervalMs(), cassandraConnectorConfig.maxOffsetFlushSize());
        }
        File file = new File(cassandraConnectorConfig.offsetBackingStoreDir());
        if (!file.exists()) {
            Files.createDirectories(file.toPath(), new FileAttribute[0]);
        }
        this.snapshotOffsetFile = Paths.get(file.getAbsolutePath(), SNAPSHOT_OFFSET_FILE).toFile();
        this.commitLogOffsetFile = Paths.get(file.getAbsolutePath(), COMMITLOG_OFFSET_FILE).toFile();
        this.snapshotOffsetFileLock = init(this.snapshotOffsetFile);
        this.commitLogOffsetFileLock = init(this.commitLogOffsetFile);
        loadOffset(this.snapshotOffsetFile, this.snapshotProps);
        loadOffset(this.commitLogOffsetFile, this.commitLogProps);
        this.executorService = Threads.newSingleThreadExecutor(AbstractSourceConnector.class, cassandraConnectorConfig.getConnectorName(), "offset-writer");
    }

    @Override // io.debezium.connector.cassandra.OffsetWriter
    public Future<?> markOffset(String str, String str2, boolean z) {
        return this.executorService.submit(() -> {
            performMarkOffset(str, str2, z);
        });
    }

    private void performMarkOffset(String str, String str2, boolean z) {
        if (z) {
            if (!isOffsetProcessed(str, str2, z)) {
                this.snapshotProps.setProperty(str, str2);
            }
        } else if (!isOffsetProcessed(str, str2, z)) {
            this.commitLogProps.setProperty(str, str2);
        }
        if (this.offsetFlushPolicy.shouldFlush()) {
            performFlush();
        }
    }

    @Override // io.debezium.connector.cassandra.OffsetWriter
    public boolean isOffsetProcessed(String str, String str2, boolean z) {
        if (z) {
            return this.snapshotProps.containsKey(str);
        }
        OffsetPosition parse = OffsetPosition.parse(str2);
        OffsetPosition parse2 = this.commitLogProps.containsKey(str) ? OffsetPosition.parse((String) this.commitLogProps.get(str)) : null;
        return parse2 != null && parse.compareTo(parse2) <= 0;
    }

    @Override // io.debezium.connector.cassandra.OffsetWriter
    public Future<?> flush() {
        return this.executorService.submit(this::performFlush);
    }

    private void performFlush() {
        try {
            saveOffset(this.snapshotOffsetFile, this.snapshotProps);
            saveOffset(this.commitLogOffsetFile, this.commitLogProps);
        } catch (IOException e) {
            LOGGER.warn("Ignoring flush failure", (Throwable) e);
        }
    }

    @Override // io.debezium.connector.cassandra.OffsetWriter
    public void close() {
        try {
            if (!this.executorService.awaitTermination(1L, TimeUnit.SECONDS)) {
                this.executorService.shutdown();
            }
        } catch (InterruptedException e) {
        }
        try {
            this.snapshotOffsetFileLock.release();
        } catch (IOException e2) {
            LOGGER.warn("Failed to release snapshot offset file lock");
        }
        try {
            this.commitLogOffsetFileLock.release();
        } catch (IOException e3) {
            LOGGER.warn("Failed to release commit log offset file lock");
        }
    }

    private static void saveOffset(File file, Properties properties) throws IOException {
        try {
            FileOutputStream fileOutputStream = new FileOutputStream(file);
            try {
                properties.store(fileOutputStream, (String) null);
                fileOutputStream.close();
            } finally {
            }
        } catch (IOException e) {
            throw new IOException("Failed to save offset for file " + file.getAbsolutePath(), e);
        }
    }

    private void loadOffset(File file, Properties properties) throws IOException {
        try {
            FileInputStream fileInputStream = new FileInputStream(file);
            try {
                properties.load(fileInputStream);
                fileInputStream.close();
            } finally {
            }
        } catch (IOException e) {
            throw new IOException("Failed to load offset for file " + file.getAbsolutePath(), e);
        }
    }

    private FileLock init(File file) throws IOException {
        Path initLockPath = initLockPath(file);
        try {
            FileLock tryLock = FileChannel.open(initLockPath, StandardOpenOption.READ, StandardOpenOption.WRITE).tryLock();
            if (tryLock == null) {
                throw new CassandraConnectorTaskException("Failed to acquire file lock on " + String.valueOf(initLockPath) + ". There might be another Cassandra Connector Task running");
            }
            return tryLock;
        } catch (OverlappingFileLockException e) {
            throw new CassandraConnectorTaskException("Failed to acquire file lock on " + String.valueOf(initLockPath) + ". There might be another thread running", e);
        }
    }

    private Path initLockPath(File file) throws IOException {
        if (!file.exists()) {
            Files.createFile(file.toPath(), new FileAttribute[0]);
        }
        if (System.getProperty("os.name").toLowerCase().contains("windows")) {
            file = new File(file.getAbsolutePath() + ".lock");
            if (!file.exists()) {
                Files.createFile(file.toPath(), new FileAttribute[0]);
            }
        }
        return file.toPath();
    }
}
