package io.debezium.connector.spanner.task.leader;

import io.debezium.DebeziumException;
import io.debezium.connector.spanner.kafka.internal.KafkaConsumerAdminService;
import io.debezium.connector.spanner.kafka.internal.TaskSyncPublisher;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.task.LoggerUtils;
import io.debezium.connector.spanner.task.TaskStateUtil;
import io.debezium.connector.spanner.task.TaskSyncContext;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import io.debezium.connector.spanner.task.leader.rebalancer.TaskPartitionRebalancer;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import java.lang.Thread;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/leader/LeaderAction.class */
public class LeaderAction {
    private static final Logger LOGGER = LoggerFactory.getLogger(LeaderAction.class);
    private static final Duration EPOCH_OFFSET_UPDATE_DURATION = Duration.ofSeconds(300);
    private final TaskSyncContextHolder taskSyncContextHolder;
    private final KafkaConsumerAdminService kafkaAdminService;
    private final LeaderService leaderService;
    private final TaskPartitionRebalancer taskPartitonRebalancer;
    private final TaskSyncPublisher taskSyncPublisher;
    private volatile Thread leaderThread;
    private Consumer<Throwable> errorHandler;
    private final Duration sleepInterval = Duration.ofMillis(100);
    private final Clock clock = Clock.system();

    public LeaderAction(TaskSyncContextHolder taskSyncContextHolder, KafkaConsumerAdminService kafkaConsumerAdminService, LeaderService leaderService, TaskPartitionRebalancer taskPartitionRebalancer, TaskSyncPublisher taskSyncPublisher, Consumer<Throwable> consumer) {
        this.taskSyncContextHolder = taskSyncContextHolder;
        this.kafkaAdminService = kafkaConsumerAdminService;
        this.leaderService = leaderService;
        this.taskPartitonRebalancer = taskPartitionRebalancer;
        this.taskSyncPublisher = taskSyncPublisher;
        this.errorHandler = consumer;
    }

    private Thread createLeaderThread(long j) {
        Thread thread = new Thread(() -> {
            LOGGER.info("performLeaderAction: Task {} start leader thread with rebalance generation ID {}", this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(j));
            try {
                newEpoch(j);
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        Thread.sleep(EPOCH_OFFSET_UPDATE_DURATION.toMillis());
                        if (this.taskSyncContextHolder.get().getRebalanceState() == RebalanceState.NEW_EPOCH_STARTED) {
                            publishEpochOffset();
                        }
                    } catch (InterruptedException e) {
                        LOGGER.info("performLeaderAction: Task {} stop leader thread", this.taskSyncContextHolder.get().getTaskUid());
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                LOGGER.info("performLeaderAction: Task {} stopped leader thread", this.taskSyncContextHolder.get().getTaskUid());
            } catch (InterruptedException e2) {
                LOGGER.info("performLeaderAction: Task {} stop leader thread", this.taskSyncContextHolder.get().getTaskUid());
                Thread.currentThread().interrupt();
            }
        }, "SpannerConnector-LeaderAction");
        thread.setUncaughtExceptionHandler((thread2, th) -> {
            LOGGER.error("Leader action execution error, task {}, ex", this.taskSyncContextHolder.get().getTaskUid(), th);
            this.errorHandler.accept(th);
        });
        return thread;
    }

    private TaskSyncContext publishEpochOffset() throws InterruptedException {
        TaskSyncContext updateAndGet = this.taskSyncContextHolder.updateAndGet(taskSyncContext -> {
            return taskSyncContext.toBuilder().epochOffsetHolder(taskSyncContext.getEpochOffsetHolder().nextOffset(taskSyncContext.getCurrentKafkaRecordOffset())).build();
        });
        TaskSyncEvent buildUpdateEpochTaskSyncEvent = updateAndGet.buildUpdateEpochTaskSyncEvent();
        this.taskSyncPublisher.send(buildUpdateEpochTaskSyncEvent);
        LOGGER.info("Task {} - Leader task has updated the epoch offset with rebalance generation ID: {} and epoch offset: {}, num partitions {}, num shared partitions {}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(updateAndGet.getRebalanceGenerationId()), Long.valueOf(updateAndGet.getEpochOffsetHolder().getEpochOffset()), Integer.valueOf(buildUpdateEpochTaskSyncEvent.getNumPartitions()), Integer.valueOf(buildUpdateEpochTaskSyncEvent.getNumSharedPartitions())});
        return updateAndGet;
    }

    public void start(long j) {
        if (this.leaderThread != null) {
            stop(j);
        }
        this.leaderThread = createLeaderThread(j);
        this.leaderThread.start();
    }

    public void stop(long j) {
        if (this.leaderThread == null) {
            return;
        }
        LOGGER.info("Task {}, trying to stop leader thread with rebalance generation ID {}", this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(j));
        this.leaderThread.interrupt();
        LOGGER.info("Task {}, interrupted leader thread with rebalance generation ID {}", this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(j));
        Metronome sleeper = Metronome.sleeper(this.sleepInterval, this.clock);
        while (!this.leaderThread.getState().equals(Thread.State.TERMINATED)) {
            try {
                sleeper.pause();
                this.leaderThread.interrupt();
                LOGGER.info("Task {}, still waiting for leader thread to die with rebalance generation ID {}", this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(j));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        LOGGER.info("Task {}, stopped leader thread with rebalance generation ID {}", this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(j));
        this.leaderThread = null;
    }

    private void newEpoch(long j) throws InterruptedException {
        LOGGER.info("performLeaderActions: new epoch initialization");
        boolean isStartFromScratch = this.leaderService.isStartFromScratch();
        Set<String> activeConsumerGroupMembers = this.kafkaAdminService.getActiveConsumerGroupMembers();
        LOGGER.info("Task {} with consumer {}, performLeaderActions: consumers found {}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), this.taskSyncContextHolder.get().getConsumerId(), activeConsumerGroupMembers});
        activeConsumerGroupMembers.remove(this.taskSyncContextHolder.get().getConsumerId());
        LOGGER.info("Task {} with consumer {}, performLeaderActions: consumers found without leader {}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), this.taskSyncContextHolder.get().getConsumerId(), activeConsumerGroupMembers});
        Map<String, String> awaitAllNewTaskStateUpdates = this.leaderService.awaitAllNewTaskStateUpdates(activeConsumerGroupMembers, j);
        LOGGER.info("performLeaderActions: answers received {}", awaitAllNewTaskStateUpdates);
        if (awaitAllNewTaskStateUpdates.size() < activeConsumerGroupMembers.size()) {
            LOGGER.info("TaskUid {}, Expected active consumers {}, but only received consumers {}, not sending new epoch", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), activeConsumerGroupMembers, awaitAllNewTaskStateUpdates});
            throw new DebeziumException("Task Uid " + this.taskSyncContextHolder.get().getTaskUid() + " Expected active consumers " + activeConsumerGroupMembers.toString() + " but only received consumers " + awaitAllNewTaskStateUpdates.toString() + " not sending new epoch ");
        }
        boolean z = false;
        if (this.taskSyncContextHolder.get().checkDuplication(false, "NEW EPOCH rebalance event, initial context")) {
            z = true;
        }
        int numPartitions = this.taskSyncContextHolder.get().getNumPartitions();
        int numSharedPartitions = this.taskSyncContextHolder.get().getNumSharedPartitions();
        LOGGER.info("Task {} - before sending new epoch, total partitions {}, num partitions {}, num shared partitions {}", new Object[]{Integer.valueOf(numPartitions + numSharedPartitions), Integer.valueOf(numPartitions), Integer.valueOf(numSharedPartitions)});
        TaskSyncContext updateAndGet = this.taskSyncContextHolder.updateAndGet(taskSyncContext -> {
            TaskState currentTaskState = taskSyncContext.getCurrentTaskState();
            Map<String, TaskState> taskStates = taskSyncContext.getTaskStates();
            LOGGER.info("Task {}, Current task states in old context: {}", taskSyncContext.getTaskUid(), (Set) taskStates.keySet().stream().collect(Collectors.toSet()));
            Map<Boolean, Map<String, TaskState>> splitSurvivedAndObsoleteTaskStates = TaskStateUtil.splitSurvivedAndObsoleteTaskStates(taskStates, awaitAllNewTaskStateUpdates.values());
            Map<String, TaskState> map = splitSurvivedAndObsoleteTaskStates.get(true);
            return taskSyncContext.toBuilder().currentTaskState(this.taskPartitonRebalancer.rebalance(currentTaskState, map, splitSurvivedAndObsoleteTaskStates.get(false)).toBuilder().rebalanceGenerationId(j).build()).rebalanceState(RebalanceState.NEW_EPOCH_STARTED).rebalanceGenerationId(j).taskStates(TaskStateUtil.filterSurvivedTasksStates(taskSyncContext.getTaskStates(), map.keySet())).epochOffsetHolder(taskSyncContext.getEpochOffsetHolder().nextOffset(taskSyncContext.getCurrentKafkaRecordOffset())).build();
        });
        if (!z) {
            updateAndGet.checkDuplication(true, "NEW EPOCH rebalance event, resulting context");
        }
        TaskSyncEvent buildNewEpochTaskSyncEvent = updateAndGet.buildNewEpochTaskSyncEvent();
        int numPartitions2 = buildNewEpochTaskSyncEvent.getNumPartitions();
        int numSharedPartitions2 = buildNewEpochTaskSyncEvent.getNumSharedPartitions();
        LOGGER.info("Task {} - sent new epoch with rebalance generation ID {}, num tasks {}, total partitions {}, num owned partitions {}, num shared partitions {}, task Uids {}", new Object[]{updateAndGet.getTaskUid(), Long.valueOf(updateAndGet.getRebalanceGenerationId()), Integer.valueOf(buildNewEpochTaskSyncEvent.getTaskStates().size()), Integer.valueOf(numPartitions2 + numSharedPartitions2), Integer.valueOf(numPartitions2), Integer.valueOf(numSharedPartitions2), (Set) buildNewEpochTaskSyncEvent.getTaskStates().keySet().stream().collect(Collectors.toSet()), Integer.valueOf(numPartitions2 + numSharedPartitions2)});
        this.taskSyncPublisher.send(buildNewEpochTaskSyncEvent);
        if (isStartFromScratch) {
            this.leaderService.newParentPartition();
            LOGGER.info("performLeaderActions: newParentPartition");
        }
        LoggerUtils.debug(LOGGER, "performLeaderActions: new epoch {}", buildNewEpochTaskSyncEvent);
    }
}
