package io.vertx.cassandra;

import com.datastax.oss.driver.api.core.cql.PagingState;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatementBuilder;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:io/vertx/cassandra/StreamingTest.class */
public class StreamingTest extends CassandraClientTestBase {
    @Test
    public void testReadStream(TestContext testContext) throws Exception {
        initializeRandomStringKeyspace();
        insertRandomStrings(63);
        SimpleStatement pageSize = SimpleStatement.newInstance("select random_string from random_strings.random_string_by_first_letter where first_letter = 'A'").setPageSize(5);
        Async async = testContext.async();
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        List synchronizedList2 = Collections.synchronizedList(new ArrayList());
        AtomicInteger atomicInteger = new AtomicInteger();
        this.client.queryStream(pageSize).onComplete(testContext.asyncAssertSuccess(cassandraRowStream -> {
            long j = 500;
            long nanoTime = System.nanoTime();
            CassandraRowStream endHandler = cassandraRowStream.endHandler(r15 -> {
                testContext.verify(r13 -> {
                    Assert.assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime) >= 5 * j);
                    for (int i = 1; i < synchronizedList2.size(); i++) {
                        if (i >= 60) {
                            Assert.assertNull(synchronizedList2.get(i));
                        } else if (i % 5 == 0) {
                            Assert.assertFalse(Arrays.equals(((PagingState) synchronizedList2.get(i)).toBytes(), ((PagingState) synchronizedList2.get(i - 1)).toBytes()));
                        } else {
                            Assert.assertArrayEquals(((PagingState) synchronizedList2.get(i)).toBytes(), ((PagingState) synchronizedList2.get(i - 1)).toBytes());
                        }
                    }
                    async.countDown();
                });
            });
            testContext.getClass();
            endHandler.exceptionHandler(testContext::fail).handler(row -> {
                synchronizedList.add(row);
                int andIncrement = atomicInteger.getAndIncrement();
                synchronizedList2.add(cassandraRowStream.executionInfo().getSafePagingState());
                if (andIncrement == 3 || andIncrement == 16 || andIncrement == 21 || andIncrement == 38 || andIncrement == 47) {
                    cassandraRowStream.pause();
                    int size = synchronizedList.size();
                    this.vertx.setTimer(j, l -> {
                        testContext.assertTrue(size == synchronizedList.size());
                        cassandraRowStream.resume();
                    });
                }
            });
        }));
    }

    @Test
    public void streamFetchesDoesNotOverflowDefault512KbJVMStack(TestContext testContext) throws Exception {
        initializeRandomStringKeyspace();
        insertRandomStrings(5000);
        SimpleStatement build = new SimpleStatementBuilder(String.format("select random_string from random_strings.random_string_by_first_letter limit %d", 100000)).setPageSize(100000).build();
        Async async = testContext.async();
        this.client.queryStream(build, testContext.asyncAssertSuccess(cassandraRowStream -> {
            CassandraRowStream endHandler = cassandraRowStream.endHandler(r3 -> {
                async.countDown();
            });
            testContext.getClass();
            endHandler.exceptionHandler(testContext::fail).handler(row -> {
            });
        }));
    }

    @Test
    public void emptyStream(TestContext testContext) throws Exception {
        initializeRandomStringKeyspace();
        insertRandomStrings(1);
        Async async = testContext.async();
        this.client.queryStream("select random_string from random_strings.random_string_by_first_letter where first_letter = '$'").onComplete(testContext.asyncAssertSuccess(cassandraRowStream -> {
            testContext.assertNotNull(cassandraRowStream.columnDefinitions());
            CassandraRowStream endHandler = cassandraRowStream.endHandler(r3 -> {
                async.countDown();
            });
            testContext.getClass();
            endHandler.exceptionHandler(testContext::fail).handler(row -> {
                testContext.fail();
            });
        }));
    }

    @Test
    public void emptyStreamWithHandlerSetFirst(TestContext testContext) throws Exception {
        initializeRandomStringKeyspace();
        insertRandomStrings(1);
        Async async = testContext.async();
        this.client.queryStream("select random_string from random_strings.random_string_by_first_letter where first_letter = '$'", testContext.asyncAssertSuccess(cassandraRowStream -> {
            CassandraRowStream endHandler = cassandraRowStream.handler(row -> {
                testContext.fail();
            }).endHandler(r3 -> {
                async.countDown();
            });
            testContext.getClass();
            endHandler.exceptionHandler(testContext::fail);
        }));
    }
}
