package io.vertx.ext.reactivestreams.test;

import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.ext.reactivestreams.ReactiveReadStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/vertx/ext/reactivestreams/test/ReactiveReadStreamTest.class */
public class ReactiveReadStreamTest extends ReactiveStreamTestBase {

    /* loaded from: input_file:io/vertx/ext/reactivestreams/test/ReactiveReadStreamTest$MyPublisher.class */
    class MyPublisher implements Publisher<Buffer> {
        MySubscription subscription;
        Subscriber<? super Buffer> subscriber;

        MyPublisher() {
        }

        public void subscribe(Subscriber<? super Buffer> subscriber) {
            this.subscriber = subscriber;
            this.subscription = new MySubscription();
            subscriber.onSubscribe(this.subscription);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/vertx/ext/reactivestreams/test/ReactiveReadStreamTest$MySubscription.class */
    public class MySubscription implements Subscription {
        int requested;
        int requestedTimes;

        MySubscription() {
        }

        public void request(long j) {
            this.requestedTimes++;
            this.requested = (int) (this.requested + j);
        }

        public void cancel() {
        }
    }

    @Test
    public void testSubscribe() throws Exception {
        Subscriber<? super Buffer> readStream = ReactiveReadStream.readStream();
        MyPublisher myPublisher = new MyPublisher();
        myPublisher.subscribe(readStream);
        assertNotNull(myPublisher.subscription);
        assertEquals(0L, myPublisher.subscription.requestedTimes);
    }

    @Test
    public void testDatahandler() throws Exception {
        Subscriber<? super Buffer> readStream = ReactiveReadStream.readStream();
        MyPublisher myPublisher = new MyPublisher();
        myPublisher.subscribe(readStream);
        assertNotNull(myPublisher.subscription);
        assertEquals(0L, myPublisher.subscription.requestedTimes);
        ArrayList arrayList = new ArrayList();
        Objects.requireNonNull(arrayList);
        readStream.handler((v1) -> {
            r1.add(v1);
        });
        assertEquals(1L, myPublisher.subscription.requestedTimes);
        assertEquals(4L, myPublisher.subscription.requested);
        List<Buffer> createRandomBuffers = createRandomBuffers(4);
        Iterator<Buffer> it = createRandomBuffers.iterator();
        while (it.hasNext()) {
            myPublisher.subscriber.onNext(it.next());
        }
        assertEquals(4, arrayList.size());
        for (int i = 0; i < 4; i++) {
            assertEquals(createRandomBuffers.get(i), arrayList.get(i));
        }
        assertEquals(2L, myPublisher.subscription.requestedTimes);
        assertEquals(8L, myPublisher.subscription.requested);
    }

    @Test
    public void testSetPausedDataHandler() throws Exception {
        Subscriber<? super Buffer> readStream = ReactiveReadStream.readStream();
        MyPublisher myPublisher = new MyPublisher();
        myPublisher.subscribe(readStream);
        assertNotNull(myPublisher.subscription);
        assertEquals(0L, myPublisher.subscription.requestedTimes);
        ArrayList arrayList = new ArrayList();
        readStream.pause();
        Objects.requireNonNull(arrayList);
        readStream.handler((v1) -> {
            r1.add(v1);
        });
        assertEquals(0L, myPublisher.subscription.requestedTimes);
        assertEquals(0L, myPublisher.subscription.requested);
        assertEquals(0L, arrayList.size());
        assertEquals(0L, myPublisher.subscription.requestedTimes);
        assertEquals(0L, myPublisher.subscription.requested);
        readStream.resume();
        assertEquals(1L, myPublisher.subscription.requestedTimes);
        assertEquals(4L, myPublisher.subscription.requested);
        List<Buffer> createRandomBuffers = createRandomBuffers(4);
        Iterator<Buffer> it = createRandomBuffers.iterator();
        while (it.hasNext()) {
            myPublisher.subscriber.onNext(it.next());
        }
        assertEquals(4, arrayList.size());
        for (int i = 0; i < 4; i++) {
            assertEquals(createRandomBuffers.get(i), arrayList.get(i));
        }
    }

    @Test
    public void testPauseInHandler() throws Exception {
        Subscriber<? super Buffer> readStream = ReactiveReadStream.readStream();
        MyPublisher myPublisher = new MyPublisher();
        myPublisher.subscribe(readStream);
        assertNotNull(myPublisher.subscription);
        assertEquals(0L, myPublisher.subscription.requestedTimes);
        ArrayList arrayList = new ArrayList();
        readStream.handler(buffer -> {
            arrayList.add(buffer);
            readStream.pause();
        });
        assertEquals(1L, myPublisher.subscription.requestedTimes);
        assertEquals(4L, myPublisher.subscription.requested);
        List<Buffer> createRandomBuffers = createRandomBuffers(4);
        Iterator<Buffer> it = createRandomBuffers.iterator();
        while (it.hasNext()) {
            myPublisher.subscriber.onNext(it.next());
        }
        assertEquals(1L, arrayList.size());
        assertEquals(createRandomBuffers.get(0), arrayList.get(0));
        assertEquals(1L, myPublisher.subscription.requestedTimes);
        assertEquals(4L, myPublisher.subscription.requested);
    }

    @Test
    public void testPauseResume() throws Exception {
        Subscriber<? super Buffer> readStream = ReactiveReadStream.readStream();
        MyPublisher myPublisher = new MyPublisher();
        myPublisher.subscribe(readStream);
        assertNotNull(myPublisher.subscription);
        assertEquals(0L, myPublisher.subscription.requestedTimes);
        ArrayList arrayList = new ArrayList();
        Objects.requireNonNull(arrayList);
        readStream.handler((v1) -> {
            r1.add(v1);
        });
        assertEquals(1L, myPublisher.subscription.requestedTimes);
        assertEquals(4L, myPublisher.subscription.requested);
        List<Buffer> createRandomBuffers = createRandomBuffers(4);
        Iterator<Buffer> it = createRandomBuffers.iterator();
        while (it.hasNext()) {
            myPublisher.subscriber.onNext(it.next());
        }
        assertEquals(4, arrayList.size());
        for (int i = 0; i < 4; i++) {
            assertEquals(createRandomBuffers.get(i), arrayList.get(i));
        }
        readStream.pause();
        assertEquals(2L, myPublisher.subscription.requestedTimes);
        assertEquals(8L, myPublisher.subscription.requested);
        readStream.resume();
        assertEquals(2L, myPublisher.subscription.requestedTimes);
        assertEquals(8L, myPublisher.subscription.requested);
        createRandomBuffers.clear();
        arrayList.clear();
        List<Buffer> createRandomBuffers2 = createRandomBuffers(4);
        Iterator<Buffer> it2 = createRandomBuffers2.iterator();
        while (it2.hasNext()) {
            myPublisher.subscriber.onNext(it2.next());
        }
        assertEquals(4, arrayList.size());
        for (int i2 = 0; i2 < 4; i2++) {
            assertEquals(createRandomBuffers2.get(i2), arrayList.get(i2));
        }
    }

    @Test
    public void testPauseResumeInHandler() throws Exception {
        Subscriber<? super Buffer> readStream = ReactiveReadStream.readStream();
        MyPublisher myPublisher = new MyPublisher();
        myPublisher.subscribe(readStream);
        assertNotNull(myPublisher.subscription);
        assertEquals(0L, myPublisher.subscription.requestedTimes);
        ArrayList arrayList = new ArrayList();
        readStream.handler(buffer -> {
            readStream.pause();
            readStream.resume();
            arrayList.add(buffer);
        });
        assertEquals(1L, myPublisher.subscription.requestedTimes);
        assertEquals(4L, myPublisher.subscription.requested);
        List<Buffer> createRandomBuffers = createRandomBuffers(4);
        Iterator<Buffer> it = createRandomBuffers.iterator();
        while (it.hasNext()) {
            myPublisher.subscriber.onNext(it.next());
        }
        assertEquals(4, arrayList.size());
        for (int i = 0; i < 4; i++) {
            assertEquals(createRandomBuffers.get(i), arrayList.get(i));
        }
        assertEquals(2L, myPublisher.subscription.requestedTimes);
        assertEquals(8L, myPublisher.subscription.requested);
    }

    @Test
    public void testFetch() throws Exception {
        Subscriber<? super Buffer> readStream = ReactiveReadStream.readStream();
        MyPublisher myPublisher = new MyPublisher();
        myPublisher.subscribe(readStream);
        assertNotNull(myPublisher.subscription);
        assertEquals(0L, myPublisher.subscription.requestedTimes);
        ArrayList arrayList = new ArrayList();
        Objects.requireNonNull(arrayList);
        readStream.handler((v1) -> {
            r1.add(v1);
        });
        assertEquals(1L, myPublisher.subscription.requestedTimes);
        assertEquals(4L, myPublisher.subscription.requested);
        List<Buffer> createRandomBuffers = createRandomBuffers(4);
        Iterator<Buffer> it = createRandomBuffers.iterator();
        while (it.hasNext()) {
            myPublisher.subscriber.onNext(it.next());
        }
        assertEquals(4, arrayList.size());
        for (int i = 0; i < 4; i++) {
            assertEquals(createRandomBuffers.get(i), arrayList.get(i));
        }
        readStream.pause();
        assertEquals(2L, myPublisher.subscription.requestedTimes);
        assertEquals(8L, myPublisher.subscription.requested);
        assertEquals(2L, myPublisher.subscription.requestedTimes);
        assertEquals(8L, myPublisher.subscription.requested);
        createRandomBuffers.clear();
        arrayList.clear();
        List<Buffer> createRandomBuffers2 = createRandomBuffers(4);
        Iterator<Buffer> it2 = createRandomBuffers2.iterator();
        while (it2.hasNext()) {
            myPublisher.subscriber.onNext(it2.next());
        }
        for (int i2 = 0; i2 < 4; i2++) {
            readStream.fetch(1L);
            assertEquals(1 + i2, arrayList.size());
        }
        for (int i3 = 0; i3 < 4; i3++) {
            assertEquals(createRandomBuffers2.get(i3), arrayList.get(i3));
        }
    }

    @Test
    public void testOnError() throws Exception {
        Subscriber<? super Buffer> readStream = ReactiveReadStream.readStream();
        MyPublisher myPublisher = new MyPublisher();
        myPublisher.subscribe(readStream);
        readStream.exceptionHandler(th -> {
            assertTrue(th instanceof VertxException);
            assertEquals("foo", th.getMessage());
            testComplete();
        });
        myPublisher.subscriber.onError(new VertxException("foo"));
        await();
    }

    @Test
    public void testOnComplete() throws Exception {
        Subscriber<? super Buffer> readStream = ReactiveReadStream.readStream();
        MyPublisher myPublisher = new MyPublisher();
        myPublisher.subscribe(readStream);
        readStream.endHandler(r3 -> {
            testComplete();
        });
        myPublisher.subscriber.onComplete();
        await();
    }
}
