package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.exception.SpannerConnectorException;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.TaskSyncContextMetricEvent;
import io.debezium.connector.spanner.task.utils.TimeoutMeter;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/TaskSyncContextHolder.class */
public class TaskSyncContextHolder {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskSyncContextHolder.class);
    private final MetricsEventPublisher metricsEventPublisher;
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicReference<TaskSyncContext> taskSyncContextRef = new AtomicReference<>();
    private final Duration sleepInterval = Duration.ofMillis(100);
    private final Clock clock = Clock.system();

    public TaskSyncContextHolder(MetricsEventPublisher metricsEventPublisher) {
        this.metricsEventPublisher = metricsEventPublisher;
    }

    public final void init(TaskSyncContext taskSyncContext) {
        this.taskSyncContextRef.set(taskSyncContext);
        this.metricsEventPublisher.publishMetricEvent(new TaskSyncContextMetricEvent(taskSyncContext));
    }

    public TaskSyncContext get() {
        return this.taskSyncContextRef.get();
    }

    public String lockDebugString() {
        return "Lock Debug String {is locked: " + this.lock.isLocked() + ", isLockedByCurrentThread: " + this.lock.isHeldByCurrentThread() + ", lock debug string: " + this.lock.toString() + "current thread " + Thread.currentThread().getName();
    }

    public void update(UnaryOperator<TaskSyncContext> unaryOperator) {
        updateAndGet(unaryOperator);
    }

    public TaskSyncContext updateAndGet(UnaryOperator<TaskSyncContext> unaryOperator) {
        this.lock.lock();
        try {
            TaskSyncContext updateAndGet = this.taskSyncContextRef.updateAndGet(unaryOperator);
            this.metricsEventPublisher.publishMetricEvent(new TaskSyncContextMetricEvent(updateAndGet));
            return updateAndGet;
        } finally {
            this.lock.unlock();
        }
    }

    public void awaitInitialization(Duration duration) {
        LOGGER.debug("Task {} awaitInitialization: start", get().getTaskUid());
        TimeoutMeter timeout = TimeoutMeter.setTimeout(duration);
        while (RebalanceState.START_INITIAL_SYNC.equals(get().getRebalanceState())) {
            if (timeout.isExpired()) {
                LOGGER.debug("Await task initialization timeout expired");
                throw new SpannerConnectorException("Await task initialization timeout expired");
            }
        }
        LOGGER.debug("Task {} awaitInitialization: end", get().getTaskUid());
    }

    public void awaitNewEpoch() {
        while (!RebalanceState.NEW_EPOCH_STARTED.equals(get().getRebalanceState())) {
            if (Thread.interrupted()) {
                Thread.currentThread().interrupt();
                return;
            } else {
                try {
                    Metronome.sleeper(this.sleepInterval, this.clock).pause();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
