package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.kafka.internal.TaskSyncPublisher;
import io.debezium.connector.spanner.kafka.internal.model.MessageTypeEnum;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.SyncEventMetadata;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.task.state.SyncEvent;
import io.debezium.connector.spanner.task.state.TaskStateChangeEvent;
import io.debezium.function.BlockingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/SyncEventHandler.class */
public class SyncEventHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(SyncEventHandler.class);
    private final TaskSyncContextHolder taskSyncContextHolder;
    private final TaskSyncPublisher taskSyncPublisher;
    private final BlockingConsumer<TaskStateChangeEvent> eventConsumer;

    public SyncEventHandler(TaskSyncContextHolder taskSyncContextHolder, TaskSyncPublisher taskSyncPublisher, BlockingConsumer<TaskStateChangeEvent> blockingConsumer) {
        this.taskSyncContextHolder = taskSyncContextHolder;
        this.taskSyncPublisher = taskSyncPublisher;
        this.eventConsumer = blockingConsumer;
    }

    public void updateCurrentOffset(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) {
        if (taskSyncEvent == null || skipFromMismatchingGeneration(taskSyncEvent)) {
            return;
        }
        LOGGER.debug("Task {} - before update task sync topic offset with {}", this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(syncEventMetadata.getOffset()));
        this.taskSyncContextHolder.update(taskSyncContext -> {
            return taskSyncContext.toBuilder().currentKafkaRecordOffset(syncEventMetadata.getOffset()).build();
        });
        LOGGER.debug("Task {} - update task sync topic offset with {}", this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(syncEventMetadata.getOffset()));
    }

    public void processPreviousStates(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) {
        if (RebalanceState.START_INITIAL_SYNC.equals(this.taskSyncContextHolder.get().getRebalanceState())) {
            if (skipFromMismatchingGeneration(taskSyncEvent)) {
                if (taskSyncEvent != null) {
                    LOGGER.info("Task {}, skipFromMismatchingGeneration: currentGen: {}, inGen: {}, inTaskUid: {}, message type {}, received rebalance generation ID {}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(this.taskSyncContextHolder.get().getRebalanceGenerationId()), Long.valueOf(taskSyncEvent.getRebalanceGenerationId()), taskSyncEvent.getTaskUid(), taskSyncEvent.getMessageType(), Long.valueOf(this.taskSyncContextHolder.get().getRebalanceGenerationId())});
                }
                if (syncEventMetadata.isCanInitiateRebalancing()) {
                    LOGGER.info("task {}, finished processing all previous sync event messages with end offset {}, can initiate rebalancing with rebalance generation ID {}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(syncEventMetadata.getOffset()), Long.valueOf(this.taskSyncContextHolder.get().getRebalanceGenerationId())});
                    this.taskSyncContextHolder.update(taskSyncContext -> {
                        return taskSyncContext.toBuilder().rebalanceState(RebalanceState.INITIAL_INCREMENTED_STATE_COMPLETED).build();
                    });
                    return;
                }
                return;
            }
            if (taskSyncEvent != null) {
                if (taskSyncEvent.getMessageType() == MessageTypeEnum.NEW_EPOCH) {
                    this.taskSyncContextHolder.update(taskSyncContext2 -> {
                        return SyncEventMerger.mergeNewEpoch(taskSyncContext2, taskSyncEvent);
                    });
                } else if (taskSyncEvent.getMessageType() == MessageTypeEnum.REBALANCE_ANSWER) {
                    this.taskSyncContextHolder.update(taskSyncContext3 -> {
                        return SyncEventMerger.mergeRebalanceAnswer(taskSyncContext3, taskSyncEvent);
                    });
                } else if (taskSyncEvent.getMessageType() == MessageTypeEnum.UPDATE_EPOCH) {
                    this.taskSyncContextHolder.update(taskSyncContext4 -> {
                        return SyncEventMerger.mergeEpochUpdate(taskSyncContext4, taskSyncEvent);
                    });
                } else {
                    this.taskSyncContextHolder.update(taskSyncContext5 -> {
                        return SyncEventMerger.mergeIncrementalTaskSyncEvent(taskSyncContext5, taskSyncEvent);
                    });
                }
            }
            if (syncEventMetadata.isCanInitiateRebalancing()) {
                LOGGER.info("Task {} - processPreviousStates - switch state to INITIAL_INCREMENTED_STATE_COMPLETED", this.taskSyncContextHolder.get().getTaskUid());
                LOGGER.info("task {}, finished processing all previous sync event messages with end offset {}, can initiate rebalancing with rebalance generation ID {}, task has total partitions {}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(syncEventMetadata.getOffset()), Long.valueOf(this.taskSyncContextHolder.get().getRebalanceGenerationId()), Long.valueOf(this.taskSyncContextHolder.get().getNumPartitions() + this.taskSyncContextHolder.get().getNumSharedPartitions())});
                this.taskSyncContextHolder.update(taskSyncContext6 -> {
                    return taskSyncContext6.toBuilder().rebalanceState(RebalanceState.INITIAL_INCREMENTED_STATE_COMPLETED).build();
                });
                LOGGER.info("Task {} - now initialized with epoch offset {}", this.taskSyncContextHolder.get().getTaskUid(), Long.valueOf(this.taskSyncContextHolder.get().getEpochOffsetHolder().getEpochOffset()));
                this.taskSyncContextHolder.get().checkDuplication(true, "Finished Initializing Task State");
            }
        }
    }

    public void processNewEpoch(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) throws InterruptedException, IllegalStateException {
        LOGGER.info("Task {} - processNewEpoch: metadata {}, rebalance State {}, rebalanceId: {}, current task {}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), syncEventMetadata, this.taskSyncContextHolder.get().getRebalanceState(), Long.valueOf(taskSyncEvent.getRebalanceGenerationId())});
        TaskSyncContext updateAndGet = this.taskSyncContextHolder.updateAndGet(taskSyncContext -> {
            return SyncEventMerger.mergeNewEpoch(taskSyncContext, taskSyncEvent);
        });
        LOGGER.info("Task {} - SyncEventHandler sending response for new epoch", this.taskSyncContextHolder.get().getTaskUid());
        this.taskSyncPublisher.send(updateAndGet.buildCurrentTaskSyncEvent());
        LOGGER.info("Task {} - SyncEventHandler sent response for new epoch", this.taskSyncContextHolder.get().getTaskUid());
        this.eventConsumer.accept(new SyncEvent());
    }

    public void processUpdateEpoch(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) throws InterruptedException {
        LOGGER.debug("Task {} - SyncEventHandler updating from update epoch", this.taskSyncContextHolder.get().getTaskUid());
        this.taskSyncContextHolder.update(taskSyncContext -> {
            return SyncEventMerger.mergeEpochUpdate(taskSyncContext, taskSyncEvent);
        });
        LOGGER.debug("Task {} - SyncEventHandler updated from update epoch", this.taskSyncContextHolder.get().getTaskUid());
        this.eventConsumer.accept(new SyncEvent());
    }

    public void processRegularMessage(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) throws InterruptedException {
        LOGGER.debug("Task {} - process regular message event", this.taskSyncContextHolder.get().getTaskUid());
        this.taskSyncContextHolder.update(taskSyncContext -> {
            return SyncEventMerger.mergeIncrementalTaskSyncEvent(taskSyncContext, taskSyncEvent);
        });
        LOGGER.debug("Task {} - Finished processing regular message event", this.taskSyncContextHolder.get().getTaskUid());
    }

    public void processRebalanceAnswer(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) {
        LOGGER.info("Task {} - process sync event - rebalance answer from task {} with generation {}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getRebalanceGenerationId())});
        this.taskSyncContextHolder.update(taskSyncContext -> {
            return SyncEventMerger.mergeRebalanceAnswer(taskSyncContext, taskSyncEvent);
        });
        LOGGER.debug("Task {} - process sync event - updated from rebalance answer", this.taskSyncContextHolder.get().getTaskUid());
    }

    private boolean skipFromMismatchingGeneration(TaskSyncEvent taskSyncEvent) {
        if (taskSyncEvent == null) {
            return false;
        }
        long rebalanceGenerationId = taskSyncEvent.getRebalanceGenerationId();
        long rebalanceGenerationId2 = this.taskSyncContextHolder.get().getRebalanceGenerationId();
        long receivedRebalanceGenerationId = this.taskSyncContextHolder.get().getReceivedRebalanceGenerationId();
        if (taskSyncEvent.getMessageType() == MessageTypeEnum.NEW_EPOCH || taskSyncEvent.getMessageType() == MessageTypeEnum.UPDATE_EPOCH) {
            return rebalanceGenerationId < rebalanceGenerationId2;
        }
        if (taskSyncEvent.getMessageType() == MessageTypeEnum.REBALANCE_ANSWER) {
            return rebalanceGenerationId < rebalanceGenerationId2 || rebalanceGenerationId < receivedRebalanceGenerationId;
        }
        return false;
    }

    public void process(TaskSyncEvent taskSyncEvent, SyncEventMetadata syncEventMetadata) throws InterruptedException, IllegalStateException {
        if (taskSyncEvent == null) {
            return;
        }
        if (skipFromMismatchingGeneration(taskSyncEvent)) {
            LOGGER.info("Task {}, skipping message from task {}, from prior generation {} and message type {} with current generation {} and received rebalance generation {}", new Object[]{this.taskSyncContextHolder.get().getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getRebalanceGenerationId()), taskSyncEvent.getMessageType(), Long.valueOf(this.taskSyncContextHolder.get().getRebalanceGenerationId()), Long.valueOf(this.taskSyncContextHolder.get().getReceivedRebalanceGenerationId())});
            return;
        }
        try {
            if (taskSyncEvent.getMessageType() == MessageTypeEnum.REGULAR) {
                processRegularMessage(taskSyncEvent, syncEventMetadata);
            } else if (taskSyncEvent.getMessageType() == MessageTypeEnum.REBALANCE_ANSWER) {
                processRebalanceAnswer(taskSyncEvent, syncEventMetadata);
            } else if (taskSyncEvent.getMessageType() == MessageTypeEnum.UPDATE_EPOCH) {
                processUpdateEpoch(taskSyncEvent, syncEventMetadata);
            } else if (taskSyncEvent.getMessageType() == MessageTypeEnum.NEW_EPOCH) {
                processNewEpoch(taskSyncEvent, syncEventMetadata);
            }
        } catch (Exception e) {
            LOGGER.error("Exception during processing task message task Uid {}, message type {}, exception", new Object[]{taskSyncEvent.getTaskUid(), taskSyncEvent.getMessageType(), e});
            throw e;
        }
    }
}
