package io.debezium.connector.spanner.task.leader;

import com.google.cloud.Timestamp;
import io.debezium.connector.spanner.SpannerConnectorConfig;
import io.debezium.connector.spanner.kafka.internal.model.PartitionState;
import io.debezium.connector.spanner.kafka.internal.model.PartitionStateEnum;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.kafka.internal.model.TaskState;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.task.PartitionFactory;
import io.debezium.connector.spanner.task.TaskSyncContext;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import io.debezium.connector.spanner.task.TaskTestHelper;
import io.debezium.pipeline.ErrorHandler;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

/**
 * Unit tests for {@link LeaderService}.
 *
 * <p>Covers {@code isStartFromScratch()} for several combinations of partition states held by the
 * leader and by the other tasks, and {@code awaitAllNewTaskStateUpdates(...)} when the remaining
 * consumers publish their task states asynchronously.
 */
@ExtendWith(MockitoExtension.class)
class LeaderActionServiceTest {
    private static final String TASK_UID = "leader-007";

    @Mock
    private MetricsEventPublisher metricsEventPublisher;
    private TaskSyncContextHolder taskSyncContextHolder;
    private LeaderService leaderService;

    /**
     * Builds a fresh context holder (initial rebalance state {@code START_INITIAL_SYNC}) and a
     * {@link LeaderService} wired with mocked collaborators before every test.
     */
    @BeforeEach
    void init() {
        this.taskSyncContextHolder = new TaskSyncContextHolder(this.metricsEventPublisher);
        this.taskSyncContextHolder.init(TaskSyncContext.builder()
                .taskUid(TASK_UID)
                .rebalanceState(RebalanceState.START_INITIAL_SYNC)
                .build());

        SpannerConnectorConfig connectorConfig = Mockito.mock(SpannerConnectorConfig.class);
        Mockito.when(connectorConfig.startTime()).thenReturn(Timestamp.now());
        Mockito.when(connectorConfig.getAwaitTaskAnswerTimeout())
                .thenReturn(Duration.of(120000L, ChronoUnit.MILLIS));

        this.leaderService = new LeaderService(
                this.taskSyncContextHolder,
                connectorConfig,
                taskStateChangeEvent -> {
                    // State-change events are irrelevant for these tests.
                },
                Mockito.mock(ErrorHandler.class),
                Mockito.mock(PartitionFactory.class),
                Mockito.mock(MetricsEventPublisher.class));
    }

    /**
     * Other tasks only hold FINISHED/REMOVED partitions, but the leader itself still holds
     * CREATED partitions: the service must not report a start from scratch.
     */
    @Test
    void isStartFromScratch_false_leaderInProgress() {
        TaskState firstTask = generateTaskState("t1", 2, 1, PartitionStateEnum.FINISHED, PartitionStateEnum.REMOVED);
        TaskState secondTask = generateTaskState("t2", 2, 1, PartitionStateEnum.REMOVED, PartitionStateEnum.FINISHED);
        TaskState leaderTask = generateTaskState(TASK_UID, 1, 4, PartitionStateEnum.CREATED, PartitionStateEnum.CREATED);

        installTaskStates(leaderTask, firstTask, secondTask);

        Assertions.assertThat(this.leaderService.isStartFromScratch()).isFalse();
    }

    /**
     * The leader only holds FINISHED partitions, but another task has shared partitions in
     * READY_FOR_STREAMING state: the service must not report a start from scratch.
     */
    @Test
    void isStartFromScratch_false_someTaskInProgress() {
        TaskState firstTask = generateTaskState("t1", 2, 1, PartitionStateEnum.FINISHED, PartitionStateEnum.REMOVED);
        TaskState secondTask = generateTaskState("t2", 2, 1, PartitionStateEnum.REMOVED, PartitionStateEnum.READY_FOR_STREAMING);
        TaskState leaderTask = generateTaskState(TASK_UID, 1, 4, PartitionStateEnum.FINISHED, PartitionStateEnum.FINISHED);

        installTaskStates(leaderTask, firstTask, secondTask);

        Assertions.assertThat(this.leaderService.isStartFromScratch()).isFalse();
    }

    /**
     * Every partition of every task (leader included) is FINISHED or REMOVED: the service must
     * report a start from scratch.
     */
    @Test
    void isStartFromScratch_true_someTaskInProgress() {
        TaskState firstTask = generateTaskState("t1", 2, 1, PartitionStateEnum.FINISHED, PartitionStateEnum.REMOVED);
        TaskState secondTask = generateTaskState("t2", 2, 1, PartitionStateEnum.REMOVED, PartitionStateEnum.REMOVED);
        TaskState leaderTask = generateTaskState(TASK_UID, 1, 4, PartitionStateEnum.FINISHED, PartitionStateEnum.FINISHED);

        installTaskStates(leaderTask, firstTask, secondTask);

        Assertions.assertThat(this.leaderService.isStartFromScratch()).isTrue();
    }

    /**
     * The context starts with only the current task ("consumer1") known. A background thread then
     * adds task states for "consumer2" and "consumer3", one roughly every 40 ms. The call under
     * test is expected to return a consumer-id to task-uid entry for all three consumers.
     *
     * @throws InterruptedException if the test thread is interrupted while awaiting the updates
     */
    @Timeout(10)
    @Test
    void awaitAllNewTaskStateUpdatesWorks() throws InterruptedException {
        String leaderConsumerId = "consumer1";
        TaskState leaderTask = TaskState.builder()
                .taskUid(leaderConsumerId.toUpperCase())
                .consumerId(leaderConsumerId)
                .build();
        this.taskSyncContextHolder.updateAndGet(context -> context.toBuilder().consumerId(leaderConsumerId).build());
        this.taskSyncContextHolder.updateAndGet(
                context -> context.toBuilder().currentTaskState(leaderTask).taskStates(Map.of()).build());

        Set<String> consumers = Set.of("consumer1", "consumer2", "consumer3");
        ArrayBlockingQueue<String> pendingConsumers =
                new ArrayBlockingQueue<>(consumers.size(), false, Set.of("consumer2", "consumer3"));
        new Thread(() -> publishPendingTaskStates(pendingConsumers)).start();

        Map<String, String> consumerToTaskUid = this.leaderService.awaitAllNewTaskStateUpdates(
                consumers, this.taskSyncContextHolder.get().getRebalanceGenerationId());

        Assertions.assertThat(consumerToTaskUid).hasSameSizeAs(consumers);
        Assertions.assertThat(consumerToTaskUid).doesNotContainKey("consumer0");
        Assertions.assertThat(consumerToTaskUid.get("consumer1")).isEqualTo("consumer1".toUpperCase());
        Assertions.assertThat(consumerToTaskUid.get("consumer2")).isEqualTo("consumer2".toUpperCase());
        Assertions.assertThat(consumerToTaskUid.get("consumer3")).isEqualTo("consumer3".toUpperCase());
    }

    /**
     * Drains {@code pendingConsumers}, adding one task state per consumer id to the shared context
     * after a 40 ms pause each. The task uid is the upper-cased consumer id.
     *
     * @param pendingConsumers consumer ids whose task states still have to be published
     */
    private void publishPendingTaskStates(ArrayBlockingQueue<String> pendingConsumers) {
        while (!pendingConsumers.isEmpty()) {
            try {
                Thread.sleep(40L);
                String consumerId = pendingConsumers.take();
                TaskState taskState = TaskState.builder()
                        .taskUid(consumerId.toUpperCase())
                        .consumerId(consumerId)
                        .build();
                // Copy the current map so the context is replaced, never mutated in place.
                Map<String, TaskState> updatedStates = new HashMap<>(this.taskSyncContextHolder.get().getTaskStates());
                updatedStates.put(taskState.getTaskUid(), taskState);
                this.taskSyncContextHolder.updateAndGet(
                        context -> context.toBuilder().taskStates(updatedStates).build());
            } catch (InterruptedException e) {
                // Restore the interrupt flag before propagating so the interruption is not lost.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Replaces the current task state and the map of other task states in the shared context.
     *
     * @param currentTask state to install as the current (leader) task state
     * @param firstOther first of the other task states
     * @param secondOther second of the other task states
     */
    private void installTaskStates(TaskState currentTask, TaskState firstOther, TaskState secondOther) {
        this.taskSyncContextHolder.updateAndGet(context -> context.toBuilder()
                .currentTaskState(currentTask)
                .taskStates(TaskTestHelper.createTaskStateMap(firstOther, secondOther))
                .build());
    }

    /**
     * Builds a {@link TaskState} whose owned and shared partitions carry random UUID tokens.
     *
     * @param str task uid
     * @param i count passed to {@code TaskTestHelper.generatePartitions} for the owned partitions
     * @param i2 count passed to {@code TaskTestHelper.generatePartitions} for the shared partitions
     * @param partitionStateEnum state assigned to every owned partition
     * @param partitionStateEnum2 state assigned to every shared partition
     * @return the assembled task state
     */
    private TaskState generateTaskState(String str, int i, int i2, PartitionStateEnum partitionStateEnum, PartitionStateEnum partitionStateEnum2) {
        List<PartitionState> ownedPartitions = TaskTestHelper.generatePartitions(
                i, () -> PartitionState.builder().token(UUID.randomUUID().toString()).state(partitionStateEnum).build());
        List<PartitionState> sharedPartitions = TaskTestHelper.generatePartitions(
                i2, () -> PartitionState.builder().token(UUID.randomUUID().toString()).state(partitionStateEnum2).build());
        return TaskState.builder()
                .taskUid(str)
                .partitions(ownedPartitions)
                .sharedPartitions(sharedPartitions)
                .build();
    }
}
