package io.vertx.rx.java.test;

import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServerOptions;
import io.vertx.lang.rx.test.SimplePojo;
import io.vertx.rx.java.ObservableFuture;
import io.vertx.rx.java.RxHelper;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.Context;
import io.vertx.rxjava.core.Vertx;
import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.eventbus.EventBus;
import io.vertx.rxjava.core.eventbus.MessageConsumer;
import io.vertx.rxjava.core.http.HttpClient;
import io.vertx.rxjava.core.http.HttpServer;
import io.vertx.rxjava.core.http.HttpServerRequest;
import io.vertx.rxjava.core.http.ServerWebSocket;
import io.vertx.rxjava.core.http.WebSocket;
import io.vertx.rxjava.core.net.NetServer;
import io.vertx.rxjava.core.net.NetSocket;
import io.vertx.rxjava.core.parsetools.RecordParser;
import io.vertx.rxjava.core.streams.ReadStream;
import io.vertx.test.core.VertxTestBase;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Single;
import rx.Subscriber;

/* loaded from: input_file:io/vertx/rx/java/test/CoreApiTest.class */
public class CoreApiTest extends VertxTestBase {
    private Vertx vertx;

    /**
     * Runs the base-class set-up and then wraps the inherited {@code vertx} instance in the
     * Rx-flavoured {@link Vertx} used by every test in this class.
     *
     * @throws Exception if the base-class set-up fails
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        // This class declares its own `vertx` field, which hides the inherited one;
        // `super.vertx` reaches the inherited field without a cast through `this`.
        this.vertx = new Vertx(super.vertx);
    }

    /**
     * Observes the body stream of an event-bus consumer, collects three message bodies and
     * then unsubscribes from inside {@code onNext}. The test passes once the three bodies were
     * received in order and the consumer reports that it is no longer registered.
     */
    @Test
    public void testConsumeBodyStream() {
        EventBus eventBus = vertx.eventBus();
        MessageConsumer<String> consumer = eventBus.consumer("the-address");
        Observable<String> bodies = consumer.bodyStream().toObservable();
        List<String> received = new ArrayList<>();
        bodies.subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String body) {
                received.add(body);
                if (received.size() == 3) {
                    // Unsubscribe first: the registration check below depends on it.
                    unsubscribe();
                    assertEquals(Arrays.asList("msg1", "msg2", "msg3"), received);
                    assertFalse(consumer.isRegistered());
                    testComplete();
                }
            }

            @Override
            public void onError(Throwable error) {
                fail(error.getMessage());
            }

            @Override
            public void onCompleted() {
                // The stream is ended by unsubscribing, never by completion.
                fail();
            }
        });
        eventBus.send("the-address", "msg1");
        eventBus.send("the-address", "msg2");
        eventBus.send("the-address", "msg3");
        await();
    }

    /**
     * Subscribes to a consumer's body-stream observable, unsubscribes after the first message,
     * then subscribes to the same observable a second time and expects the next message to be
     * delivered. After each unsubscription the consumer must report that it is not registered.
     */
    @Test
    public void testRegisterAgain() {
        EventBus eventBus = vertx.eventBus();
        MessageConsumer<String> consumer = eventBus.consumer("the-address");
        Observable<String> bodies = consumer.bodyStream().toObservable();
        bodies.subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String first) {
                assertEquals("msg1", first);
                unsubscribe();
                assertFalse(consumer.isRegistered());
                // Second subscription on the very same observable.
                bodies.subscribe(new Subscriber<String>() {
                    @Override
                    public void onNext(String second) {
                        assertEquals("msg2", second);
                        unsubscribe();
                        assertFalse(consumer.isRegistered());
                        testComplete();
                    }

                    @Override
                    public void onError(Throwable error) {
                        fail("Was not esxpecting error " + error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        fail();
                    }
                });
                // Sent only after the second subscriber is in place.
                eventBus.send("the-address", "msg2");
            }

            @Override
            public void onError(Throwable error) {
                fail("Was not esxpecting error " + error.getMessage());
            }

            @Override
            public void onCompleted() {
                fail();
            }
        });
        eventBus.send("the-address", "msg1");
        await();
    }

    /**
     * Limits a consumer's body-stream observable to four items with {@code take(4)}, sends seven
     * messages and expects the subscriber to see exactly the first four before completion.
     */
    @Test
    public void testObservableUnsubscribeDuringObservation() {
        EventBus eventBus = vertx.eventBus();
        Observable<String> firstFour =
            eventBus.<String>consumer("the-address").bodyStream().toObservable().take(4);
        List<String> received = new ArrayList<>();
        firstFour.subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String body) {
                received.add(body);
            }

            @Override
            public void onError(Throwable error) {
                fail(error.getMessage());
            }

            @Override
            public void onCompleted() {
                assertEquals(Arrays.asList("msg0", "msg1", "msg2", "msg3"), received);
                testComplete();
            }
        });
        // More messages than the observable will take.
        for (int i = 0; i < 7; i++) {
            eventBus.send("the-address", "msg" + i);
        }
        await();
    }

    /**
     * Subscribes to a consumer's body stream and then unregisters the consumer. The test passes
     * when the subscriber is completed without having received any item.
     */
    @Test
    public void testUnregisterConsumer() {
        MessageConsumer<String> consumer = vertx.eventBus().consumer("the-address");
        Observable<String> bodies = consumer.bodyStream().toObservable();
        bodies.subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String body) {
                // No message is ever sent to the address in this test.
                fail();
            }

            @Override
            public void onError(Throwable error) {
                fail(error.getMessage());
            }

            @Override
            public void onCompleted() {
                testComplete();
            }
        });
        consumer.unregister();
        await();
    }

    /**
     * Registers an echoing consumer, then concatenates two {@code rxRequest} singles and expects
     * their reply bodies to arrive in request order ("msg1" then "msg2") before completion.
     */
    @Test
    public void testConcatReplies() {
        EventBus eventBus = vertx.eventBus();
        eventBus.consumer("the-address", message -> {
            // Echo the body back to the sender.
            message.reply(message.body());
        });
        // The concatenation starts only once the "done" request has been answered.
        eventBus.request("the-address", "done", ignored -> {
            List<String> bodies = new LinkedList<>();
            // The singles are built in place so their element type is inferred; the requests
            // themselves are issued on subscription, not when the singles are created.
            Single.concat(
                    eventBus.<String>rxRequest("the-address", "msg1"),
                    eventBus.<String>rxRequest("the-address", "msg2"))
                .subscribe(reply -> {
                    bodies.add(reply.body());
                }, error -> {
                    fail();
                }, () -> {
                    assertEquals(Arrays.asList("msg1", "msg2"), bodies);
                    testComplete();
                });
        });
        await();
    }

    /**
     * Starts a net server on localhost:1234, observes its connection stream and, for each
     * accepted socket, observes the received buffers. A client connects once the server is
     * listening, writes "foo" and closes. The socket observer checks that exactly one buffer
     * containing "foo" arrived and closes the server; completion of the connection stream ends
     * the test.
     */
    @Test
    public void testObservableNetSocket() {
        ObservableFuture<NetServer> onListen = RxHelper.observableFuture();
        onListen.subscribe(listening -> {
            vertx.createNetClient(new NetClientOptions()).connect(1234, "localhost", connect -> {
                assertTrue(connect.succeeded());
                NetSocket socket = connect.result();
                socket.write("foo");
                socket.close();
            });
        }, error -> {
            fail(error.getMessage());
        });
        NetServer server = vertx.createNetServer(new NetServerOptions().setPort(1234).setHost("localhost"));
        server.connectStream().toObservable().subscribe(new Subscriber<NetSocket>() {
            @Override
            public void onNext(NetSocket socket) {
                socket.toObservable().subscribe(new Observer<Buffer>() {
                    private final List<Buffer> chunks = new ArrayList<>();

                    @Override
                    public void onNext(Buffer chunk) {
                        chunks.add(chunk);
                    }

                    @Override
                    public void onError(Throwable error) {
                        fail(error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        assertEquals(1, chunks.size());
                        assertEquals("foo", chunks.get(0).toString("UTF-8"));
                        // Closing the server is what lets the outer subscriber complete.
                        server.close();
                    }
                });
            }

            @Override
            public void onError(Throwable error) {
                fail(error.getMessage());
            }

            @Override
            public void onCompleted() {
                testComplete();
            }
        });
        // Subscriptions are in place; start listening and let the future notify the client side.
        server.listen(onListen.toHandler());
        await();
    }

    /**
     * Starts an HTTP server on localhost:8080, observes its WebSocket stream and, for each
     * accepted WebSocket, observes the received buffers. A client connects once the server is
     * listening, writes "foo" and closes. The socket observer checks that exactly one buffer
     * containing "foo" arrived and closes the server; completion of the WebSocket stream ends
     * the test.
     */
    @Test
    public void testObservableWebSocket() {
        ObservableFuture<HttpServer> onListen = RxHelper.observableFuture();
        onListen.subscribe(listening -> {
            vertx.createHttpClient(new HttpClientOptions()).webSocket(8080, "localhost", "/some/path", connect -> {
                if (!connect.succeeded()) {
                    fail(connect.cause().getMessage());
                    return;
                }
                WebSocket webSocket = connect.result();
                webSocket.write(Buffer.buffer("foo"));
                webSocket.close();
            });
        }, error -> {
            // Report a failed listen right away instead of letting the test time out.
            fail(error.getMessage());
        });
        HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080).setHost("localhost"));
        server.webSocketStream().toObservable().subscribe(new Subscriber<ServerWebSocket>() {
            @Override
            public void onNext(ServerWebSocket serverSocket) {
                serverSocket.toObservable().subscribe(new Observer<Buffer>() {
                    private final List<Buffer> chunks = new ArrayList<>();

                    @Override
                    public void onNext(Buffer chunk) {
                        chunks.add(chunk);
                    }

                    @Override
                    public void onError(Throwable error) {
                        fail(error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        assertEquals(1, chunks.size());
                        assertEquals("foo", chunks.get(0).toString("UTF-8"));
                        // Closing the server is what lets the outer subscriber complete.
                        server.close();
                    }
                });
            }

            @Override
            public void onError(Throwable error) {
                fail(error.getMessage());
            }

            @Override
            public void onCompleted() {
                testComplete();
            }
        });
        server.listen(onListen.toHandler());
        await();
    }

    /**
     * Starts an HTTP server on localhost:8080, observes its request stream and, for each
     * request, observes the body buffers. Once the server is listening a client issues a PUT
     * with a three-byte body "foo". The body observer checks that exactly one buffer containing
     * "foo" arrived and closes the server; completion of the request stream ends the test.
     */
    @Test
    public void testObservableHttpRequest() {
        HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080).setHost("localhost"));
        server.requestStream().toObservable().subscribe(new Subscriber<HttpServerRequest>() {
            @Override
            public void onNext(HttpServerRequest request) {
                request.toObservable().subscribe(new Observer<Buffer>() {
                    private final List<Buffer> chunks = new ArrayList<>();

                    @Override
                    public void onNext(Buffer chunk) {
                        chunks.add(chunk);
                    }

                    @Override
                    public void onError(Throwable error) {
                        fail(error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        assertEquals(1, chunks.size());
                        assertEquals("foo", chunks.get(0).toString("UTF-8"));
                        // Closing the server is what lets the outer subscriber complete.
                        server.close();
                    }
                });
            }

            @Override
            public void onError(Throwable error) {
                fail(error.getMessage());
            }

            @Override
            public void onCompleted() {
                testComplete();
            }
        });
        server.rxListen().subscribe(listening -> {
            HttpClient client = vertx.createHttpClient(new HttpClientOptions());
            client.rxRequest(HttpMethod.PUT, 8080, "localhost", "/some/path").subscribe(request -> {
                // The declared length matches the single three-byte write below.
                request.putHeader("Content-Length", "3");
                request.write("foo");
            }, error -> {
                // Report a client-side failure right away instead of letting the test time out.
                fail(error.getMessage());
            });
        }, error -> {
            fail(error.getMessage());
        });
        await();
    }

    /**
     * Concatenates the observables of two 100 ms timer streams and expects exactly two items
     * in total before completion.
     */
    @Test
    public void testConcatOperator() {
        Observable<Long> bothTimers = Observable.concat(
            vertx.timerStream(100L).toObservable(),
            vertx.timerStream(100L).toObservable());
        AtomicInteger ticks = new AtomicInteger();
        bothTimers.subscribe(tick -> {
            ticks.incrementAndGet();
        }, error -> {
            fail();
        }, () -> {
            assertEquals(2, ticks.get());
            testComplete();
        });
        await();
    }

    /**
     * Runs a periodic RxJava timer (100 ms initial delay, 100 ms period, ten items) on the
     * Vert.x scheduler from within a Vert.x context. Every tick must be delivered on the
     * context that created the timer, and the ten ticks must take roughly one second overall.
     */
    @Test
    public void testScheduledTimer() {
        vertx.runOnContext(ignored -> {
            long startedAt = System.currentTimeMillis();
            Context callerContext = Vertx.currentContext();
            Observable.timer(100L, 100L, TimeUnit.MILLISECONDS, io.vertx.rxjava.core.RxHelper.scheduler(vertx))
                .take(10)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onNext(Long tick) {
                        assertEquals(callerContext.getDelegate(), Vertx.currentContext().getDelegate());
                    }

                    @Override
                    public void onError(Throwable error) {
                        fail("unexpected failure");
                    }

                    @Override
                    public void onCompleted() {
                        long elapsed = System.currentTimeMillis() - startedAt;
                        // The message now states the tolerance that is actually enforced (1000 ms).
                        assertTrue("Was expecting to have time taken | " + elapsed + " -  1000 | < 1000", Math.abs(elapsed - 1000) < 1000);
                        testComplete();
                    }
                });
        });
        await();
    }

    /**
     * Buffers a 10 ms periodic timer into 100 ms windows on the Vert.x scheduler and takes ten
     * windows. Every window must be delivered on the context that built the pipeline, exactly
     * ten windows must be seen, and the whole run must take roughly one second.
     */
    @Test
    public void testScheduledBuffer() {
        vertx.runOnContext(ignored -> {
            long startedAt = System.currentTimeMillis();
            Context callerContext = Vertx.currentContext();
            Observable.timer(10L, 10L, TimeUnit.MILLISECONDS, io.vertx.rxjava.core.RxHelper.scheduler(vertx))
                .buffer(100L, TimeUnit.MILLISECONDS, io.vertx.rxjava.core.RxHelper.scheduler(vertx))
                .take(10)
                .subscribe(new Observer<List<Long>>() {
                    private int windowCount = 0;

                    @Override
                    public void onNext(List<Long> window) {
                        windowCount++;
                        assertEquals(callerContext.getDelegate(), Vertx.currentContext().getDelegate());
                    }

                    @Override
                    public void onError(Throwable error) {
                        fail("unexpected failure");
                    }

                    @Override
                    public void onCompleted() {
                        long elapsed = System.currentTimeMillis() - startedAt;
                        assertEquals(10, windowCount);
                        // The message now states the tolerance that is actually enforced (1000 ms).
                        assertTrue("Was expecting to have time taken | " + elapsed + " -  1000 | < 1000", Math.abs(elapsed - 1000) < 1000);
                        testComplete();
                    }
                });
        });
        await();
    }

    /**
     * Buffers the bodies of a local event-bus consumer into 500 ms windows on the Vert.x
     * scheduler and concatenates each window into one string. The first window must be
     * delivered on the context that built the pipeline and must equal "msg1msg2msg3".
     */
    @Test
    public void testTimeMap() {
        vertx.runOnContext(ignored -> {
            Context callerContext = Vertx.currentContext();
            EventBus eventBus = vertx.eventBus();
            ReadStream<String> bodies = eventBus.<String>localConsumer("the-address").bodyStream();
            bodies.toObservable()
                .buffer(500L, TimeUnit.MILLISECONDS, io.vertx.rxjava.core.RxHelper.scheduler(vertx))
                .map(window -> String.join("", window))
                .subscribe(new Subscriber<String>() {
                    // Only the first window is checked; later ones are ignored.
                    private boolean first = true;

                    @Override
                    public void onNext(String joined) {
                        if (first) {
                            first = false;
                            assertEquals(callerContext.getDelegate(), Vertx.currentContext().getDelegate());
                            assertEquals("msg1msg2msg3", joined);
                            testComplete();
                        }
                    }

                    @Override
                    public void onError(Throwable error) {
                        fail(error.getMessage());
                    }

                    @Override
                    public void onCompleted() {
                        // Nothing to verify on completion.
                    }
                });
            eventBus.send("the-address", "msg1");
            eventBus.send("the-address", "msg2");
            eventBus.send("the-address", "msg3");
        });
        await();
    }

    /**
     * Subscribes a plain {@link Observer} to the result of {@code rxListen()} on an HTTP server
     * bound to port 8080 and expects exactly one {@code onNext} before {@code onCompleted}.
     * The server is closed from the completion callback.
     */
    @Test
    public void testObserverToFuture() {
        AtomicInteger emissions = new AtomicInteger();
        HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080)).requestHandler(request -> {
        });
        Observer<HttpServer> listenObserver = new Observer<HttpServer>() {
            @Override
            public void onNext(HttpServer listening) {
                emissions.incrementAndGet();
            }

            @Override
            public void onError(Throwable error) {
                fail(error.getMessage());
            }

            @Override
            public void onCompleted() {
                server.close();
                assertEquals(1, emissions.get());
                testComplete();
            }
        };
        server.rxListen().subscribe(listenObserver);
        await();
    }

    /**
     * Adapts an {@link Observer} to a handler with {@code RxHelper.toHandler} and uses it as the
     * callback of a 1 ms timer. The observer must see exactly one {@code onNext} before
     * {@code onCompleted}.
     *
     * @throws Exception declared by the test signature
     */
    @Test
    public void testObserverToHandler() throws Exception {
        AtomicInteger emissions = new AtomicInteger();
        Observer<Long> timerObserver = new Observer<Long>() {
            @Override
            public void onNext(Long timerId) {
                emissions.incrementAndGet();
            }

            @Override
            public void onError(Throwable error) {
                fail(error.getMessage());
            }

            @Override
            public void onCompleted() {
                assertEquals(1, emissions.get());
                testComplete();
            }
        };
        vertx.setTimer(1L, RxHelper.toHandler(timerObserver));
        await();
    }

    /**
     * Serves "some_content" as a chunked response on port 8080, fetches it with the Rx HTTP
     * client and accumulates the response body through the response's observable. The test
     * passes when the accumulated body equals "some_content"; the server is always closed
     * afterwards.
     */
    @Test
    public void testHttpClient() {
        HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080));
        server.requestStream().handler(request -> {
            request.response().setChunked(true).end("some_content");
        });
        try {
            server.listen(listenResult -> {
                HttpClient client = vertx.createHttpClient(new HttpClientOptions());
                client.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
                    .flatMap(request -> request.rxSend())
                    .subscribe(response -> {
                        Buffer content = Buffer.buffer();
                        Observable<Buffer> chunks = response.toObservable();
                        chunks.forEach(content::appendBuffer, error -> {
                            fail();
                        }, () -> {
                            assertEquals("some_content", content.toString("UTF-8"));
                            testComplete();
                        });
                    }, error -> {
                        // Report a request failure right away instead of letting the test time out.
                        fail(error.getMessage());
                    });
            });
            await();
        } finally {
            server.close();
        }
    }

    /**
     * Serves "some_content" as a chunked response on port 8080 and fetches it with a single Rx
     * pipeline: request, send, then flat-map the response into its body buffers. The test
     * passes when the accumulated body equals "some_content".
     */
    @Test
    public void testHttpClientFlatMap() {
        HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080));
        server.requestStream().handler(request -> {
            request.response().setChunked(true).end("some_content");
        });
        server.listen(listenResult -> {
            HttpClient client = vertx.createHttpClient(new HttpClientOptions());
            Buffer content = Buffer.buffer();
            // Kept as one chain so every element type is inferred rather than erased.
            Observable<Buffer> chunks = client.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
                .flatMap(request -> request.rxSend())
                .flatMapObservable(response -> response.toObservable());
            chunks.forEach(content::appendBuffer, error -> {
                fail();
            }, () -> {
                server.close();
                assertEquals("some_content", content.toString("UTF-8"));
                testComplete();
            });
        });
        await();
    }

    /**
     * Serves the JSON document {@code {"foo":"bar"}} on port 8080, fetches it with the Rx HTTP
     * client and lifts the body through {@code RxHelper.unmarshaller(SimplePojo.class)}. The
     * test passes when exactly one pojo equal to {@code new SimplePojo("bar")} was produced.
     */
    @Test
    public void testHttpClientFlatMapUnmarshallPojo() {
        HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080));
        server.requestStream().handler(request -> {
            request.response().setChunked(true).end("{\"foo\":\"bar\"}");
        });
        server.listen(listenResult -> {
            HttpClient client = vertx.createHttpClient(new HttpClientOptions());
            List<SimplePojo> pojos = new ArrayList<>();
            // Kept as one chain so every element type is inferred rather than erased.
            Observable<SimplePojo> unmarshalled = client.rxRequest(HttpMethod.GET, 8080, "localhost", "/the_uri")
                .flatMap(request -> request.rxSend())
                .flatMapObservable(response -> response.toObservable())
                .lift(io.vertx.rxjava.core.RxHelper.unmarshaller(SimplePojo.class));
            unmarshalled.forEach(pojos::add, error -> {
                fail();
            }, () -> {
                server.close();
                assertEquals(Arrays.asList(new SimplePojo("bar")), pojos);
                testComplete();
            });
        });
        await();
    }

    /**
     * Issues a request to 255.255.255.255:9998 and expects the failure to reach the error
     * callback of the subscription; receiving a response fails the test.
     */
    @Test
    public void testHttpClientConnectionFailure() {
        HttpClient client = vertx.createHttpClient(new HttpClientOptions());
        client.rxRequest(HttpMethod.GET, 9998, "255.255.255.255", "/the_uri")
            .flatMap(request -> request.rxSend())
            .subscribe(
                response -> fail(),
                error -> testComplete());
        await();
    }

    /**
     * Same failing request as the plain connection-failure test, but with the response
     * flat-mapped into its body observable. The failure must reach the error callback; any
     * buffer or a normal completion fails the test.
     */
    @Test
    public void testHttpClientConnectionFailureFlatMap() {
        HttpClient client = vertx.createHttpClient(new HttpClientOptions());
        client.rxRequest(HttpMethod.GET, 9998, "255.255.255.255", "/the_uri")
            .flatMap(request -> request.rxSend())
            .flatMapObservable(response -> response.toObservable())
            .forEach(
                chunk -> fail(),
                error -> testComplete(),
                () -> fail());
        await();
    }

    /**
     * Runs a WebSocket server on port 8080 that writes "some_content" to every connection and
     * closes it. A client connects with the callback-style {@code webSocket} call and
     * accumulates the received buffers through the socket's observable. The test passes when
     * the accumulated content equals "some_content".
     */
    @Test
    public void testWebsocketClient() {
        HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080));
        server.webSocketStream().handler(serverSocket -> {
            serverSocket.write(Buffer.buffer("some_content"));
            serverSocket.close();
        });
        server.listen(listenResult -> {
            HttpClient client = vertx.createHttpClient(new HttpClientOptions());
            // The inner parameter needs its own name: a lambda parameter may not redeclare
            // one from an enclosing lambda.
            client.webSocket(8080, "localhost", "/the_uri", connect -> {
                if (!connect.succeeded()) {
                    // Report a failed connect right away instead of letting the test time out.
                    fail(connect.cause().getMessage());
                    return;
                }
                WebSocket webSocket = connect.result();
                Buffer content = Buffer.buffer();
                Observable<Buffer> chunks = webSocket.toObservable();
                chunks.forEach(content::appendBuffer, error -> {
                    fail();
                }, () -> {
                    server.close();
                    assertEquals("some_content", content.toString("UTF-8"));
                    testComplete();
                });
            });
        });
        await();
    }

    /**
     * Runs a WebSocket server on port 8080 that writes "some_content" to every connection and
     * closes it. A client connects through {@code rxWebSocket}, flat-maps the socket into its
     * buffer observable and accumulates the content. The test passes when the accumulated
     * content equals "some_content".
     */
    @Test
    public void testWebsocketClientFlatMap() {
        HttpServer server = vertx.createHttpServer(new HttpServerOptions().setPort(8080));
        server.webSocketStream().handler(serverSocket -> {
            serverSocket.write(Buffer.buffer("some_content"));
            serverSocket.close();
        });
        server.listen(listenResult -> {
            HttpClient client = vertx.createHttpClient(new HttpClientOptions());
            Buffer content = Buffer.buffer();
            Observable<Buffer> chunks = client.rxWebSocket(8080, "localhost", "/the_uri")
                .flatMapObservable(webSocket -> webSocket.toObservable());
            chunks.forEach(content::appendBuffer, error -> {
                fail();
            }, () -> {
                server.close();
                assertEquals("some_content", content.toString("UTF-8"));
                testComplete();
            });
        });
        await();
    }

    /**
     * Deploys a verticle through {@code RxHelper.deployVerticle} and waits on a latch that is
     * counted down twice: once from the verticle's {@code start()} and once when the returned
     * observable emits.
     *
     * @throws Exception if waiting on the latch fails
     */
    @Test
    public void testDeployVerticle() throws Exception {
        CountDownLatch pending = new CountDownLatch(2);
        AbstractVerticle verticle = new AbstractVerticle() {
            @Override
            public void start() {
                pending.countDown();
            }
        };
        io.vertx.rxjava.core.RxHelper.deployVerticle(vertx, verticle)
            .subscribe(deploymentId -> pending.countDown());
        awaitLatch(pending);
    }

    /**
     * Opens {@code src/test/resources/test.txt}, splits its content on newlines with a
     * {@link RecordParser} and observes the resulting records. {@code complete()} is called
     * once per record, once when the record stream completes and once when the final
     * completable completes; the test waits for five such calls in total.
     */
    @Test
    public void testRecordParser() {
        // Five completions expected: presumably three records in test.txt plus the two
        // completion callbacks below (verify against the resource file).
        waitFor(5);
        // Kept as one chain so every element type is inferred rather than erased.
        vertx.fileSystem()
            .rxOpen("src/test/resources/test.txt", new OpenOptions())
            .map(file -> RecordParser.newDelimited("\n", file.toObservable()))
            .flatMapObservable(parser -> parser.toObservable())
            .doOnNext(line -> complete())
            .doOnCompleted(() -> complete())
            .ignoreElements()
            .toCompletable()
            .subscribe(() -> complete(), this::fail);
        await();
    }
}
