package io.vertx.servicediscovery.zookeeper;

import com.jayway.awaitility.Awaitility;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.Repeat;
import io.vertx.ext.unit.junit.RepeatRule;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.servicediscovery.Record;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.UriSpec;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/servicediscovery/zookeeper/ZookeeperBridgeTest.class */
public class ZookeeperBridgeTest {

    @Rule
    public RepeatRule rule = new RepeatRule();
    private TestingServer zkTestServer;
    private CuratorFramework cli;
    private ServiceDiscovery<String> discovery;
    private Vertx vertx;
    private io.vertx.servicediscovery.ServiceDiscovery sd;

    @Before
    public void startZookeeper() throws Exception {
        this.zkTestServer = new TestingServer(2181);
        this.cli = CuratorFrameworkFactory.newClient(this.zkTestServer.getConnectString(), new RetryOneTime(2000));
        this.cli.start();
        this.discovery = ServiceDiscoveryBuilder.builder(String.class).client(this.cli).basePath("/discovery").watchInstances(true).build();
        this.discovery.start();
        this.vertx = Vertx.vertx();
        this.sd = io.vertx.servicediscovery.ServiceDiscovery.create(this.vertx);
    }

    @After
    public void stopZookeeper() throws IOException {
        this.discovery.close();
        this.sd.close();
        this.cli.close();
        this.zkTestServer.stop();
        this.vertx.close();
    }

    @Test
    public void testRegistration(TestContext testContext) throws Exception {
        Async async = testContext.async();
        this.discovery.registerService(ServiceInstance.builder().name("foo-service").payload(new JsonObject().put("foo", "bar").encodePrettily()).port((int) (65535.0d * Math.random())).uriSpec(new UriSpec("{scheme}://foo.com:{port}")).build());
        this.sd.registerServiceImporter(new ZookeeperServiceImporter(), new JsonObject().put("connection", this.zkTestServer.getConnectString()), asyncResult -> {
            if (asyncResult.failed()) {
                asyncResult.cause().printStackTrace();
            }
            testContext.assertTrue(asyncResult.succeeded());
            this.sd.getRecords(record -> {
                return true;
            }, asyncResult -> {
                if (asyncResult.failed()) {
                    asyncResult.cause().printStackTrace();
                }
                testContext.assertTrue(asyncResult.succeeded());
                testContext.assertTrue(((List) asyncResult.result()).size() == 1);
                testContext.assertEquals("foo-service", ((Record) ((List) asyncResult.result()).get(0)).getName());
                async.complete();
            });
        });
    }

    @Test
    public void testServiceArrival(TestContext testContext) throws Exception {
        Async async = testContext.async();
        ServiceInstance build = ServiceInstance.builder().name("foo-service").payload(new JsonObject().put("foo", "bar").encodePrettily()).port(8080).uriSpec(new UriSpec("{scheme}://foo.com:{port}")).build();
        this.sd.registerServiceImporter(new ZookeeperServiceImporter(), new JsonObject().put("connection", this.zkTestServer.getConnectString()), testContext.asyncAssertSuccess(r12 -> {
            this.sd.getRecords(record -> {
                return true;
            }, testContext.asyncAssertSuccess(list -> {
                testContext.assertTrue(list.size() == 0);
                this.vertx.executeBlocking(promise -> {
                    try {
                        this.discovery.registerService(build);
                        promise.complete();
                    } catch (Exception e) {
                        promise.fail(e);
                    }
                }, testContext.asyncAssertSuccess(obj -> {
                    waitUntil(() -> {
                        return serviceLookup(this.sd, 1);
                    }, testContext.asyncAssertSuccess(list -> {
                        async.complete();
                    }));
                }));
            }));
        }));
    }

    @Test
    public void testArrivalDepartureAndComeBack(TestContext testContext) throws Exception {
        Async async = testContext.async();
        ServiceInstance build = ServiceInstance.builder().name("foo-service").payload(new JsonObject().put("foo", "bar").encodePrettily()).port(8080).uriSpec(new UriSpec("{scheme}://foo.com:{port}")).build();
        this.sd.registerServiceImporter(new ZookeeperServiceImporter(), new JsonObject().put("connection", this.zkTestServer.getConnectString()), testContext.asyncAssertSuccess(r12 -> {
            this.sd.getRecords(record -> {
                return true;
            }, testContext.asyncAssertSuccess(list -> {
                testContext.assertTrue(list.size() == 0);
                this.vertx.executeBlocking(promise -> {
                    try {
                        this.discovery.registerService(build);
                        promise.complete();
                    } catch (Exception e) {
                        promise.fail(e);
                    }
                }, testContext.asyncAssertSuccess(obj -> {
                    waitUntil(() -> {
                        return serviceLookup(this.sd, 1);
                    }, testContext.asyncAssertSuccess(list -> {
                        this.vertx.executeBlocking(promise2 -> {
                            try {
                                this.discovery.unregisterService(build);
                                promise2.complete();
                            } catch (Exception e) {
                                promise2.fail(e);
                            }
                        }, testContext.asyncAssertSuccess(obj -> {
                            waitUntil(() -> {
                                return serviceLookup(this.sd, 0);
                            }, testContext.asyncAssertSuccess(list -> {
                                this.vertx.executeBlocking(promise3 -> {
                                    try {
                                        this.discovery.registerService(build);
                                        promise3.complete();
                                    } catch (Exception e) {
                                        promise3.fail(e);
                                    }
                                }, asyncResult -> {
                                    waitUntil(() -> {
                                        return serviceLookup(this.sd, 1);
                                    }, testContext.asyncAssertSuccess(list -> {
                                        async.complete();
                                    }));
                                });
                            }));
                        }));
                    }));
                }));
            }));
        }));
    }

    private Future<List<Record>> serviceLookup(io.vertx.servicediscovery.ServiceDiscovery serviceDiscovery, int i) {
        Promise promise = Promise.promise();
        serviceDiscovery.getRecords(record -> {
            return true;
        }, asyncResult -> {
            if (asyncResult.failed()) {
                NoStackTraceThrowable noStackTraceThrowable = new NoStackTraceThrowable("service lookup failed: " + asyncResult.cause().getMessage());
                noStackTraceThrowable.initCause(asyncResult.cause());
                promise.fail(noStackTraceThrowable);
            } else if (((List) asyncResult.result()).size() != i) {
                promise.fail("service lookup failed: unexpected records " + asyncResult.result() + " != " + i);
            } else {
                promise.complete((List) asyncResult.result());
            }
        });
        return promise.future();
    }

    @Test
    @Repeat(10)
    public void testServiceArrivalWithSameName(TestContext testContext) throws Exception {
        Async async = testContext.async();
        UriSpec uriSpec = new UriSpec("{scheme}://foo.com:{port}");
        ServiceInstance build = ServiceInstance.builder().name("foo-service").payload(new JsonObject().put("foo", "bar").encodePrettily()).port(8080).uriSpec(uriSpec).build();
        ServiceInstance build2 = ServiceInstance.builder().name("foo-service").payload(new JsonObject().put("foo", "bar2").encodePrettily()).port(8081).uriSpec(uriSpec).build();
        this.discovery.registerService(build);
        this.sd.registerServiceImporter(new ZookeeperServiceImporter(), new JsonObject().put("connection", this.zkTestServer.getConnectString()), testContext.asyncAssertSuccess(r12 -> {
            waitUntil(() -> {
                return serviceLookup(this.sd, 1);
            }, testContext.asyncAssertSuccess(list -> {
                testContext.assertEquals(((Record) list.get(0)).getName(), "foo-service");
                this.vertx.executeBlocking(promise -> {
                    try {
                        this.discovery.registerService(build2);
                        promise.complete();
                    } catch (Exception e) {
                        promise.fail(e);
                    }
                }, testContext.asyncAssertSuccess(obj -> {
                    waitUntil(() -> {
                        return serviceLookup(this.sd, 2);
                    }, testContext.asyncAssertSuccess(list -> {
                        testContext.assertEquals(((Record) list.get(0)).getName(), "foo-service");
                        testContext.assertEquals(((Record) list.get(1)).getName(), "foo-service");
                        async.complete();
                    }));
                }));
            }));
        }));
    }

    private void fetchRecords(AtomicBoolean atomicBoolean, TestContext testContext) {
        this.sd.getRecords(record -> {
            return true;
        }, asyncResult -> {
            if (!asyncResult.succeeded() || ((List) asyncResult.result()).size() != 1) {
                this.vertx.setTimer(100L, l -> {
                    fetchRecords(atomicBoolean, testContext);
                });
            } else {
                testContext.assertEquals("foo-service", ((Record) ((List) asyncResult.result()).get(0)).getName());
                atomicBoolean.set(true);
            }
        });
    }

    @Test
    public void testReconnection(TestContext testContext) throws Exception {
        this.discovery.registerService(ServiceInstance.builder().name("foo-service").payload(new JsonObject().put("foo", "bar").encodePrettily()).port((int) (65535.0d * Math.random())).uriSpec(new UriSpec("{scheme}://foo.com:{port}")).build());
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.sd.registerServiceImporter(new ZookeeperServiceImporter(), new JsonObject().put("connection", this.zkTestServer.getConnectString()).put("connectionTimeoutMs", 10).put("baseSleepTimeBetweenRetries", 10).put("maxRetries", 3), asyncResult -> {
            if (asyncResult.failed()) {
                asyncResult.cause().printStackTrace();
                testContext.fail(asyncResult.cause());
            } else {
                testContext.assertTrue(asyncResult.succeeded());
                fetchRecords(atomicBoolean, testContext);
            }
        });
        Awaitility.await().untilTrue(atomicBoolean);
        this.zkTestServer.stop();
        atomicBoolean.set(false);
        this.sd.getRecords(record -> {
            return true;
        }, asyncResult2 -> {
            if (asyncResult2.failed()) {
                asyncResult2.cause().printStackTrace();
            }
            testContext.assertTrue(asyncResult2.succeeded());
            testContext.assertTrue(((List) asyncResult2.result()).size() == 1);
            testContext.assertEquals("foo-service", ((Record) ((List) asyncResult2.result()).get(0)).getName());
            atomicBoolean.set(true);
        });
        Awaitility.await().untilAtomic(atomicBoolean, Matchers.is(true));
        this.zkTestServer.start();
        atomicBoolean.set(false);
        this.sd.getRecords(record2 -> {
            return true;
        }, asyncResult3 -> {
            if (asyncResult3.failed()) {
                asyncResult3.cause().printStackTrace();
            }
            testContext.assertTrue(asyncResult3.succeeded());
            testContext.assertTrue(((List) asyncResult3.result()).size() == 1);
            testContext.assertEquals("foo-service", ((Record) ((List) asyncResult3.result()).get(0)).getName());
            atomicBoolean.set(true);
        });
        Awaitility.await().untilAtomic(atomicBoolean, Matchers.is(true));
    }

    private <T> void waitUntil(Supplier<Future<T>> supplier, Handler<AsyncResult<T>> handler) {
        execute(0, supplier, handler);
    }

    private <T> void execute(int i, Supplier<Future<T>> supplier, Handler<AsyncResult<T>> handler) {
        supplier.get().onComplete(asyncResult -> {
            if (asyncResult.succeeded()) {
                handler.handle(Future.succeededFuture(asyncResult.result()));
            } else if (i > 20) {
                handler.handle(Future.failedFuture(new Exception("Max attempt reached", asyncResult.cause())));
            } else {
                this.vertx.setTimer(100L, l -> {
                    execute(i + 1, supplier, handler);
                });
            }
        });
    }
}
