package io.vertx.lang.rx.test;

import io.vertx.core.streams.ReadStream;
import io.vertx.test.core.VertxTestBase;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import org.junit.Test;

/* loaded from: input_file:io/vertx/lang/rx/test/ReadStreamSubscriberStaticsTestBase.class */
public abstract class ReadStreamSubscriberStaticsTestBase<T, F> extends VertxTestBase {
    public abstract F emptyFlowable();

    public abstract F emptyExceptionFlowable(String str);

    public abstract F exceptionAfterDataFlowable(String str, Iterable<T> iterable);

    public abstract F flowable(Iterable<T> iterable);

    public abstract ReadStream<T> asReadStream(F f);

    public abstract List<T> generateData(int i);

    @Test
    public void testEmptyFlowable() {
        waitFor(1);
        ReadStream<T> asReadStream = asReadStream(emptyFlowable());
        asReadStream.exceptionHandler(this::fail);
        asReadStream.endHandler(r3 -> {
            complete();
        });
        asReadStream.handler(obj -> {
            fail("empty stream");
        });
        await();
    }

    @Test
    public void testEmtpyListFlowable() {
        waitFor(1);
        ReadStream<T> asReadStream = asReadStream(flowable(Collections.emptyList()));
        asReadStream.exceptionHandler(this::fail);
        asReadStream.endHandler(r3 -> {
            complete();
        });
        asReadStream.handler(obj -> {
            fail("empty stream");
        });
        await();
    }

    @Test
    public void testWithElements() {
        List<T> generateData = generateData(5);
        waitFor(generateData.size() + 1);
        LinkedList linkedList = new LinkedList(generateData);
        ReadStream<T> asReadStream = asReadStream(flowable(generateData));
        asReadStream.exceptionHandler(this::fail);
        asReadStream.endHandler(r3 -> {
            complete();
        });
        asReadStream.handler(obj -> {
            assertEquals(linkedList.poll(), obj);
            complete();
        });
        await();
    }

    @Test
    public void testWithException() {
        waitFor(2);
        String str = "error msg";
        ReadStream<T> asReadStream = asReadStream(emptyExceptionFlowable("error msg"));
        asReadStream.exceptionHandler(th -> {
            assertEquals(str, th.getMessage());
            complete();
        });
        asReadStream.endHandler(r3 -> {
            complete();
        });
        asReadStream.handler(obj -> {
            fail("only error in stream");
        });
        await();
    }

    @Test
    public void testWithDataAndException() {
        List<T> generateData = generateData(5);
        waitFor(generateData.size() + 2);
        LinkedList linkedList = new LinkedList(generateData);
        String str = "error msg";
        ReadStream<T> asReadStream = asReadStream(exceptionAfterDataFlowable("error msg", generateData));
        asReadStream.exceptionHandler(th -> {
            assertEquals(str, th.getMessage());
            assertTrue("all data elements should be received ahead of the exception", linkedList.isEmpty());
            complete();
        });
        asReadStream.endHandler(r3 -> {
            complete();
        });
        asReadStream.handler(obj -> {
            assertEquals(linkedList.poll(), obj);
            complete();
        });
        await();
    }
}
