package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.task.TaskSyncContext;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/SyncEventMerger.class */
public class SyncEventMerger {
    private static final Logger LOGGER = LoggerFactory.getLogger(SyncEventMerger.class);

    private SyncEventMerger() {
    }

    public static TaskSyncContext mergeIncrementalTaskSyncEvent(TaskSyncContext taskSyncContext, TaskSyncEvent taskSyncEvent) {
        LoggerUtils.debug(LOGGER, "merge: state before {}, \nIncoming states: {}", taskSyncContext, taskSyncEvent.getTaskStates());
        TaskSyncContext.TaskSyncContextBuilder builder = taskSyncContext.toBuilder();
        TaskState taskState = taskSyncEvent.getTaskStates().get(taskSyncEvent.getTaskUid());
        if (taskState == null) {
            LOGGER.warn("The rebalance answer {} did not contain the task's UID: {}", taskSyncEvent, taskSyncEvent.getTaskUid());
            return builder.build();
        }
        if (taskState.getTaskUid().equals(taskSyncContext.getTaskUid())) {
            return builder.build();
        }
        long numPartitions = taskSyncContext.getNumPartitions() + taskSyncContext.getNumSharedPartitions();
        TaskState taskState2 = taskSyncContext.getTaskStates().get(taskSyncEvent.getTaskUid());
        if (taskState2 == null) {
            LOGGER.debug("Task {}, The task's UID: {} not contained in current task states map", taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid());
            return builder.build();
        }
        if (taskState.getStateTimestamp() <= taskState2.getStateTimestamp()) {
            LOGGER.debug("merge: final state is not changed");
            return builder.build();
        }
        HashMap hashMap = new HashMap(taskSyncContext.getTaskStates());
        hashMap.remove(taskSyncEvent.getTaskUid());
        hashMap.put(taskSyncEvent.getTaskUid(), taskState);
        builder.taskStates(hashMap).createdTimestamp(Long.max(taskSyncContext.getCreatedTimestamp(), taskSyncEvent.getMessageTimestamp()));
        TaskSyncContext build = builder.build();
        if (build.getNumPartitions() + build.getNumSharedPartitions() != numPartitions) {
            LOGGER.debug("Task {}, processed incremental answer {}: {}, task has total partitions {}, num partitions {}, num shared partitions {}, num old partitions {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent, Integer.valueOf(build.getNumPartitions() + build.getNumSharedPartitions()), Integer.valueOf(build.getNumPartitions()), Integer.valueOf(build.getNumSharedPartitions()), Long.valueOf(numPartitions)});
        }
        return build;
    }

    public static TaskSyncContext mergeRebalanceAnswer(TaskSyncContext taskSyncContext, TaskSyncEvent taskSyncEvent) {
        LoggerUtils.debug(LOGGER, "merge: state before {}, \nIncoming states: {}", taskSyncContext, taskSyncEvent.getTaskStates());
        TaskSyncContext.TaskSyncContextBuilder builder = taskSyncContext.toBuilder();
        TaskState taskState = taskSyncEvent.getTaskStates().get(taskSyncEvent.getTaskUid());
        if (taskState == null) {
            LOGGER.warn("Task {}, The rebalance answer {} did not contain the task's UID: {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent, taskSyncEvent.getTaskUid()});
            return builder.build();
        }
        if (taskState.getTaskUid().equals(taskSyncContext.getTaskUid())) {
            return builder.build();
        }
        long numPartitions = taskSyncContext.getNumPartitions() + taskSyncContext.getNumSharedPartitions();
        TaskState taskState2 = taskSyncContext.getTaskStates().get(taskSyncEvent.getTaskUid());
        if (taskState2 != null && taskState.getStateTimestamp() <= taskState2.getStateTimestamp()) {
            LOGGER.info("Task {}, Skipping rebalance answer from task {} for rebalance generation id {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getRebalanceGenerationId())});
            LOGGER.debug("merge: final state is not changed");
            return builder.build();
        }
        HashMap hashMap = new HashMap(taskSyncContext.getTaskStates());
        hashMap.remove(taskSyncEvent.getTaskUid());
        hashMap.put(taskSyncEvent.getTaskUid(), taskState);
        builder.taskStates(hashMap).createdTimestamp(Long.max(taskSyncContext.getCreatedTimestamp(), taskSyncEvent.getMessageTimestamp()));
        TaskSyncContext build = builder.build();
        LOGGER.info("Task {}, Processed rebalance answer from task {} for rebalance generation id {}, task has total partitions {}, num partitions {}, num shared partitions {}, num old partitions {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getRebalanceGenerationId()), Integer.valueOf(build.getNumPartitions() + build.getNumSharedPartitions()), Integer.valueOf(build.getNumPartitions()), Integer.valueOf(build.getNumSharedPartitions()), Long.valueOf(numPartitions)});
        return build;
    }

    public static TaskSyncContext mergeEpochUpdate(TaskSyncContext taskSyncContext, TaskSyncEvent taskSyncEvent) {
        TaskState taskState;
        Map<String, TaskState> taskStates = taskSyncEvent.getTaskStates();
        LoggerUtils.debug(LOGGER, "merge: state before {}, \nIncoming states: {}", taskSyncContext, taskStates);
        TaskSyncContext.TaskSyncContextBuilder builder = taskSyncContext.toBuilder();
        if (((taskSyncContext.getRebalanceState() == RebalanceState.START_INITIAL_SYNC) || taskSyncContext.getRebalanceState().equals(RebalanceState.NEW_EPOCH_STARTED)) && !taskSyncEvent.getTaskUid().equals(taskSyncContext.getTaskUid())) {
            long numPartitions = taskSyncContext.getNumPartitions() + taskSyncContext.getNumSharedPartitions();
            builder.epochOffsetHolder(taskSyncContext.getEpochOffsetHolder().nextOffset(taskSyncEvent.getEpochOffset()));
            TaskState.TaskStateBuilder builder2 = taskSyncContext.getCurrentTaskState().toBuilder();
            if (RebalanceState.START_INITIAL_SYNC.equals(taskSyncContext.getRebalanceState())) {
                LOGGER.info("Task {}, updating the rebalance generation ID from the leader epoch update {}: {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getRebalanceGenerationId())});
                builder.rebalanceGenerationId(taskSyncEvent.getRebalanceGenerationId());
                builder2.rebalanceGenerationId(taskSyncEvent.getRebalanceGenerationId());
            }
            Set set = (Set) taskStates.keySet().stream().collect(Collectors.toSet());
            Map<String, TaskState> taskStates2 = taskSyncContext.getTaskStates();
            HashMap hashMap = new HashMap();
            for (Map.Entry<String, TaskState> entry : taskStates2.entrySet()) {
                if (set.contains(entry.getKey())) {
                    hashMap.put(entry.getKey(), entry.getValue());
                } else {
                    LOGGER.info("Task {}, removing task state {} since it is not included in the UPDATE_EPOCH message {}", new Object[]{taskSyncContext.getTaskUid(), entry.getKey(), set});
                }
            }
            HashSet hashSet = new HashSet();
            for (TaskState taskState2 : taskStates.values()) {
                if (!taskState2.getTaskUid().equals(taskSyncContext.getTaskUid()) && ((taskState = (TaskState) hashMap.get(taskState2.getTaskUid())) == null || taskState2.getStateTimestamp() > taskState.getStateTimestamp())) {
                    hashSet.add(taskState2.getTaskUid());
                }
            }
            TaskSyncContext build = builder.taskStates((Map) Stream.concat(hashMap.entrySet().stream().filter(entry2 -> {
                return !hashSet.contains(entry2.getKey());
            }), taskStates.entrySet().stream().filter(entry3 -> {
                return hashSet.contains(entry3.getKey());
            })).collect(Collectors.toUnmodifiableMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }))).createdTimestamp(Long.max(taskSyncContext.getCreatedTimestamp(), taskSyncEvent.getMessageTimestamp())).currentTaskState(builder2.build()).epochOffsetHolder(taskSyncContext.getEpochOffsetHolder().nextOffset(taskSyncEvent.getEpochOffset())).build();
            int numPartitions2 = build.getNumPartitions();
            int numSharedPartitions = build.getNumSharedPartitions();
            long j = numPartitions2 + numSharedPartitions;
            LOGGER.debug("Task {}, updating the epoch offset from the leader's UPDATE_EPOCH message {}: {}, task has total partitions {}, num partitions {}, num shared partitions {}, num old partitions {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getEpochOffset()), Integer.valueOf(numPartitions2 + numSharedPartitions), Integer.valueOf(numPartitions2), Integer.valueOf(numSharedPartitions), Long.valueOf(numPartitions)});
            return build;
        }
        return builder.build();
    }

    public static TaskSyncContext mergeNewEpoch(TaskSyncContext taskSyncContext, TaskSyncEvent taskSyncEvent) {
        TaskSyncContext.TaskSyncContextBuilder builder = taskSyncContext.toBuilder();
        if (taskSyncEvent.getTaskUid().equals(taskSyncContext.getTaskUid())) {
            return builder.build();
        }
        boolean z = false;
        if (taskSyncContext.checkDuplication(false, "NEW_EPOCH")) {
            z = true;
        }
        long numPartitions = taskSyncContext.getNumPartitions() + taskSyncContext.getNumSharedPartitions();
        HashMap hashMap = new HashMap(taskSyncEvent.getTaskStates());
        hashMap.remove(taskSyncContext.getTaskUid());
        if (!(taskSyncContext.getRebalanceState() == RebalanceState.START_INITIAL_SYNC)) {
            if (!((Set) taskSyncEvent.getTaskStates().values().stream().map(taskState -> {
                return taskState.getTaskUid();
            }).collect(Collectors.toSet())).contains(taskSyncContext.getTaskUid())) {
                LOGGER.warn("Task {} - Received new epoch message , but leader {} did not include the task in the new epoch message with rebalance ID {} with tasks {}, probably just initialized, throw exception", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getRebalanceGenerationId()), taskSyncEvent.getTaskStates().keySet().stream().collect(Collectors.toList())});
            } else if (taskSyncEvent.getRebalanceGenerationId() < taskSyncContext.getReceivedRebalanceGenerationId()) {
                LOGGER.warn("Task {} - Received new epoch message from {} , but the new epoch message had rebalance generation ID {} while the latest rebalance generation ID is {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getRebalanceGenerationId()), Long.valueOf(taskSyncContext.getReceivedRebalanceGenerationId())});
            } else {
                LOGGER.info("Task {}, updating the rebalance state to NEW_EPOCH_STARTED {}: {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getRebalanceGenerationId())});
                builder.rebalanceState(RebalanceState.NEW_EPOCH_STARTED);
            }
        }
        TaskState.TaskStateBuilder builder2 = taskSyncContext.getCurrentTaskState().toBuilder();
        LOGGER.info("Task {}, updating the rebalance generation ID and epoch offset from the leader new epoch {}: {}, {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getRebalanceGenerationId()), Long.valueOf(taskSyncEvent.getEpochOffset())});
        builder2.rebalanceGenerationId(taskSyncEvent.getRebalanceGenerationId());
        builder.createdTimestamp(taskSyncEvent.getMessageTimestamp()).rebalanceGenerationId(taskSyncEvent.getRebalanceGenerationId()).epochOffsetHolder(taskSyncContext.getEpochOffsetHolder().nextOffset(taskSyncEvent.getEpochOffset())).taskStates(hashMap).currentTaskState(builder2.build());
        TaskSyncContext build = builder.build();
        long numPartitions2 = build.getNumPartitions() + build.getNumSharedPartitions();
        LOGGER.info("Task {}, processed new epoch message {}: {}, task has total partitions {}, num partitions {}, num shared partitions {}, num old partitions {}", new Object[]{taskSyncContext.getTaskUid(), taskSyncEvent.getTaskUid(), Long.valueOf(taskSyncEvent.getEpochOffset()), Integer.valueOf(build.getNumPartitions() + build.getNumSharedPartitions()), Integer.valueOf(build.getNumPartitions()), Integer.valueOf(build.getNumSharedPartitions()), Long.valueOf(numPartitions)});
        if (!z && build.checkDuplication(true, "NEW_EPOCH")) {
            LOGGER.debug("Task {}, duplication exists after processing new epoch, old context {}", build.getTaskUid(), taskSyncContext);
            LOGGER.debug("Task {}, duplication exists after processing new epoch, new message {}", build.getTaskUid(), taskSyncEvent);
            LOGGER.debug("Task {}, duplication exists after processing new epoch, resulting context {}", build.getTaskUid(), build);
        }
        return build;
    }
}
