package io.debezium.connector.cassandra;

import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.Record;
import io.debezium.connector.cassandra.spi.CassandraTestProvider;
import io.debezium.connector.cassandra.spi.CommitLogProcessing;
import io.debezium.connector.cassandra.spi.ProvidersResolver;
import io.debezium.connector.cassandra.utils.TestUtils;
import java.io.IOException;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/cassandra/AbstractCommitLogProcessorTest.class */
/**
 * Base class for commit-log processor tests.
 *
 * <p>Each run calls the subclass's {@link #initialiseData()}, resolves a provider, builds the
 * connector context and a {@link CommitLogProcessing} instance from it, and waits until the schema
 * for the default test table is available. The single {@link #test()} method then delegates to
 * {@link #verifyEvents()}.
 *
 * <p>The {@code provider} and {@code context} fields used here are not declared in this class;
 * presumably they are inherited from {@code CassandraConnectorTestBase} (confirm against the base class).
 */
public abstract class AbstractCommitLogProcessorTest extends CassandraConnectorTestBase {
    /** Upper bound applied to every Awaitility wait performed by this class. */
    private static final Duration AWAIT_TIMEOUT = Duration.ofSeconds(60L);

    /** Directory scanned for commit logs, resolved against the current working directory. */
    private static final String CDC_RAW_DIRECTORY = "target/data/cassandra/cdc_raw";

    protected CommitLogProcessorMetrics metrics = new CommitLogProcessorMetrics();
    protected CommitLogProcessing commitLogProcessing;

    /**
     * Builds the configuration handed to the provider when the context is created in {@link #setUp()}.
     *
     * @return a configuration created from {@code TestUtils.generateDefaultConfigMap()}
     * @throws Throwable declared so that overriding subclasses may throw while building their configuration
     */
    public Configuration getContextConfiguration() throws Throwable {
        return Configuration.from(TestUtils.generateDefaultConfigMap());
    }

    /**
     * Prepares the test fixture: initialises the data, creates the context and the commit-log
     * processing instance, waits (up to {@link #AWAIT_TIMEOUT}) for the key/value schema of the
     * default test table to become non-null, and finally registers the metrics.
     *
     * @throws Throwable if any preparation step fails or the schema does not show up in time
     */
    @Before
    public void setUp() throws Throwable {
        initialiseData();
        this.provider = ProvidersResolver.resolveConnectorContextProvider();
        this.context = this.provider.provideContext(getContextConfiguration());
        this.commitLogProcessing = this.provider.provideCommitLogProcessing(this.context, this.metrics);
        KeyspaceTable testTable = new KeyspaceTable(TestUtils.TEST_KEYSPACE_NAME, TestUtils.TEST_TABLE_NAME);
        Awaitility.await()
                .atMost(AWAIT_TIMEOUT)
                .until(() -> this.context.getSchemaHolder().getKeyValueSchema(testTable) != null);
        this.metrics.registerMetrics();
    }

    /**
     * Releases everything {@link #setUp()} created, in the same order as before: test offsets,
     * metrics registration, test keyspace tables, then the context itself.
     *
     * <p>The steps are chained with {@code finally} so that a failure in one of them does not
     * prevent the remaining ones from running. Steps that need the context are skipped when it is
     * {@code null}, which happens when {@link #setUp()} failed before the context was created.
     *
     * @throws Exception if a cleanup step fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.context != null) {
                TestUtils.deleteTestOffsets(this.context);
            }
        } finally {
            try {
                this.metrics.unregisterMetrics();
            } finally {
                try {
                    TestUtils.deleteTestKeyspaceTables();
                } finally {
                    if (this.context != null) {
                        this.context.cleanUp();
                    }
                }
            }
        }
    }

    /**
     * Runs the scenario: checks the assumptions first, then verifies the produced events.
     *
     * @throws Throwable if verification fails
     */
    @Test
    public void test() throws Throwable {
        assumeTestRuns();
        verifyEvents();
    }

    /**
     * Hook called at the start of {@link #test()}. The default implementation does nothing;
     * subclasses may override it, for example to call {@link #assumeNotDse()} or
     * {@link #assumeNotCassandra3()}.
     */
    public void assumeTestRuns() {
    }

    /**
     * Skips the current test (via a JUnit assumption) when the class name of the discovered
     * {@link CassandraTestProvider} contains {@code io.debezium.connector.dse}.
     */
    public void assumeNotDse() {
        Assume.assumeFalse(resolveTestProviderClassName().contains("io.debezium.connector.dse"));
    }

    /**
     * Skips the current test (via a JUnit assumption) when the class name of the discovered
     * {@link CassandraTestProvider} contains {@code Cassandra3TestProvider}.
     */
    protected void assumeNotCassandra3() {
        Assume.assumeFalse(resolveTestProviderClassName().contains("Cassandra3TestProvider"));
    }

    /**
     * Looks up the first {@link CassandraTestProvider} exposed through {@link ServiceLoader}.
     *
     * @return the fully qualified class name of that provider implementation
     * @throws IllegalStateException if no implementation is registered
     */
    private static String resolveTestProviderClassName() {
        return ServiceLoader.load(CassandraTestProvider.class)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "No " + CassandraTestProvider.class.getName() + " implementation found via ServiceLoader"))
                .getClass()
                .getName();
    }

    /**
     * Creates the data the test operates on. Called first in {@link #setUp()}, before the
     * context exists.
     *
     * @throws Throwable if the data cannot be created
     */
    public abstract void initialiseData() throws Throwable;

    /**
     * Verifies the events produced for the data created by {@link #initialiseData()}.
     *
     * @throws Throwable if verification fails
     */
    public abstract void verifyEvents() throws Throwable;

    /**
     * Runs a CQL statement against the default test keyspace and table.
     *
     * @param str a {@link String#format(String, Object...)} template taking the keyspace name and
     *            the table name, in that order
     */
    public void createTable(String str) {
        createTable(str, TestUtils.TEST_KEYSPACE_NAME, TestUtils.TEST_TABLE_NAME);
    }

    /**
     * Formats the given template with the keyspace and table names and runs the resulting CQL.
     *
     * @param str  a {@link String#format(String, Object...)} template
     * @param str2 the keyspace name, substituted first
     * @param str3 the table name, substituted second
     */
    public void createTable(String str, String str2, String str3) {
        TestUtils.runCql(String.format(str, str2, str3));
    }

    /**
     * Repeatedly reads the commit logs and polls the first queue of the context until a single
     * poll yields exactly {@code i} events, or {@link #AWAIT_TIMEOUT} elapses.
     *
     * @param i the number of events a poll has to return
     * @return the events returned by the last poll
     * @throws Throwable if the expected number of events is not observed in time
     */
    public List<Event> getEvents(int i) throws Throwable {
        // The element type of getQueues() is not visible from this class; the cast keeps the
        // declaration compiling whether or not the returned list is generically typed.
        @SuppressWarnings("unchecked")
        ChangeEventQueue<Event> queue = (ChangeEventQueue<Event>) this.context.getQueues().get(0);
        List<Event> events = new ArrayList<>();
        Awaitility.await().atMost(AWAIT_TIMEOUT).until(() -> {
            try {
                readLogs(queue);
                events.clear();
                events.addAll(queue.poll());
                return events.size() == i;
            } catch (IOException ignored) {
                // Deliberately retried: an I/O failure while reading the commit logs only means
                // this attempt did not succeed; the next poll of the condition tries again.
                return false;
            }
        });
        Assert.assertEquals(i, events.size());
        return events;
    }

    /**
     * Asserts that the queue is empty (remaining capacity equals total capacity) and then
     * processes every commit log found in {@link #CDC_RAW_DIRECTORY}.
     *
     * @param changeEventQueue the queue expected to be empty before reading
     * @throws IOException if reading the commit logs fails
     */
    private void readLogs(ChangeEventQueue<Event> changeEventQueue) throws IOException {
        Assert.assertEquals(changeEventQueue.totalCapacity(), changeEventQueue.remainingCapacity());
        this.commitLogProcessing.readAllCommitLogs(
                CommitLogUtil.getCommitLogs(Paths.get(CDC_RAW_DIRECTORY).toAbsolutePath().toFile()));
    }

    /**
     * Asserts that the events have the given event type and, position by position, the given
     * operations.
     *
     * @param list         the events to check; every element is cast to {@link Record}
     * @param eventType    the event type expected of every event
     * @param operationArr the expected operations, one per event, in order
     */
    public void assertEventTypes(List<Event> list, Event.EventType eventType, Record.Operation... operationArr) {
        Assert.assertEquals(operationArr.length, list.size());
        for (int i = 0; i < list.size(); i++) {
            // getOp() is only available on Record, hence the downcast.
            Record record = (Record) list.get(i);
            Assert.assertEquals(eventType, record.getEventType());
            Assert.assertEquals(operationArr[i], record.getOp());
        }
    }
}
