package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.SpannerConnectorConfig;
import io.debezium.connector.spanner.db.stream.ChangeStream;
import io.debezium.connector.spanner.exception.SpannerConnectorException;
import io.debezium.connector.spanner.kafka.internal.TaskSyncPublisher;
import io.debezium.connector.spanner.processor.SpannerEventDispatcher;
import io.debezium.connector.spanner.task.operation.ChildPartitionOperation;
import io.debezium.connector.spanner.task.operation.ClearSharedPartitionOperation;
import io.debezium.connector.spanner.task.operation.ConnectorEndDetectionOperation;
import io.debezium.connector.spanner.task.operation.FindPartitionForStreamingOperation;
import io.debezium.connector.spanner.task.operation.Operation;
import io.debezium.connector.spanner.task.operation.PartitionStatusUpdateOperation;
import io.debezium.connector.spanner.task.operation.RemoveFinishedPartitionOperation;
import io.debezium.connector.spanner.task.operation.TakePartitionForStreamingOperation;
import io.debezium.connector.spanner.task.operation.TakeSharedPartitionOperation;
import io.debezium.connector.spanner.task.state.NewPartitionsEvent;
import io.debezium.connector.spanner.task.state.PartitionStatusUpdateEvent;
import io.debezium.connector.spanner.task.state.SyncEvent;
import io.debezium.connector.spanner.task.state.TaskStateChangeEvent;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/TaskStateChangeEventHandler.class */
public class TaskStateChangeEventHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateChangeEventHandler.class);
    private final TaskSyncContextHolder taskSyncContextHolder;
    private final TaskSyncPublisher taskSyncPublisher;
    private final ChangeStream changeStream;
    private final PartitionFactory partitionFactory;
    private final Runnable finishingHandler;
    private final SpannerConnectorConfig connectorConfig;
    private final SpannerEventDispatcher spannerEventDispatcher;
    private final Consumer<RuntimeException> errorHandler;
    private final AtomicLong failOverloadedTaskTimer = new AtomicLong(System.currentTimeMillis());

    public TaskStateChangeEventHandler(TaskSyncContextHolder taskSyncContextHolder, TaskSyncPublisher taskSyncPublisher, ChangeStream changeStream, PartitionFactory partitionFactory, SpannerEventDispatcher spannerEventDispatcher, Runnable runnable, SpannerConnectorConfig spannerConnectorConfig, Consumer<RuntimeException> consumer) {
        this.taskSyncContextHolder = taskSyncContextHolder;
        this.taskSyncPublisher = taskSyncPublisher;
        this.partitionFactory = partitionFactory;
        this.changeStream = changeStream;
        this.finishingHandler = runnable;
        this.connectorConfig = spannerConnectorConfig;
        this.errorHandler = consumer;
        this.spannerEventDispatcher = spannerEventDispatcher;
    }

    public void processEvent(TaskStateChangeEvent taskStateChangeEvent) throws InterruptedException {
        LOGGER.debug("process TaskStateChangeEvent of type: {}", taskStateChangeEvent.getClass().getSimpleName());
        long epochMilli = Instant.now().toEpochMilli();
        if (taskStateChangeEvent instanceof PartitionStatusUpdateEvent) {
            processEvent((PartitionStatusUpdateEvent) taskStateChangeEvent);
        } else if (taskStateChangeEvent instanceof NewPartitionsEvent) {
            processEvent((NewPartitionsEvent) taskStateChangeEvent);
        } else {
            if (!(taskStateChangeEvent instanceof SyncEvent)) {
                throw new IllegalStateException("Unknown event");
            }
            processSyncEvent();
        }
        LOGGER.debug("Task {}, TaskStateChangeEventHandler: Processed {} in {} millis", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), taskStateChangeEvent.getClass().getSimpleName(), Long.valueOf(Instant.now().toEpochMilli() - epochMilli)});
    }

    private void processEvent(PartitionStatusUpdateEvent partitionStatusUpdateEvent) throws InterruptedException {
        performOperation(new PartitionStatusUpdateOperation(partitionStatusUpdateEvent.getToken(), partitionStatusUpdateEvent.getState()), new ClearSharedPartitionOperation(), new FindPartitionForStreamingOperation(), new TakePartitionForStreamingOperation(this.changeStream, this.partitionFactory));
    }

    private void processEvent(NewPartitionsEvent newPartitionsEvent) throws InterruptedException {
        performOperation(new ChildPartitionOperation(newPartitionsEvent.getPartitions()), new ClearSharedPartitionOperation(), new FindPartitionForStreamingOperation(), new TakePartitionForStreamingOperation(this.changeStream, this.partitionFactory), new RemoveFinishedPartitionOperation(this.spannerEventDispatcher, this.connectorConfig));
    }

    private void processSyncEvent() throws InterruptedException {
        failOverloadedTaskByTimer(performOperation(new ClearSharedPartitionOperation(), new TakeSharedPartitionOperation(), new FindPartitionForStreamingOperation(), new TakePartitionForStreamingOperation(this.changeStream, this.partitionFactory), new RemoveFinishedPartitionOperation(this.spannerEventDispatcher, this.connectorConfig), new ConnectorEndDetectionOperation(this.finishingHandler, this.connectorConfig.endTime())));
    }

    private void failOverloadedTaskByTimer(TaskSyncContext taskSyncContext) {
        if (this.connectorConfig.failOverloadedTask()) {
            synchronized (this) {
                this.failOverloadedTaskTimer.getAndUpdate(j -> {
                    long currentTimeMillis = System.currentTimeMillis();
                    if (j + this.connectorConfig.failOverloadedTaskInterval() >= currentTimeMillis) {
                        return j;
                    }
                    checkToFailOverloadedTask(taskSyncContext);
                    return currentTimeMillis;
                });
            }
        }
    }

    private synchronized void checkToFailOverloadedTask(TaskSyncContext taskSyncContext) {
        long numOwnedAndAssignedPartitions = TaskStateUtil.numOwnedAndAssignedPartitions(taskSyncContext);
        long j = TaskStateUtil.totalInProgressPartitions(taskSyncContext);
        if (numOwnedAndAssignedPartitions <= this.connectorConfig.getDesiredPartitionsTasks() || numOwnedAndAssignedPartitions <= 2 * (j / (taskSyncContext.getTaskStates().size() + 1))) {
            return;
        }
        this.errorHandler.accept(new SpannerConnectorException(String.format("Task is overloaded by assignments: %d of total: %d", Long.valueOf(numOwnedAndAssignedPartitions), Long.valueOf(j))));
    }

    private TaskSyncContext performOperation(Operation... operationArr) throws InterruptedException {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        TaskSyncContext updateAndGet = this.taskSyncContextHolder.updateAndGet(taskSyncContext -> {
            TaskSyncContext taskSyncContext = taskSyncContext;
            for (Operation operation : operationArr) {
                long epochMilli = Instant.now().toEpochMilli();
                taskSyncContext = operation.doOperation(taskSyncContext);
                if (operation.isRequiredPublishSyncEvent()) {
                    LOGGER.debug("Task {} - need to publish sync event for operation {}", this.taskSyncContextHolder.get().getTaskUid(), operation.getClass().getSimpleName());
                    atomicBoolean.set(true);
                }
                LOGGER.debug("Task {} - did operation {} in {} millis", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), operation.getClass().getSimpleName(), Long.valueOf(Instant.now().toEpochMilli() - epochMilli)});
            }
            return taskSyncContext;
        });
        if (atomicBoolean.get()) {
            this.taskSyncPublisher.send(updateAndGet.buildCurrentTaskSyncEvent());
        }
        return updateAndGet;
    }
}
