package io.vertx.lang.rx.test;

import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;
import io.vertx.test.core.VertxTestBase;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;

/* loaded from: input_file:io/vertx/lang/rx/test/ReadStreamSubscriberTestBase.class */
public abstract class ReadStreamSubscriberTestBase extends VertxTestBase {
    private final long BUFFER_SIZE = bufferSize();

    /* loaded from: input_file:io/vertx/lang/rx/test/ReadStreamSubscriberTestBase$Receiver.class */
    private class Receiver extends ArrayDeque<Object> {
        final Object DONE = new Object();

        private Receiver() {
        }

        void handle(String str) {
            add(str);
        }

        void handleException(Throwable th) {
            add(th);
        }

        void handleEnd(Void r4) {
            add(this.DONE);
        }

        void subscribe(ReadStream<String> readStream) {
            readStream.exceptionHandler(this::handleException);
            readStream.endHandler(this::handleEnd);
            readStream.handler(this::handle);
        }

        Receiver assertEmpty() {
            ReadStreamSubscriberTestBase.this.assertEquals(Collections.emptyList(), new ArrayList(this));
            return this;
        }

        Receiver assertItems(String... strArr) {
            ArrayList arrayList = new ArrayList();
            while (size() > 0 && arrayList.size() < strArr.length) {
                arrayList.add(remove());
            }
            ReadStreamSubscriberTestBase.this.assertEquals(Arrays.asList(strArr), arrayList);
            return this;
        }

        void assertEnded() {
            ReadStreamSubscriberTestBase.this.assertEquals(this.DONE, remove());
            assertEmpty();
        }
    }

    /* loaded from: input_file:io/vertx/lang/rx/test/ReadStreamSubscriberTestBase$Sender.class */
    public abstract class Sender {
        protected ReadStream<String> stream;
        protected long requested;
        protected int seq;

        public Sender() {
        }

        protected abstract void emit();

        void emit(int i) {
            for (int i2 = 0; i2 < i; i2++) {
                emit();
            }
        }

        protected abstract void complete();

        protected abstract void fail(Throwable th);

        void assertRequested(long j) {
            ReadStreamSubscriberTestBase.this.assertEquals(j, this.requested);
        }

        long available() {
            return this.requested - this.seq;
        }

        protected abstract boolean isUnsubscribed();
    }

    public abstract long bufferSize();

    protected abstract Sender sender();

    @Test
    public void testInitial() throws Exception {
        Sender sender = sender();
        sender.assertRequested(0L);
        new Receiver().subscribe(sender.stream);
        sender.assertRequested(this.BUFFER_SIZE);
        while (sender.seq < this.BUFFER_SIZE / 2) {
            sender.emit();
            sender.assertRequested(this.BUFFER_SIZE);
        }
        long j = this.BUFFER_SIZE - (sender.seq - 1);
        sender.emit();
        sender.assertRequested(this.BUFFER_SIZE + j);
    }

    @Test
    public void testPause() {
        Sender sender = sender();
        sender.stream.resume();
        sender.stream.pause();
        Receiver receiver = new Receiver();
        receiver.subscribe(sender.stream);
        for (int i = 0; i < this.BUFFER_SIZE; i++) {
            sender.emit();
            assertEquals(this.BUFFER_SIZE, sender.requested);
        }
        assertEquals(0L, sender.available());
        receiver.assertEmpty();
        sender.stream.resume();
        assertEquals(this.BUFFER_SIZE, sender.available());
        receiver.assertItems("0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15");
        receiver.assertEmpty();
    }

    @Test
    public void testFetch() {
        Sender sender = sender();
        sender.stream.pause();
        Receiver receiver = new Receiver();
        receiver.subscribe(sender.stream);
        assertEquals(16L, sender.requested);
        receiver.assertEmpty();
        sender.emit();
        receiver.assertEmpty();
        sender.stream.fetch(1L);
        receiver.assertItems("0");
        sender.stream.fetch(1L);
        receiver.assertItems(new String[0]);
        sender.emit(4);
        receiver.assertItems("1");
        sender.complete();
        receiver.assertEmpty();
        sender.stream.fetch(3L);
        receiver.assertItems("2", "3", "4");
        receiver.assertEnded();
    }

    @Test
    public void testCompletion() {
        Sender sender = sender();
        Receiver receiver = new Receiver();
        receiver.subscribe(sender.stream);
        sender.complete();
        receiver.assertEnded();
    }

    @Test
    public void testCompletionWhenPaused() {
        Sender sender = sender();
        sender.stream.pause();
        Receiver receiver = new Receiver();
        receiver.subscribe(sender.stream);
        sender.emit(3);
        sender.complete();
        sender.stream.resume();
        receiver.assertItems("0", "1", "2").assertEnded();
    }

    @Test
    public void testSetNullHandlersInEndHandler() {
        Sender sender = sender();
        AtomicInteger atomicInteger = new AtomicInteger();
        sender.stream.endHandler(r5 -> {
            atomicInteger.incrementAndGet();
            sender.stream.handler((Handler) null);
            sender.stream.endHandler((Handler) null);
            sender.stream.exceptionHandler((Handler) null);
        });
        sender.stream.handler(str -> {
        });
        sender.complete();
        assertEquals(1L, atomicInteger.get());
    }

    @Test
    public void testSetHandlersAfterCompletion() {
        Sender sender = sender();
        sender.stream.handler(str -> {
        });
        sender.complete();
        try {
            sender.stream.endHandler(r1 -> {
            });
            fail();
        } catch (IllegalStateException e) {
        }
        sender.stream.endHandler((Handler) null);
        try {
            sender.stream.exceptionHandler(th -> {
            });
            fail();
        } catch (IllegalStateException e2) {
        }
        sender.stream.exceptionHandler((Handler) null);
    }

    @Test
    public void testSetHandlersAfterError() {
        Sender sender = sender();
        sender.stream.handler(str -> {
        });
        sender.fail(new Throwable());
        try {
            sender.stream.endHandler(r1 -> {
            });
            fail();
        } catch (IllegalStateException e) {
        }
        sender.stream.endHandler((Handler) null);
        try {
            sender.stream.exceptionHandler(th -> {
            });
            fail();
        } catch (IllegalStateException e2) {
        }
        sender.stream.exceptionHandler((Handler) null);
    }

    @Test
    public void testDontDeliverCompletionWhenPausedWithPendingBuffers() {
        Sender sender = sender();
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        sender.stream.endHandler(r3 -> {
            atomicInteger2.incrementAndGet();
        });
        sender.stream.exceptionHandler(th -> {
            atomicInteger.incrementAndGet();
        });
        sender.stream.handler(str -> {
        });
        sender.stream.pause();
        sender.emit();
        sender.complete();
        assertEquals(0L, atomicInteger2.get());
        sender.stream.resume();
        assertEquals(1L, atomicInteger2.get());
        assertEquals(0L, atomicInteger.get());
    }

    @Test
    public void testDontDeliverErrorWhenPausedWithPendingBuffers() {
        Sender sender = sender();
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        sender.stream.endHandler(r3 -> {
            atomicInteger2.incrementAndGet();
        });
        sender.stream.exceptionHandler(th -> {
            atomicInteger.incrementAndGet();
        });
        sender.stream.handler(str -> {
        });
        sender.stream.pause();
        sender.emit();
        sender.fail(new RuntimeException());
        assertEquals(0L, atomicInteger2.get());
        assertEquals(0L, atomicInteger.get());
        sender.stream.resume();
        assertEquals(1L, atomicInteger2.get());
        assertEquals(1L, atomicInteger.get());
    }

    @Test
    public void testSetHandlersAfterCompletionButPending() {
        Sender sender = sender();
        sender.stream.handler(str -> {
        });
        sender.stream.pause();
        sender.emit();
        sender.complete();
        sender.stream.exceptionHandler(th -> {
        });
        sender.stream.exceptionHandler((Handler) null);
        sender.stream.endHandler(r1 -> {
        });
        sender.stream.endHandler((Handler) null);
    }

    @Test
    public void testSetHandlersAfterErrorButPending() {
        Sender sender = sender();
        sender.stream.handler(str -> {
        });
        sender.stream.pause();
        sender.emit();
        sender.fail(new Throwable());
        sender.stream.exceptionHandler(th -> {
        });
        sender.stream.exceptionHandler((Handler) null);
        sender.stream.endHandler(r1 -> {
        });
        sender.stream.endHandler((Handler) null);
    }

    @Test
    public void testSetNullHandlerUnsubscribes() {
        Sender sender = sender();
        sender.stream.handler(str -> {
        });
        sender.emit();
        sender.stream.handler((Handler) null);
        assertTrue(sender.isUnsubscribed());
    }

    @Test
    public void testReadStreamElementsMustBeSerialized() {
        Sender sender = sender();
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        sender.stream.handler(str -> {
            if (atomicReference.get() != null) {
                atomicReference2.set(Thread.currentThread());
            } else {
                atomicReference.set(Thread.currentThread());
                publishItemFromAnotherThread(sender);
            }
            synchronizedList.add(str);
        });
        sender.stream.pause();
        sender.emit();
        sender.stream.resume();
        assertEquals(Arrays.asList("0", "1"), synchronizedList);
        assertEquals(atomicReference.get(), atomicReference2.get());
    }

    private static void publishItemFromAnotherThread(Sender sender) {
        Objects.requireNonNull(sender);
        Thread thread = new Thread(sender::emit);
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
