package io.debezium.connector.cassandra;

import com.datastax.oss.driver.api.core.type.DataTypes;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.CassandraSchemaFactory;
import io.debezium.connector.cassandra.KeyValueSchema;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.spi.ProvidersResolver;
import io.debezium.connector.cassandra.utils.TestUtils;
import io.debezium.time.Conversions;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/cassandra/QueueProcessorTest.class */
public class QueueProcessorTest {
    private CassandraConnectorContext context;
    private QueueProcessor queueProcessor;
    private TestingKafkaRecordEmitter emitter;
    private KeyValueSchema keyValueSchema;
    private CassandraSchemaFactory.RowData rowData;
    private SourceInfo sourceInfo;
    private CassandraSchemaFactory schemaFactory;

    public CassandraConnectorContext generateTaskContext(Configuration configuration) {
        return ProvidersResolver.resolveConnectorContextProvider().provideContextWithoutSchemaManagement(configuration);
    }

    @Before
    public void setUp() throws Exception {
        this.context = generateTaskContext(Configuration.from(TestUtils.generateDefaultConfigMap()));
        this.emitter = new TestingKafkaRecordEmitter(this.context.getCassandraConnectorConfig(), null, this.context.getOffsetWriter(), this.context.getCassandraConnectorConfig().getKeyConverter(), this.context.getCassandraConnectorConfig().getValueConverter(), this.context.getErroneousCommitLogs(), this.context.getCassandraConnectorConfig().getCommitLogTransfer());
        this.queueProcessor = new QueueProcessor(this.context, 0, this.emitter);
        this.keyValueSchema = new KeyValueSchema.KeyValueSchemaBuilder().withKeyspace(TestUtils.TEST_KEYSPACE_NAME).withTable("cdc_table").withKafkaTopicPrefix(this.context.getCassandraConnectorConfig().getLogicalName()).withSourceInfoStructMarker(this.context.getCassandraConnectorConfig().getSourceInfoStructMaker()).withRowSchema(CassandraSchemaFactory.RowData.rowSchema(Arrays.asList("col1", "col2"), Arrays.asList(DataTypes.TEXT, DataTypes.INT))).withPrimaryKeyNames(Arrays.asList("p1", "c1")).withPrimaryKeySchemas(KeyValueSchema.getPrimaryKeySchemas(Arrays.asList(DataTypes.INT, DataTypes.INT))).build();
        this.schemaFactory = CassandraSchemaFactory.get();
        this.rowData = this.schemaFactory.rowData();
        this.rowData.addCell(this.schemaFactory.cellData("p1", 1, (Object) null, CassandraSchemaFactory.CellData.ColumnType.PARTITION));
        this.rowData.addCell(this.schemaFactory.cellData("c1", 2, (Object) null, CassandraSchemaFactory.CellData.ColumnType.CLUSTERING));
        this.rowData.addCell(this.schemaFactory.cellData("col1", "col1value", (Object) null, CassandraSchemaFactory.CellData.ColumnType.REGULAR));
        this.rowData.addCell(this.schemaFactory.cellData("col2", 3, (Object) null, CassandraSchemaFactory.CellData.ColumnType.REGULAR));
        this.sourceInfo = new SourceInfo(this.context.getCassandraConnectorConfig(), "cluster1", new OffsetPosition("CommitLog-6-123.log", 0), new KeyspaceTable(TestUtils.TEST_KEYSPACE_NAME, "cdc_table"), false, Conversions.toInstantFromMicros(System.currentTimeMillis() * 1000));
    }

    @After
    public void tearDown() {
        this.context.cleanUp();
    }

    @Test
    public void testInsertChangeRecordProcessing() throws Exception {
        ((ChangeEventQueue) this.context.getQueues().get(0)).enqueue(new ChangeRecord(this.sourceInfo, this.rowData, this.keyValueSchema.keySchema(), this.keyValueSchema.valueSchema(), Record.Operation.INSERT, false));
        Assert.assertEquals(1L, r0.totalCapacity() - r0.remainingCapacity());
        this.queueProcessor.process();
        Assert.assertEquals(1L, this.emitter.records.size());
        Assert.assertEquals(r0.totalCapacity(), r0.remainingCapacity());
    }

    @Test
    public void testRangeTombstoneChangeRecordProcessing() throws Exception {
        ChangeEventQueue changeEventQueue = (ChangeEventQueue) this.context.getQueues().get(0);
        HashMap hashMap = new HashMap();
        hashMap.put("cl1", Pair.of("val1", "string"));
        hashMap.put("cl2", Pair.of("val2", "string"));
        hashMap.put("cl3", Pair.of("val3", "string"));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("cl1", Pair.of("val1", "string"));
        hashMap2.put("cl2", Pair.of("val2", "string"));
        this.rowData.addStartRange(CassandraSchemaFactory.RangeData.start("EXCL_START_BOUND", hashMap));
        this.rowData.addEndRange(CassandraSchemaFactory.RangeData.end("INCL_END_BOUND", hashMap2));
        changeEventQueue.enqueue(new ChangeRecord(this.sourceInfo, this.rowData, this.keyValueSchema.keySchema(), this.keyValueSchema.valueSchema(), Record.Operation.RANGE_TOMBSTONE, false));
        Assert.assertEquals(1L, changeEventQueue.totalCapacity() - changeEventQueue.remainingCapacity());
        this.queueProcessor.process();
        Assert.assertEquals(1L, this.emitter.records.size());
        Assert.assertEquals(changeEventQueue.totalCapacity(), changeEventQueue.remainingCapacity());
    }

    @Test
    public void testProcessTombstoneRecords() throws Exception {
        ((ChangeEventQueue) this.context.getQueues().get(0)).enqueue(new TombstoneRecord(this.sourceInfo, this.rowData, this.keyValueSchema.keySchema()));
        Assert.assertEquals(1L, r0.totalCapacity() - r0.remainingCapacity());
        this.queueProcessor.process();
        Assert.assertEquals(1L, this.emitter.records.size());
        Assert.assertEquals(r0.totalCapacity(), r0.remainingCapacity());
    }

    @Test
    public void testProcessEofEvent() throws Exception {
        ((ChangeEventQueue) this.context.getQueues().get(0)).enqueue(new EOFEvent(new File("non-existing-log-file-path")));
        Assert.assertEquals(1L, r0.totalCapacity() - r0.remainingCapacity());
        this.queueProcessor.process();
        Assert.assertEquals(0L, this.emitter.records.size());
        Assert.assertEquals(r0.totalCapacity(), r0.remainingCapacity());
    }
}
