package io.debezium.connector.cassandra;

import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.spi.ProvidersResolver;
import io.debezium.connector.cassandra.utils.TestUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/debezium/connector/cassandra/SnapshotProcessorTest.class */
public class SnapshotProcessorTest extends CassandraConnectorTestBase {
    @After
    public void afterTest() {
        if (this.context != null) {
            this.context.cleanUp();
        }
    }

    @Before
    public void beforeTest() {
        this.provider = ProvidersResolver.resolveConnectorContextProvider();
    }

    @Test
    public void testSnapshotTable() throws Throwable {
        this.context = this.provider.provideContext(Configuration.from(TestUtils.generateDefaultConfigMap()));
        SnapshotProcessor snapshotProcessor = (SnapshotProcessor) Mockito.spy(new SnapshotProcessor(this.context, this.context.getClusterName()));
        Mockito.when(Boolean.valueOf(snapshotProcessor.isRunning())).thenReturn(true);
        this.context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + TestUtils.keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;");
        this.context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + TestUtils.keyspaceTable("cdc_table2") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;");
        for (int i = 0; i < 5; i++) {
            this.context.getCassandraClient().execute("INSERT INTO " + TestUtils.keyspaceTable("cdc_table") + "(a, b) VALUES (?, ?)", new Object[]{Integer.valueOf(i), String.valueOf(i)});
            this.context.getCassandraClient().execute("INSERT INTO " + TestUtils.keyspaceTable("cdc_table2") + "(a, b) VALUES (?, ?)", new Object[]{Integer.valueOf(i + 10), String.valueOf(i + 10)});
        }
        ChangeEventQueue changeEventQueue = (ChangeEventQueue) this.context.getQueues().get(0);
        Assert.assertEquals(changeEventQueue.totalCapacity(), changeEventQueue.remainingCapacity());
        snapshotProcessor.process();
        Assert.assertEquals(2 * 5, changeEventQueue.totalCapacity() - changeEventQueue.remainingCapacity());
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (ChangeRecord changeRecord : changeEventQueue.poll()) {
            Assert.assertEquals(changeRecord.getEventType(), Event.EventType.CHANGE_EVENT);
            Assert.assertEquals(changeRecord.getOp(), Record.Operation.INSERT);
            Assert.assertEquals(changeRecord.getSource().cluster, CassandraConnectorTestBase.CLUSTER_NAME);
            Assert.assertTrue(changeRecord.getSource().snapshot);
            if (changeRecord.getSource().keyspaceTable.name().equals(TestUtils.keyspaceTable("cdc_table"))) {
                arrayList.add(changeRecord);
            } else {
                arrayList2.add(changeRecord);
            }
            Assert.assertEquals(changeRecord.getSource().offsetPosition, OffsetPosition.defaultOffsetPosition());
        }
        Assert.assertEquals(5, arrayList.size());
        Assert.assertEquals(5, arrayList2.size());
        TestUtils.deleteTestKeyspaceTables();
        TestUtils.deleteTestOffsets(this.context);
    }

    @Test
    public void testSnapshotSkipsNonCdcEnabledTable() throws Throwable {
        this.context = this.provider.provideContext(Configuration.from(TestUtils.generateDefaultConfigMap()));
        SnapshotProcessor snapshotProcessor = (SnapshotProcessor) Mockito.spy(new SnapshotProcessor(this.context, this.context.getClusterName()));
        Mockito.when(Boolean.valueOf(snapshotProcessor.isRunning())).thenReturn(true);
        this.context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + TestUtils.keyspaceTable("non_cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = false;");
        for (int i = 0; i < 5; i++) {
            this.context.getCassandraClient().execute("INSERT INTO " + TestUtils.keyspaceTable("non_cdc_table") + "(a, b) VALUES (?, ?)", new Object[]{Integer.valueOf(i), String.valueOf(i)});
        }
        ChangeEventQueue changeEventQueue = (ChangeEventQueue) this.context.getQueues().get(0);
        Assert.assertEquals(changeEventQueue.totalCapacity(), changeEventQueue.remainingCapacity());
        snapshotProcessor.process();
        Assert.assertEquals(changeEventQueue.totalCapacity(), changeEventQueue.remainingCapacity());
        TestUtils.deleteTestKeyspaceTables();
        TestUtils.deleteTestOffsets(this.context);
    }

    @Test
    public void testSnapshotEmptyTable() throws Throwable {
        this.context = this.provider.provideContext(Configuration.from(TestUtils.generateDefaultConfigMap()));
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        SnapshotProcessor snapshotProcessor = (SnapshotProcessor) Mockito.spy(new SnapshotProcessor(this.context, this.context.getClusterName()));
        Mockito.when(Boolean.valueOf(snapshotProcessor.isRunning())).thenReturn(true);
        this.context.getCassandraClient().execute("CREATE TABLE IF NOT EXISTS " + TestUtils.keyspaceTable("cdc_table") + " (a int, b text, PRIMARY KEY(a)) WITH cdc = true;");
        ChangeEventQueue changeEventQueue = (ChangeEventQueue) this.context.getQueues().get(0);
        Assert.assertEquals(changeEventQueue.totalCapacity(), changeEventQueue.remainingCapacity());
        snapshotProcessor.process();
        Assert.assertEquals(changeEventQueue.totalCapacity(), changeEventQueue.remainingCapacity());
        for (int i = 0; i < 5; i++) {
            this.context.getCassandraClient().execute("INSERT INTO " + TestUtils.keyspaceTable("cdc_table") + "(a, b) VALUES (?, ?)", new Object[]{Integer.valueOf(i), String.valueOf(i)});
        }
        snapshotProcessor.process();
        Assert.assertEquals(changeEventQueue.totalCapacity(), changeEventQueue.remainingCapacity());
        TestUtils.deleteTestKeyspaceTables();
        TestUtils.deleteTestOffsets(this.context);
        atomicBoolean.set(false);
    }

    @Test
    public void testSnapshotModeAlways() throws Throwable {
        HashMap<String, Object> propertiesForContext = TestUtils.propertiesForContext();
        propertiesForContext.put("kafka.producer.bootstrap.servers", TestUtils.TEST_KAFKA_SERVERS);
        propertiesForContext.put(CassandraConnectorConfig.SNAPSHOT_MODE.name(), "always");
        propertiesForContext.put(CassandraConnectorConfig.SNAPSHOT_POLL_INTERVAL_MS.name(), "0");
        this.context = this.provider.provideContext(Configuration.from(propertiesForContext));
        SnapshotProcessor snapshotProcessor = (SnapshotProcessor) Mockito.spy(new SnapshotProcessor(this.context, this.context.getClusterName()));
        ((SnapshotProcessor) Mockito.doNothing().when(snapshotProcessor)).snapshot();
        for (int i = 0; i < 5; i++) {
            snapshotProcessor.process();
        }
        ((SnapshotProcessor) Mockito.verify(snapshotProcessor, Mockito.times(5))).snapshot();
    }

    @Test
    public void testSnapshotModeInitial() throws Throwable {
        HashMap<String, Object> propertiesForContext = TestUtils.propertiesForContext();
        propertiesForContext.put(CassandraConnectorConfig.SNAPSHOT_MODE.name(), "initial");
        propertiesForContext.put(CassandraConnectorConfig.SNAPSHOT_POLL_INTERVAL_MS.name(), "0");
        this.context = this.provider.provideContext(Configuration.from(propertiesForContext));
        SnapshotProcessor snapshotProcessor = (SnapshotProcessor) Mockito.spy(new SnapshotProcessor(this.context, this.context.getClusterName()));
        ((SnapshotProcessor) Mockito.doNothing().when(snapshotProcessor)).snapshot();
        for (int i = 0; i < 5; i++) {
            snapshotProcessor.process();
        }
        ((SnapshotProcessor) Mockito.verify(snapshotProcessor, Mockito.times(1))).snapshot();
    }

    @Test
    public void testSnapshotModeNever() throws Throwable {
        HashMap<String, Object> propertiesForContext = TestUtils.propertiesForContext();
        propertiesForContext.put(CassandraConnectorConfig.SNAPSHOT_MODE.name(), "never");
        propertiesForContext.put(CassandraConnectorConfig.SNAPSHOT_POLL_INTERVAL_MS.name(), "0");
        this.context = this.provider.provideContext(Configuration.from(propertiesForContext));
        SnapshotProcessor snapshotProcessor = (SnapshotProcessor) Mockito.spy(new SnapshotProcessor(this.context, this.context.getClusterName()));
        ((SnapshotProcessor) Mockito.doNothing().when(snapshotProcessor)).snapshot();
        for (int i = 0; i < 5; i++) {
            snapshotProcessor.process();
        }
        ((SnapshotProcessor) Mockito.verify(snapshotProcessor, Mockito.never())).snapshot();
    }
}
