package io.debezium.connector.spanner.kafka.internal;

import com.google.common.collect.Ordering;
import io.debezium.connector.spanner.kafka.internal.model.RebalanceState;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.task.TaskSyncContext;
import io.debezium.connector.spanner.task.TaskSyncContextHolder;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/debezium/connector/spanner/kafka/internal/BufferedPublisherTest.class */
class BufferedPublisherTest {
    BufferedPublisherTest() {
    }

    @Test
    void testBufferedPublisher_1() throws InterruptedException {
        runAndCheck(num -> {
            return num.intValue() % 10 == 0;
        }, num2 -> {
        });
    }

    @Test
    void testBufferedPublisher_2() throws InterruptedException {
        runAndCheck(num -> {
            return num.intValue() % 10 == 0;
        }, num2 -> {
        });
    }

    @Test
    void testBufferedPublisher_3() throws InterruptedException {
        runAndCheck(num -> {
            return num.intValue() % 100 == 0;
        }, num2 -> {
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    private void runAndCheck(Predicate<Integer> predicate, Consumer<Integer> consumer) throws InterruptedException {
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        TaskSyncContextHolder taskSyncContextHolder = new TaskSyncContextHolder((MetricsEventPublisher) Mockito.mock(MetricsEventPublisher.class));
        taskSyncContextHolder.init(TaskSyncContext.builder().taskUid("test-task-1").rebalanceState(RebalanceState.NEW_EPOCH_STARTED).build());
        Objects.requireNonNull(copyOnWriteArrayList);
        BufferedPublisher bufferedPublisher = new BufferedPublisher("test-task-1", "pub-1", taskSyncContextHolder, 5L, predicate, consumer.andThen((v1) -> {
            r8.add(v1);
        }));
        bufferedPublisher.start();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 1001; i++) {
            bufferedPublisher.buffer(Integer.valueOf(i));
            Thread.sleep(1L);
            if (predicate.test(Integer.valueOf(i))) {
                arrayList.add(Integer.valueOf(i));
            }
        }
        bufferedPublisher.close();
        Assertions.assertThat(copyOnWriteArrayList).containsAll(arrayList).hasSizeGreaterThan(arrayList.size()).hasSizeLessThan(1001).hasSameSizeAs(new HashSet(copyOnWriteArrayList));
        Assertions.assertThat(Ordering.natural().isOrdered(copyOnWriteArrayList)).isTrue();
    }
}
