package io.debezium.connector.base;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.util.LoggingContext;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized concurrency test for {@link ChangeEventQueue}.
 *
 * <p>Each parameter set starts a number of writer threads that enqueue a fixed number of events
 * and a number of reader threads that poll the queue, then verifies that the readers collectively
 * saw exactly as many events as the writers enqueued.
 */
@RunWith(Parameterized.class)
public class ChangeEventQueueTest {
    /** Number of events every writer thread enqueues. */
    private static final int EVENTS_PER_WRITER = 1000000;
    /** Batch size the queue under test is configured with. */
    private static final int MAX_BATCH_SIZE = 8192;
    /** Capacity the queue under test is configured with. */
    private static final int MAX_QUEUE_SIZE = 16384;
    /** Upper bound, in seconds, for waiting on each individual worker thread. */
    private static final long JOIN_TIMEOUT_SECONDS = 10L;
    /** Single shared event instance; every writer enqueues this same object. */
    private static final DataChangeEvent EVENT = getDataChangeEvent();

    private final int noOfWriters;
    private final int noOfReaders;
    private final int noOfEventsPerWriter;
    private final long totalNoOfEvents;
    private final Thread[] writers;
    private final Thread[] readers;
    /** Running total of events returned by {@code poll()} across all readers. */
    private final AtomicLong recordsRead = new AtomicLong();

    /**
     * Creates the test instance for one parameter set.
     *
     * @param noOfWriters number of writer threads
     * @param noOfReaders number of reader threads
     * @param noOfEventsPerWriter number of events each writer enqueues
     */
    public ChangeEventQueueTest(int noOfWriters, int noOfReaders, int noOfEventsPerWriter) {
        this.noOfWriters = noOfWriters;
        this.noOfReaders = noOfReaders;
        this.noOfEventsPerWriter = noOfEventsPerWriter;
        // Widen before multiplying so the product cannot overflow int.
        this.totalNoOfEvents = (long) noOfWriters * noOfEventsPerWriter;
        this.writers = new Thread[noOfWriters];
        this.readers = new Thread[noOfReaders];
    }

    /**
     * Builds the cross product of writer counts and reader counts, each combined with
     * {@link #EVENTS_PER_WRITER} events per writer.
     *
     * @return one {@code {writers, readers, eventsPerWriter}} array per combination
     */
    @Parameterized.Parameters(name = "{index}: testQueue({0} writers, {1} readers, {2} events)")
    public static Collection<Object[]> data() {
        int[] writerCounts = {1, 2, 4, 8, 16};
        int[] readerCounts = {1, 2, 4, 8, 16};
        // Object[][] (not Object[]) so that Arrays.asList yields one list element per parameter set.
        Object[][] parameters = new Object[writerCounts.length * readerCounts.length][];
        int index = 0;
        for (int writerCount : writerCounts) {
            for (int readerCount : readerCounts) {
                parameters[index++] = new Object[]{ writerCount, readerCount, EVENTS_PER_WRITER };
            }
        }
        return Arrays.asList(parameters);
    }

    /** Builds a fresh queue and creates (but does not start) the writer and reader threads. */
    @Before
    public void setup() {
        ChangeEventQueue<DataChangeEvent> queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                .maxBatchSize(MAX_BATCH_SIZE)
                .maxQueueSize(MAX_QUEUE_SIZE)
                .loggingContextSupplier(() -> LoggingContext.forConnector("a", "b", "c"))
                .pollInterval(Duration.ofMillis(500L))
                .build();
        for (int w = 0; w < this.noOfWriters; w++) {
            this.writers[w] = getWriter(queue, this.noOfEventsPerWriter);
        }
        for (int r = 0; r < this.noOfReaders; r++) {
            this.readers[r] = getReader(queue, this.totalNoOfEvents, this.recordsRead);
        }
    }

    /**
     * Starts all writers and readers, waits for them with a per-thread timeout, and checks that
     * the readers consumed exactly the number of events that were produced.
     *
     * @throws InterruptedException if the test thread is interrupted while joining a worker
     */
    @Test
    public void shouldQueueAndPollMessages() throws InterruptedException {
        for (Thread writer : this.writers) {
            writer.start();
        }
        for (Thread reader : this.readers) {
            reader.start();
        }
        // The timeout applies to each join individually, not to the test as a whole.
        long joinTimeoutMillis = TimeUnit.SECONDS.toMillis(JOIN_TIMEOUT_SECONDS);
        for (Thread writer : this.writers) {
            writer.join(joinTimeoutMillis);
        }
        for (Thread reader : this.readers) {
            reader.join(joinTimeoutMillis);
        }
        Assert.assertEquals(this.totalNoOfEvents, this.recordsRead.get());
    }

    /** Interrupts every worker so that any thread still running after the test terminates. */
    @After
    public void teardown() {
        for (Thread writer : this.writers) {
            writer.interrupt();
        }
        for (Thread reader : this.readers) {
            reader.interrupt();
        }
    }

    /**
     * Creates a thread that enqueues {@link #EVENT} {@code i} times.
     *
     * @param changeEventQueue queue to write to
     * @param i number of events to enqueue
     * @return the unstarted writer thread
     */
    private static Thread getWriter(ChangeEventQueue<DataChangeEvent> changeEventQueue, int i) {
        return new Thread(() -> {
            try {
                for (int sent = 0; sent < i; sent++) {
                    changeEventQueue.doEnqueue(EVENT);
                }
            } catch (InterruptedException e) {
                // Interrupted by teardown(): stop writing instead of silently carrying on,
                // and keep the interrupt status visible to whoever inspects the thread.
                Thread.currentThread().interrupt();
            }
        });
    }

    /**
     * Creates a thread that polls the queue and adds the size of each returned batch to the
     * shared counter until the counter reaches {@code j}.
     *
     * @param changeEventQueue queue to read from
     * @param j total number of events expected across all readers
     * @param atomicLong counter shared by all readers
     * @return the unstarted reader thread
     */
    private static Thread getReader(ChangeEventQueue<DataChangeEvent> changeEventQueue, long j, AtomicLong atomicLong) {
        return new Thread(() -> {
            try {
                while (atomicLong.get() < j) {
                    atomicLong.addAndGet(changeEventQueue.poll().size());
                }
            } catch (InterruptedException e) {
                // Interrupted by teardown(): exit rather than loop forever on a count that
                // will never be reached, and preserve the interrupt status.
                Thread.currentThread().interrupt();
            }
        });
    }

    /**
     * Builds the dummy event used by all writers: a single-field struct wrapped in a
     * {@link SourceRecord} for topic {@code "dummy"} with empty partition and offset maps.
     *
     * @return the event to enqueue
     */
    private static DataChangeEvent getDataChangeEvent() {
        Schema schema = SchemaBuilder.struct().field("cdc", Schema.STRING_SCHEMA).build();
        Struct value = new Struct(schema).put("cdc", "Change Data Capture Even via Debezium");
        SourceRecord record = new SourceRecord(Collections.emptyMap(), Collections.emptyMap(), "dummy", schema, value);
        return new DataChangeEvent(record);
    }
}
