package io.debezium.connector.spanner.db.stream;

import com.google.common.annotations.VisibleForTesting;
import io.debezium.connector.spanner.db.model.InitialPartition;
import io.debezium.connector.spanner.db.model.event.ChangeStreamEvent;
import io.debezium.connector.spanner.db.stream.exception.ChangeStreamException;
import io.debezium.connector.spanner.db.stream.exception.FailureChangeStreamException;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.StuckHeartbeatIntervalsMetricEvent;
import io.debezium.function.BlockingConsumer;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/db/stream/PartitionQueryingMonitor.class */
public class PartitionQueryingMonitor {
    private static final Logger LOGGER = LoggerFactory.getLogger(PartitionThreadPool.class);
    private static final Duration CHECK_INTERVAL = Duration.of(60000, ChronoUnit.MILLIS);
    private final PartitionThreadPool partitionThreadPool;
    private final long heartBeatIntervalMillis;
    private final Duration timeout;
    private volatile Thread thread;
    private final Map<String, Instant> lastEventTimestampMap = new ConcurrentHashMap();
    private final Consumer<ChangeStreamException> errorConsumer;
    private final BlockingConsumer<String> onStuckPartitionConsumer;
    private final MetricsEventPublisher metricsEventPublisher;

    public PartitionQueryingMonitor(PartitionThreadPool partitionThreadPool, Duration duration, BlockingConsumer<String> blockingConsumer, Consumer<ChangeStreamException> consumer, MetricsEventPublisher metricsEventPublisher, int i) {
        this.partitionThreadPool = partitionThreadPool;
        this.heartBeatIntervalMillis = duration.toMillis();
        this.timeout = Duration.of(duration.toMillis() * i, ChronoUnit.MILLIS);
        this.errorConsumer = consumer;
        this.onStuckPartitionConsumer = blockingConsumer;
        this.metricsEventPublisher = metricsEventPublisher;
    }

    public void checkPartitionThreads() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            Set<String> activeThreads = this.partitionThreadPool.getActiveThreads();
            Set set = (Set) this.lastEventTimestampMap.keySet().stream().filter(str -> {
                return !activeThreads.contains(str);
            }).collect(Collectors.toSet());
            Map<String, Instant> map = this.lastEventTimestampMap;
            Objects.requireNonNull(map);
            set.forEach((v1) -> {
                r1.remove(v1);
            });
            int i = -1;
            for (String str2 : activeThreads) {
                if (!InitialPartition.isInitialPartition(str2)) {
                    Instant instant = this.lastEventTimestampMap.get(str2);
                    if (instant == null) {
                        this.lastEventTimestampMap.put(str2, Instant.now());
                    } else {
                        LOGGER.info("PartitionQueryingMonitor, token {} last received timestamp {}", str2, instant);
                        int stuckHeartbeatIntervals = stuckHeartbeatIntervals(instant);
                        if (stuckHeartbeatIntervals > i) {
                            this.metricsEventPublisher.publishMetricEvent(new StuckHeartbeatIntervalsMetricEvent(stuckHeartbeatIntervals));
                            i = stuckHeartbeatIntervals;
                        }
                        if (isPartitionStuck(instant)) {
                            this.lastEventTimestampMap.remove(str2);
                            this.onStuckPartitionConsumer.accept(str2);
                        }
                    }
                }
            }
            Thread.sleep(CHECK_INTERVAL.toMillis());
        }
    }

    @VisibleForTesting
    int stuckHeartbeatIntervals(Instant instant) {
        return (int) (Duration.between(instant, Instant.now()).toMillis() / this.heartBeatIntervalMillis);
    }

    @VisibleForTesting
    boolean isPartitionStuck(Instant instant) {
        return instant.isBefore(Instant.now().minus((TemporalAmount) this.timeout));
    }

    public void start() {
        if (this.thread != null) {
            return;
        }
        this.thread = new Thread(() -> {
            try {
                checkPartitionThreads();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "SpannerConnector-PartitionQueryingMonitor");
        this.thread.setUncaughtExceptionHandler((thread, th) -> {
            this.errorConsumer.accept(new FailureChangeStreamException("PartitionQueryingMonitor error", new RuntimeException(th)));
        });
        this.thread.start();
    }

    public void stop() {
        if (this.thread == null) {
            return;
        }
        this.thread.interrupt();
    }

    public void acceptStreamEvent(ChangeStreamEvent changeStreamEvent) {
        this.lastEventTimestampMap.put(changeStreamEvent.getMetadata().getPartitionToken(), Instant.now());
    }
}
