package io.vertx.reactivex.test;

import io.reactivex.Flowable;
import io.reactivex.Single;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.RxHelper;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.http.HttpClientAgent;
import io.vertx.reactivex.core.http.HttpResponseExpectation;
import io.vertx.reactivex.core.http.WebSocketClient;
import io.vertx.reactivex.core.parsetools.RecordParser;
import io.vertx.test.core.VertxTestBase;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;

/* loaded from: input_file:io/vertx/reactivex/test/CoreApiTest.class */
public class CoreApiTest extends VertxTestBase {
    private Vertx vertx;

    public void setUp() throws Exception {
        super.setUp();
        this.vertx = new Vertx(((VertxTestBase) this).vertx);
    }

    @Test
    public void testDeployVerticle() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        RxHelper.deployVerticle(this.vertx, new AbstractVerticle() { // from class: io.vertx.reactivex.test.CoreApiTest.1
            public void start() {
                countDownLatch.countDown();
            }
        }).subscribe(str -> {
            countDownLatch.countDown();
        });
        awaitLatch(countDownLatch);
    }

    @Test
    public void testWebSocket() {
        waitFor(2);
        AtomicLong atomicLong = new AtomicLong();
        this.vertx.createHttpServer(new HttpServerOptions().setIdleTimeout(2)).webSocketHandler(serverWebSocket -> {
            serverWebSocket.toFlowable().subscribe(buffer -> {
                atomicLong.incrementAndGet();
                serverWebSocket.writeTextMessage("pong");
            }, th -> {
                assertEquals(1L, atomicLong.get());
                complete();
            }, this::fail);
        }).rxListen(8080, "localhost").blockingGet();
        WebSocketClient createWebSocketClient = this.vertx.createWebSocketClient();
        AtomicLong atomicLong2 = new AtomicLong();
        createWebSocketClient.rxConnect(8080, "localhost", "/").doAfterSuccess(webSocket -> {
            webSocket.writeTextMessage("ping");
        }).flatMapPublisher((v0) -> {
            return v0.toFlowable();
        }).subscribe(buffer -> {
            atomicLong2.incrementAndGet();
        }, th -> {
            assertEquals(1L, atomicLong2.get());
            complete();
        }, this::fail);
        await();
    }

    @Test
    public void testPipeFailureShouldUnsubscribe() throws Exception {
        this.vertx.createHttpServer().requestHandler(httpServerRequest -> {
            httpServerRequest.response().send(Flowable.generate(() -> {
                return 0L;
            }, (l, emitter) -> {
                emitter.onNext(Buffer.buffer("Chunk " + l + "\n"));
                return Long.valueOf(l.longValue() + 1);
            }).delay(100L, TimeUnit.MILLISECONDS).rebatchRequests(1).doOnCancel(this::testComplete));
        }).rxListen(8080, "localhost").blockingGet();
        this.vertx.createHttpClient().request(HttpMethod.GET, 8080, "localhost", "/").onComplete(onSuccess(httpClientRequest -> {
            httpClientRequest.send().onComplete(onSuccess(httpClientResponse -> {
                AtomicInteger atomicInteger = new AtomicInteger();
                httpClientResponse.handler(buffer -> {
                    if (atomicInteger.incrementAndGet() > 5) {
                        httpClientResponse.request().reset();
                    }
                });
            }));
        }));
        await();
    }

    @Test
    public void testRecordParser() {
        Single rxOpen = this.vertx.fileSystem().rxOpen("src/test/resources/test.txt", new OpenOptions());
        waitFor(5);
        rxOpen.map(asyncFile -> {
            return RecordParser.newDelimited("\n", asyncFile.toFlowable());
        }).flatMapObservable((v0) -> {
            return v0.toObservable();
        }).doOnNext(buffer -> {
            complete();
        }).doOnComplete(() -> {
            complete();
        }).ignoreElements().subscribe(() -> {
            complete();
        }, this::fail);
        await();
    }

    @Test
    public void testHttpResponseExpectation() {
        waitFor(2);
        HttpClientAgent createHttpClient = this.vertx.createHttpClient();
        createHttpClient.rxRequest(HttpMethod.GET, 8080, "localhost", "/").flatMap(httpClientRequest -> {
            return httpClientRequest.rxSend();
        }).compose(HttpResponseExpectation.status(200)).flatMap(httpClientResponse -> {
            return httpClientResponse.rxBody();
        }).subscribe((buffer, th) -> {
            assertNull(th);
            assertEquals("Hello World", buffer.toString());
            complete();
        });
        createHttpClient.rxRequest(HttpMethod.GET, 8080, "localhost", "/").flatMap(httpClientRequest2 -> {
            return httpClientRequest2.rxSend();
        }).compose(HttpResponseExpectation.status(201)).flatMap(httpClientResponse2 -> {
            return httpClientResponse2.rxBody();
        }).subscribe((buffer2, th2) -> {
            assertNull(buffer2);
            assertNotNull(th2);
            assertEquals("Response status code 200 is not equal to 201", th2.getMessage());
            complete();
        });
        await();
    }
}
