package io.debezium.connector.spanner.task.operation;

import io.debezium.connector.spanner.db.stream.ChangeStream;
import io.debezium.connector.spanner.kafka.internal.model.PartitionState;
import io.debezium.connector.spanner.kafka.internal.model.PartitionStateEnum;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.task.PartitionFactory;
import io.debezium.connector.spanner.task.TaskSyncContext;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/operation/TakePartitionForStreamingOperation.class */
public class TakePartitionForStreamingOperation implements Operation {
    private static final Logger LOGGER = LoggerFactory.getLogger(TakePartitionForStreamingOperation.class);
    private final ChangeStream changeStream;
    private final PartitionFactory partitionFactory;
    private boolean isRequiredPublishSyncEvent = false;

    public TakePartitionForStreamingOperation(ChangeStream changeStream, PartitionFactory partitionFactory) {
        this.changeStream = changeStream;
        this.partitionFactory = partitionFactory;
    }

    private TaskSyncContext takePartitionForStreaming(TaskSyncContext taskSyncContext) {
        TaskState currentTaskState = taskSyncContext.getCurrentTaskState();
        List list = (List) currentTaskState.getPartitions().stream().filter(partitionState -> {
            return partitionState.getState().equals(PartitionStateEnum.READY_FOR_STREAMING);
        }).collect(Collectors.toList());
        if (!list.isEmpty()) {
            LOGGER.debug("Task {}, ready for streaming partitions {}", taskSyncContext.getTaskUid(), list);
        }
        try {
            HashSet hashSet = new HashSet();
            list.forEach(partitionState2 -> {
                LOGGER.info("Task {}, submitting the partition for streaming {}", taskSyncContext.getTaskUid(), partitionState2);
                if (submitPartition(partitionState2, taskSyncContext)) {
                    hashSet.add(partitionState2.getToken());
                } else {
                    LOGGER.error("Task {}, failed to submit partition {} with state {}", new Object[]{taskSyncContext.getTaskUid(), partitionState2, taskSyncContext.getRebalanceState()});
                }
            });
            List<PartitionState> list2 = (List) currentTaskState.getPartitions().stream().map(partitionState3 -> {
                return hashSet.contains(partitionState3.getToken()) ? partitionState3.toBuilder().state(PartitionStateEnum.SCHEDULED).build() : partitionState3;
            }).collect(Collectors.toList());
            this.isRequiredPublishSyncEvent = !hashSet.isEmpty();
            if (this.isRequiredPublishSyncEvent) {
                LOGGER.info("Task scheduled {} partitions, taskUid: {}", hashSet, taskSyncContext.getTaskUid());
            }
            TaskSyncContext build = taskSyncContext.toBuilder().currentTaskState(currentTaskState.toBuilder().partitions(list2).build()).build();
            LOGGER.debug("Task {}, finished trying to take partitions for streaming {}", taskSyncContext.getTaskUid());
            return build;
        } catch (Throwable th) {
            LOGGER.debug("Task {}, finished trying to take partitions for streaming {}", taskSyncContext.getTaskUid());
            throw th;
        }
    }

    private boolean submitPartition(PartitionState partitionState, TaskSyncContext taskSyncContext) {
        return this.changeStream.submitPartition(this.partitionFactory.getPartition(partitionState));
    }

    private TaskSyncContext removeAlreadyStreamingPartitions(TaskSyncContext taskSyncContext) {
        TaskState currentTaskState = taskSyncContext.getCurrentTaskState();
        return taskSyncContext.toBuilder().currentTaskState(currentTaskState.toBuilder().partitions((List) taskSyncContext.getCurrentTaskState().getPartitions().stream().map(partitionState -> {
            if (!partitionState.getState().equals(PartitionStateEnum.READY_FOR_STREAMING) || !isPartitionStreamingAlready(taskSyncContext.getTaskStates().values(), partitionState.getToken(), currentTaskState.getTaskUid())) {
                return partitionState;
            }
            LOGGER.info("Removing streaming partition {} with state {} since partition is already streaming", partitionState.getToken(), partitionState.getState());
            return null;
        }).filter((v0) -> {
            return Objects.nonNull(v0);
        }).collect(Collectors.toList())).build()).build();
    }

    private boolean isPartitionStreamingAlready(Collection<TaskState> collection, String str, String str2) {
        return collection.stream().flatMap(taskState -> {
            return taskState.getPartitions().stream();
        }).filter(partitionState -> {
            return partitionState.getToken().equals(str);
        }).anyMatch(partitionState2 -> {
            return partitionState2.getState().equals(PartitionStateEnum.SCHEDULED) || partitionState2.getState().equals(PartitionStateEnum.RUNNING) || partitionState2.getState().equals(PartitionStateEnum.FINISHED) || partitionState2.getState().equals(PartitionStateEnum.REMOVED);
        });
    }

    private boolean isPartition(Collection<TaskState> collection, String str) {
        return collection.stream().flatMap(taskState -> {
            return taskState.getPartitions().stream();
        }).filter(partitionState -> {
            return partitionState.getToken().equals(str);
        }).anyMatch(partitionState2 -> {
            return partitionState2.getState().equals(PartitionStateEnum.SCHEDULED) || partitionState2.getState().equals(PartitionStateEnum.RUNNING) || partitionState2.getState().equals(PartitionStateEnum.FINISHED) || partitionState2.getState().equals(PartitionStateEnum.REMOVED);
        });
    }

    @Override // io.debezium.connector.spanner.task.operation.Operation
    public boolean isRequiredPublishSyncEvent() {
        return this.isRequiredPublishSyncEvent;
    }

    @Override // io.debezium.connector.spanner.task.operation.Operation
    public TaskSyncContext doOperation(TaskSyncContext taskSyncContext) {
        return takePartitionForStreaming(removeAlreadyStreamingPartitions(taskSyncContext));
    }
}
