package io.debezium.connector.spanner.task;

import com.google.cloud.Timestamp;
import io.debezium.connector.spanner.SpannerConnectorConfig;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Stopwatch;
import java.lang.Thread;
import java.time.Duration;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/LowWatermarkCalculationJob.class */
public class LowWatermarkCalculationJob {
    private static final Logger LOGGER = LoggerFactory.getLogger(LowWatermarkCalculationJob.class);
    private volatile Thread calculationThread;
    private final Consumer<Throwable> errorHandler;
    private final boolean enabled;
    private final long period;
    private final LowWatermarkCalculator lowWatermarkCalculator;
    private final LowWatermarkHolder lowWatermarkHolder;
    private final String taskUid;
    private final Duration pollInterval = Duration.ofMillis(60000);
    private final Duration sleepInterval = Duration.ofMillis(100);
    private final Clock clock = Clock.system();

    public LowWatermarkCalculationJob(SpannerConnectorConfig spannerConnectorConfig, Consumer<Throwable> consumer, LowWatermarkCalculator lowWatermarkCalculator, LowWatermarkHolder lowWatermarkHolder, String str) {
        this.errorHandler = consumer;
        this.lowWatermarkCalculator = lowWatermarkCalculator;
        this.lowWatermarkHolder = lowWatermarkHolder;
        this.enabled = spannerConnectorConfig.isLowWatermarkEnabled();
        this.period = spannerConnectorConfig.getLowWatermarkUpdatePeriodMs();
        this.taskUid = str;
    }

    private Thread createCalculationThread() {
        Thread thread = new Thread(() -> {
            try {
                Stopwatch start = Stopwatch.accumulating().start();
                Metronome sleeper = Metronome.sleeper(Duration.ofMillis(this.period), this.clock);
                LOGGER.info("Task {}, beginning calculation of low watermark", this.taskUid);
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        boolean z = false;
                        if (start.stop().durations().statistics().getTotal().toMillis() >= this.pollInterval.toMillis()) {
                            z = true;
                            start = Stopwatch.accumulating().start();
                        } else {
                            start.start();
                        }
                        getLowWatermark(z);
                        try {
                            sleeper.pause();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            LOGGER.info("Task {}, ended calculation of low watermark", this.taskUid);
                            return;
                        }
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                        LOGGER.info("Task {}, interrupted low watermark calculation", this.taskUid);
                        LOGGER.info("Task {}, ended calculation of low watermark", this.taskUid);
                        return;
                    }
                }
                LOGGER.info("Task {}, ended calculation of low watermark", this.taskUid);
            } catch (Throwable th) {
                LOGGER.info("Task {}, ended calculation of low watermark", this.taskUid);
                throw th;
            }
        }, "SpannerConnector-WatermarkCalculationJob-Calculation");
        thread.setUncaughtExceptionHandler((thread2, th) -> {
            LOGGER.error("Task {}, caught exception during low watermark calculation {}", this.taskUid, th);
            this.errorHandler.accept(th);
        });
        return thread;
    }

    private void getLowWatermark(boolean z) throws InterruptedException {
        Timestamp calculateLowWatermark;
        Metronome sleeper = Metronome.sleeper(Duration.ofMillis(100L), this.clock);
        do {
            try {
                calculateLowWatermark = this.lowWatermarkCalculator.calculateLowWatermark(z);
                if (calculateLowWatermark != null) {
                    break;
                } else {
                    sleeper.pause();
                }
            } catch (InterruptedException e) {
                LOGGER.warn("Task {}, low watermark calculation was interrupted", this.taskUid);
                throw e;
            }
        } while (!Thread.currentThread().isInterrupted());
        this.lowWatermarkHolder.setLowWatermark(calculateLowWatermark);
    }

    public void start() {
        if (this.enabled) {
            LOGGER.info("Task {}, Started low watermark calculation", this.taskUid);
            this.calculationThread = createCalculationThread();
            this.calculationThread.start();
        }
    }

    public void stop() {
        if (this.calculationThread != null) {
            this.calculationThread.interrupt();
            Metronome sleeper = Metronome.sleeper(this.sleepInterval, this.clock);
            LOGGER.info("Task {}, stopping low watermark calculation thread ", this.taskUid);
            while (!this.calculationThread.getState().equals(Thread.State.TERMINATED)) {
                try {
                    sleeper.pause();
                    this.calculationThread.interrupt();
                    LOGGER.info("Task {}, still waiting for low watermark calculation thread to die", this.taskUid);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            LOGGER.info("Task {}, stopped low watermark calculation thread ", this.taskUid);
            this.calculationThread = null;
        }
    }
}
