package io.vertx.lang.rx.test;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import io.vertx.lang.rx.test.TestSubscriber;
import io.vertx.test.fakestream.FakeStream;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Stream;
import org.junit.Test;

/* loaded from: input_file:io/vertx/lang/rx/test/ReadStreamAdapterBackPressureTest.class */
public abstract class ReadStreamAdapterBackPressureTest<O> extends ReadStreamAdapterTestBase<Buffer, O> {
    protected abstract O toObservable(ReadStream<Buffer> readStream, int i);

    protected abstract O flatMap(O o, Function<Buffer, O> function);

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // io.vertx.lang.rx.test.ReadStreamAdapterTestBase
    public Buffer buffer(String str) {
        return Buffer.buffer(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.vertx.lang.rx.test.ReadStreamAdapterTestBase
    public String string(Buffer buffer) {
        return buffer.toString("UTF-8");
    }

    protected abstract long defaultMaxBufferSize();

    @Test
    public void testPause() {
        FakeStream fakeStream = new FakeStream();
        O observable = toObservable(fakeStream);
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        subscribe(observable, prefetch);
        prefetch.assertEmpty();
        for (int i = 0; i < defaultMaxBufferSize(); i++) {
            fakeStream.emit(buffer(i));
        }
        prefetch.assertEmpty();
        prefetch.request(1L);
        prefetch.assertItem(buffer("0")).assertEmpty();
    }

    @Test
    public void testNoPauseWhenRequestingOne() {
        FakeStream fakeStream = new FakeStream();
        subscribe(toObservable(fakeStream), new TestSubscriber<Buffer>() { // from class: io.vertx.lang.rx.test.ReadStreamAdapterBackPressureTest.1
            @Override // io.vertx.lang.rx.test.TestSubscriber
            public void onNext(Buffer buffer) {
                super.onNext((AnonymousClass1) buffer);
                request(1L);
            }
        }.prefetch(1L));
        fakeStream.emit(Stream.of((Object[]) new Buffer[]{buffer("0"), buffer("1"), buffer("2")}));
    }

    @Test
    public void testUnsubscribeOnFirstItemFromBufferedDeliveredWhileRequesting() {
        for (int i = 1; i <= 3; i++) {
            FakeStream fakeStream = new FakeStream();
            TestSubscriber prefetch = new TestSubscriber<Buffer>() { // from class: io.vertx.lang.rx.test.ReadStreamAdapterBackPressureTest.2
                @Override // io.vertx.lang.rx.test.TestSubscriber
                public void onNext(Buffer buffer) {
                    super.onNext((AnonymousClass2) buffer);
                    unsubscribe();
                }
            }.prefetch(0L);
            subscribe(toObservable(fakeStream, 2), prefetch);
            fakeStream.emit(Stream.of((Object[]) new Buffer[]{buffer("0"), buffer("1")}));
            prefetch.request(i);
            prefetch.assertItem(Buffer.buffer("0")).assertEmpty();
        }
    }

    @Test
    public void testEndWithoutRequest() {
        testEndOrFailWithoutRequest(null);
    }

    @Test
    public void testFailWithoutRequest() {
        testEndOrFailWithoutRequest(new RuntimeException());
    }

    private void testEndOrFailWithoutRequest(Throwable th) {
        FakeStream fakeStream = new FakeStream();
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        subscribe(toObservable(fakeStream, 2), prefetch);
        if (th == null) {
            fakeStream.end();
            prefetch.assertEmpty();
            prefetch.request(1L);
            prefetch.assertCompleted();
        } else {
            fakeStream.fail(th);
            prefetch.assertError(th);
        }
        prefetch.assertEmpty();
    }

    @Test
    public void testNoResumeWhenRequestingBuffered() {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        FakeStream fakeStream = new FakeStream();
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        subscribe(toObservable(fakeStream, 2), prefetch);
        fakeStream.emit(Stream.of((Object[]) new Buffer[]{buffer("0"), buffer("1")}));
        prefetch.request(1L);
        assertEquals(false, Boolean.valueOf(atomicBoolean.get()));
    }

    @Test
    public void testEndDuringRequestResume() {
        int i;
        FakeStream fakeStream = new FakeStream();
        fakeStream.drainHandler(r3 -> {
            fakeStream.end();
        });
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        subscribe(toObservable(fakeStream, 10), prefetch);
        int i2 = 0;
        do {
            i = i2;
            i2++;
        } while (fakeStream.emit(Buffer.buffer(i)));
        prefetch.request(i2);
        for (int i3 = 0; i3 < i2; i3++) {
            prefetch.assertItem(Buffer.buffer(i3));
        }
        prefetch.assertEmpty();
        prefetch.request(1L);
        prefetch.assertCompleted();
    }

    @Test
    public void testDeliverEndWhenPaused() {
        testDeliverEndOrFailWhenPaused(null);
    }

    @Test
    public void testDeliverFailWhenPaused() {
        testDeliverEndOrFailWhenPaused(new RuntimeException());
    }

    private void testDeliverEndOrFailWhenPaused(Throwable th) {
        FakeStream fakeStream = new FakeStream();
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        subscribe(toObservable(fakeStream, 2), prefetch);
        fakeStream.emit(Stream.of((Object[]) new Buffer[]{buffer("0"), buffer("1")}));
        if (th == null) {
            fakeStream.end();
        } else {
            fakeStream.fail(th);
        }
        prefetch.request(3L);
        if (th == null) {
            prefetch.assertItems(buffer("0"), buffer("1"));
            prefetch.assertCompleted();
        } else {
            prefetch.assertError(th);
        }
        prefetch.assertEmpty();
    }

    @Test
    public void testEndWhenPaused() {
        testEndOrFailWhenPaused(null);
    }

    @Test
    public void testFailWhenPaused() {
        testEndOrFailWhenPaused(new RuntimeException());
    }

    private void testEndOrFailWhenPaused(Throwable th) {
        FakeStream fakeStream = new FakeStream();
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        subscribe(toObservable(fakeStream, 2), prefetch);
        fakeStream.emit(Stream.of((Object[]) new Buffer[]{buffer("0"), buffer("1")}));
        if (th == null) {
            fakeStream.end();
        } else {
            fakeStream.fail(th);
        }
        prefetch.request(3L);
        if (th == null) {
            prefetch.assertItems(buffer("0"), buffer("1"));
            prefetch.assertCompleted();
        } else {
            prefetch.assertError(th);
        }
        prefetch.assertEmpty();
    }

    @Test
    public void testRequestDuringOnNext() {
        FakeStream fakeStream = new FakeStream();
        TestSubscriber prefetch = new TestSubscriber<Buffer>() { // from class: io.vertx.lang.rx.test.ReadStreamAdapterBackPressureTest.3
            @Override // io.vertx.lang.rx.test.TestSubscriber
            public void onNext(Buffer buffer) {
                super.onNext((AnonymousClass3) buffer);
                request(1L);
            }
        }.prefetch(1L);
        subscribe(toObservable(fakeStream), prefetch);
        fakeStream.emit(buffer("0"));
        prefetch.assertItem(buffer("0")).assertEmpty();
        fakeStream.emit(buffer("1"));
        prefetch.assertItem(buffer("1")).assertEmpty();
        fakeStream.emit(buffer("2"));
        prefetch.assertItem(buffer("2")).assertEmpty();
        fakeStream.end();
        prefetch.assertCompleted().assertEmpty();
    }

    @Test
    public void testDeliverDuringResume() {
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        FakeStream fakeStream = new FakeStream();
        fakeStream.drainHandler(r6 -> {
            fakeStream.emit(buffer("2"));
        });
        subscribe(toObservable(fakeStream, 2), prefetch);
        fakeStream.emit(Buffer.buffer("0"));
        fakeStream.emit(Buffer.buffer("1"));
        prefetch.request(2L);
        prefetch.assertItems(buffer("0"), buffer("1")).assertEmpty();
    }

    @Test
    public void testEndDuringResume() {
        int i;
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        FakeStream fakeStream = new FakeStream();
        fakeStream.drainHandler(r3 -> {
            fakeStream.end();
        });
        subscribe(toObservable(fakeStream, 4), prefetch);
        int i2 = 0;
        do {
            i = i2;
            i2++;
        } while (fakeStream.emit(Buffer.buffer(i)));
        prefetch.request(i2);
        for (int i3 = 0; i3 < i2; i3++) {
            prefetch.assertItem(Buffer.buffer(i3));
        }
        prefetch.assertEmpty();
        prefetch.request(1L);
        prefetch.assertCompleted();
    }

    @Test
    public void testBufferDuringResume() {
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        FakeStream fakeStream = new FakeStream();
        fakeStream.drainHandler(r9 -> {
            fakeStream.emit(Stream.of((Object[]) new Buffer[]{buffer("2"), buffer("3")}));
        });
        subscribe(toObservable(fakeStream, 2), prefetch);
        fakeStream.emit(Stream.of((Object[]) new Buffer[]{buffer("0"), buffer("1")}));
        prefetch.request(2L);
        prefetch.assertItem(buffer("0")).assertItem(buffer("1")).assertEmpty();
    }

    @Test
    public void testFoo() {
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        FakeStream fakeStream = new FakeStream();
        subscribe(toObservable(fakeStream), prefetch);
        fakeStream.emit(buffer("0"));
        fakeStream.end();
        prefetch.request(1L);
        prefetch.assertItem(buffer("0"));
        prefetch.request(1L);
        prefetch.assertCompleted().assertEmpty();
    }

    @Test
    public void testBar() {
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        FakeStream fakeStream = new FakeStream();
        subscribe(toObservable(fakeStream), prefetch);
        for (int i = 0; i < defaultMaxBufferSize(); i++) {
            fakeStream.emit(buffer(i));
        }
        fakeStream.end();
        prefetch.request(1L);
        prefetch.assertItem(buffer("0")).assertEmpty();
    }

    @Test
    public void testUnsubscribeDuringOnNext() {
        TestSubscriber testSubscriber = new TestSubscriber<Buffer>() { // from class: io.vertx.lang.rx.test.ReadStreamAdapterBackPressureTest.4
            @Override // io.vertx.lang.rx.test.TestSubscriber
            public void onNext(Buffer buffer) {
                super.onNext((AnonymousClass4) buffer);
                unsubscribe();
            }
        };
        FakeStream fakeStream = new FakeStream();
        subscribe(toObservable(fakeStream), testSubscriber);
        fakeStream.emit(buffer("0"));
    }

    @Test
    public void testResubscribe() {
        TestSubscriber<B> prefetch = new TestSubscriber().prefetch(0L);
        FakeStream fakeStream = new FakeStream();
        O observable = toObservable(fakeStream, 2);
        subscribe(observable, prefetch);
        fakeStream.emit(Stream.of((Object[]) new Buffer[]{buffer("0"), buffer("1")}));
        prefetch.unsubscribe();
        TestSubscriber<B> prefetch2 = new TestSubscriber().prefetch(0L);
        subscribe(observable, prefetch2);
        fakeStream.emit(buffer("2"));
        fakeStream.emit(buffer("3"));
        prefetch2.assertEmpty();
        prefetch2.request(2L);
        prefetch2.assertItems(buffer("2"), buffer("3"));
        RuntimeException runtimeException = new RuntimeException();
        fakeStream.fail(runtimeException);
        prefetch2.assertError(runtimeException);
        assertTrue(prefetch2.isUnsubscribed());
        TestSubscriber<B> testSubscriber = new TestSubscriber<>();
        subscribe(observable, testSubscriber);
        fakeStream.end();
        testSubscriber.assertCompleted();
    }

    @Test
    public void testBackPressureBuffer() {
        FakeStream fakeStream = new FakeStream();
        O observable = toObservable(fakeStream, 20);
        TestSubscriber prefetch = new TestSubscriber<Buffer>() { // from class: io.vertx.lang.rx.test.ReadStreamAdapterBackPressureTest.5
            @Override // io.vertx.lang.rx.test.TestSubscriber
            public void onSubscribe(TestSubscriber.Subscription subscription) {
                super.onSubscribe(subscription);
                request(5L);
            }
        }.prefetch(0L);
        subscribe(observable, prefetch);
        Objects.requireNonNull(prefetch);
        waitUntil(prefetch::isSubscribed);
        AtomicInteger atomicInteger = new AtomicInteger();
        while (!fakeStream.isPaused()) {
            fakeStream.emit(buffer(atomicInteger.get()));
            atomicInteger.incrementAndGet();
        }
        for (int i = 0; i < 5; i++) {
            prefetch.assertItem(buffer(i));
            fakeStream.emit(Buffer.buffer(String.valueOf(atomicInteger)));
            atomicInteger.incrementAndGet();
        }
        prefetch.assertEmpty();
        prefetch.request(atomicInteger.get() - 5);
        for (int i2 = 5; i2 < atomicInteger.get(); i2++) {
            prefetch.assertItem(buffer(i2));
        }
        prefetch.assertEmpty();
        fakeStream.end();
        prefetch.assertEmpty();
        prefetch.request(1L);
        prefetch.assertCompleted();
    }

    @Test
    public void testChained() throws Exception {
        FakeStream fakeStream = new FakeStream();
        O observable = toObservable(fakeStream);
        TestSubscriber<B> testSubscriber = new TestSubscriber<>();
        testSubscriber.prefetch(1L);
        subscribe(observable, testSubscriber);
        Objects.requireNonNull(testSubscriber);
        waitUntil(testSubscriber::isSubscribed);
        fakeStream.emit(buffer("foo"));
        fakeStream.end();
        testSubscriber.assertItem(buffer("foo"));
        testSubscriber.assertEmpty();
        testSubscriber.request(1L);
        testSubscriber.assertCompleted();
    }

    @Test
    public void testFlatMap() {
        FakeStream fakeStream = new FakeStream();
        O observable = toObservable(fakeStream);
        FakeStream fakeStream2 = new FakeStream();
        O observable2 = toObservable(fakeStream2);
        O flatMap = flatMap(observable, buffer -> {
            return observable2;
        });
        TestSubscriber<B> testSubscriber = new TestSubscriber<>();
        testSubscriber.prefetch(1L);
        subscribe(flatMap, testSubscriber);
        fakeStream.emit(buffer("foo"));
        fakeStream.end();
        fakeStream2.emit(buffer("bar"));
        fakeStream2.end();
        testSubscriber.assertItem(buffer("bar"));
        testSubscriber.assertCompleted();
    }

    @Test
    public void testCancelWhenSubscribedPropagatesToStream() {
        final Buffer buffer = buffer("something");
        final FakeStream fakeStream = new FakeStream();
        O observable = toObservable(fakeStream);
        TestSubscriber testSubscriber = new TestSubscriber<Buffer>() { // from class: io.vertx.lang.rx.test.ReadStreamAdapterBackPressureTest.6
            @Override // io.vertx.lang.rx.test.TestSubscriber
            public void onNext(Buffer buffer2) {
                ReadStreamAdapterBackPressureTest.this.assertSame(buffer2, buffer);
                super.onNext((AnonymousClass6) buffer2);
                unsubscribe();
                ReadStreamAdapterBackPressureTest.this.assertNull(fakeStream.handler());
            }
        };
        testSubscriber.prefetch(1L);
        subscribe(observable, testSubscriber);
        testSubscriber.assertEmpty();
        fakeStream.emit(buffer);
        testSubscriber.assertItem(buffer);
        testSubscriber.assertEmpty();
        assertNull(fakeStream.handler());
    }
}
