package io.vertx.servicediscovery.backend.zookeeper;

import com.jayway.awaitility.Awaitility;
import io.vertx.core.json.JsonObject;
import io.vertx.servicediscovery.Record;
import io.vertx.servicediscovery.Status;
import io.vertx.servicediscovery.spi.ServiceDiscoveryBackend;
import io.vertx.servicediscovery.spi.ServiceDiscoveryBackendTest;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.zookeeper.KeeperException;
import org.assertj.core.api.Assertions;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:io/vertx/servicediscovery/backend/zookeeper/ZookeeperBackendServiceTest.class */
/**
 * Runs the shared {@link ServiceDiscoveryBackendTest} suite against
 * {@link ZookeeperBackendService}, backed by a Curator {@link TestingServer},
 * and adds a scenario that stops and restarts the server to check that the
 * backend reports failures while disconnected and works again afterwards.
 */
public class ZookeeperBackendServiceTest extends ServiceDiscoveryBackendTest {
    /** Port the test ZooKeeper server listens on. */
    private static final int DEFAULT_PORT = 2181;

    /** Name given to the record used by {@link #testReconnection()}. */
    private static final String SERVICE_NAME = "my-service";

    /** ZooKeeper test server shared by every test of this class. */
    private static TestingServer server;

    /**
     * Creates and starts the ZooKeeper test server on {@link #DEFAULT_PORT}.
     *
     * @throws Exception if the server cannot be created or started
     */
    @BeforeClass
    public static void startZookeeper() throws Exception {
        server = new TestingServer(DEFAULT_PORT);
        // NOTE(review): the TestingServer constructor may already start the server,
        // which would make this call redundant - confirm against the Curator version in use.
        server.start();
    }

    /**
     * Shuts the ZooKeeper test server down once all tests have run.
     *
     * @throws IOException if the server fails to shut down cleanly
     */
    @AfterClass
    public static void stopZookeeper() throws IOException {
        // The server is null when startZookeeper() failed; skipping avoids an NPE
        // that would hide the original start-up error.
        if (server != null) {
            // close() rather than stop(): per the Curator documentation, stop() leaves
            // the server's temporary data directory on disk while close() deletes it.
            server.close();
        }
    }

    /**
     * Builds a {@link ZookeeperBackendService} connected to the test server.
     * Presumably the factory hook called by the parent test class - confirm there.
     *
     * @return the initialised backend
     */
    protected ServiceDiscoveryBackend createBackend() {
        JsonObject config = new JsonObject()
            .put("connection", server.getConnectString())
            .put("baseSleepTimeBetweenRetries", 10)
            .put("connectionTimeoutMs", 1000);
        ZookeeperBackendService service = new ZookeeperBackendService();
        service.init(this.vertx, config);
        return service;
    }

    /**
     * Stores a record, then alternately stops and restarts the ZooKeeper server,
     * checking that reads and removals fail while the server is down and succeed
     * once it is back.
     *
     * @throws Exception if the server or the probe client cannot be controlled
     */
    @Test
    public void testReconnection() throws Exception {
        Record toStore = new Record().setName(SERVICE_NAME).setStatus(Status.UP);
        Assertions.assertThat(toStore.getRegistration()).isNull();

        // Store the record and wait for the backend to hand back the registered copy.
        AtomicReference<Record> stored = new AtomicReference<>();
        this.backend.store(toStore, ar -> {
            if (!ar.succeeded()) {
                ar.cause().printStackTrace();
            }
            stored.set(ar.result());
        });
        Awaitility.await().until(() -> stored.get() != null);

        Record registered = stored.get();
        Assertions.assertThat(registered.getName()).isEqualToIgnoringCase(SERVICE_NAME);
        Assertions.assertThat(registered.getRegistration()).isNotNull();
        String registration = registered.getRegistration();

        // Server up: the record can be read back.
        Record fetched = fetchRecord(registration);
        Assertions.assertThat(fetched.getName()).isEqualToIgnoringCase(SERVICE_NAME);
        Assertions.assertThat(fetched.getRegistration()).isNotNull();

        // Server down: the lookup must fail with a connection-loss error.
        server.stop();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        this.backend.getRecord(registration, ar -> failure.set(ar.cause()));
        Awaitility.await().until(() -> failure.get() != null);
        Assertions.assertThat(failure.get()).isInstanceOf(KeeperException.ConnectionLossException.class);

        server.restart();
        // Independent client used only to block until the restarted server accepts
        // connections. try-with-resources closes it even when an assertion below fails.
        try (CuratorFramework probe = CuratorFrameworkFactory.builder()
                .connectString(server.getConnectString())
                .retryPolicy(new ExponentialBackoffRetry(10, 3))
                .build()) {
            probe.start();
            probe.blockUntilConnected();

            // Server back up: the record is readable again.
            Record afterRestart = fetchRecord(registration);
            Assertions.assertThat(afterRestart.getName()).isEqualToIgnoringCase(SERVICE_NAME);
            Assertions.assertThat(afterRestart.getRegistration()).isNotNull();

            // Server down: removal must report a failure.
            server.stop();
            AtomicBoolean done = new AtomicBoolean(false);
            this.backend.remove(registered, ar -> done.set(ar.failed()));
            Awaitility.await().untilAtomic(done, Is.is(true));

            // Server back up: removal must succeed.
            server.restart();
            done.set(false);
            this.backend.remove(registered, ar -> done.set(ar.succeeded()));
            Awaitility.await().untilAtomic(done, Is.is(true));
        }
    }

    /**
     * Looks a record up through the backend and waits until a non-null result arrives.
     * A failed lookup has its cause printed and then keeps the wait pending until
     * Awaitility gives up.
     *
     * @param registration registration id of the record to fetch
     * @return the record returned by the backend
     * @throws Exception if waiting for the result fails
     */
    private Record fetchRecord(String registration) throws Exception {
        AtomicReference<Record> holder = new AtomicReference<>();
        this.backend.getRecord(registration, ar -> {
            if (ar.failed()) {
                ar.cause().printStackTrace();
            }
            holder.set(ar.result());
        });
        Awaitility.await().until(() -> holder.get() != null);
        return holder.get();
    }
}
