package io.vertx.servicediscovery.zookeeper;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.json.JsonObject;
import io.vertx.servicediscovery.Record;
import io.vertx.servicediscovery.spi.ServiceImporter;
import io.vertx.servicediscovery.spi.ServicePublisher;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.zookeeper.KeeperException;

/* loaded from: input_file:io/vertx/servicediscovery/zookeeper/ZookeeperServiceImporter.class */
public class ZookeeperServiceImporter implements ServiceImporter, TreeCacheListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZookeeperServiceImporter.class);
    private ServicePublisher publisher;
    private CuratorFramework client;
    private ServiceDiscovery<JsonObject> discovery;
    private TreeCache cache;
    private volatile boolean started;
    private final Set<RegistrationHolder<ServiceInstance<JsonObject>>> registrations = ConcurrentHashMap.newKeySet();

    public void start(Vertx vertx, ServicePublisher servicePublisher, JsonObject jsonObject, Promise<Void> promise) {
        this.publisher = servicePublisher;
        String str = (String) Objects.requireNonNull(jsonObject.getString("connection"));
        int intValue = jsonObject.getInteger("maxRetries", 3).intValue();
        int intValue2 = jsonObject.getInteger("baseSleepTimeBetweenRetries", 1000).intValue();
        String string = jsonObject.getString("basePath", "/discovery");
        boolean booleanValue = jsonObject.getBoolean("canBeReadOnly", true).booleanValue();
        int intValue3 = jsonObject.getInteger("connectionTimeoutMs", 1000).intValue();
        vertx.executeBlocking(() -> {
            this.client = CuratorFrameworkFactory.builder().canBeReadOnly(booleanValue).connectString(str).connectionTimeoutMs(intValue3).retryPolicy(new ExponentialBackoffRetry(intValue2, intValue)).build();
            this.client.start();
            this.discovery = ServiceDiscoveryBuilder.builder(JsonObject.class).client(this.client).basePath(string).serializer(new JsonObjectSerializer()).watchInstances(true).build();
            this.discovery.start();
            this.cache = TreeCache.newBuilder(this.client, string).build();
            this.cache.start();
            this.cache.getListenable().addListener(this);
            return null;
        }).onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                promise.fail(asyncResult.cause());
                return;
            }
            Promise<Void> promise2 = Promise.promise();
            promise2.future().onComplete(asyncResult -> {
                if (asyncResult.failed()) {
                    promise.fail(asyncResult.cause());
                } else {
                    promise.complete((Object) null);
                }
            });
            compute(promise2);
        });
    }

    private synchronized void compute(Promise<Void> promise) {
        ArrayList arrayList = new ArrayList();
        try {
            Iterator it = this.discovery.queryForNames().iterator();
            while (it.hasNext()) {
                arrayList.addAll(this.discovery.queryForInstances((String) it.next()));
            }
        } catch (KeeperException.NoNodeException e) {
        } catch (Exception e2) {
            if (promise == null) {
                LOGGER.error("Unable to retrieve service instances from Zookeeper", e2);
                return;
            }
            promise.fail(e2);
        }
        HashSet hashSet = new HashSet(this.registrations);
        HashSet hashSet2 = new HashSet(arrayList);
        ArrayList arrayList2 = new ArrayList();
        Stream map = RegistrationHolder.filter(hashSet, arrayList).stream().map(registrationHolder -> {
            Promise promise2 = Promise.promise();
            this.publisher.unpublish(registrationHolder.record().getRegistration()).onComplete(asyncResult -> {
                this.registrations.remove(registrationHolder);
                if (asyncResult.succeeded()) {
                    promise2.complete((Object) null);
                } else {
                    promise2.fail(asyncResult.cause());
                }
            });
            return promise2.future();
        });
        Objects.requireNonNull(arrayList2);
        map.forEach((v1) -> {
            r1.add(v1);
        });
        Stream map2 = RegistrationHolder.filter((Set) hashSet2, (Set) this.registrations).stream().map(serviceInstance -> {
            Promise promise2 = Promise.promise();
            this.publisher.publish(createRecordForInstance(serviceInstance)).onComplete(asyncResult -> {
                if (!asyncResult.succeeded()) {
                    promise2.fail(asyncResult.cause());
                } else {
                    this.registrations.add(new RegistrationHolder<>((Record) asyncResult.result(), serviceInstance));
                    promise2.complete((Object) null);
                }
            });
            return promise2.future();
        });
        Objects.requireNonNull(arrayList2);
        map2.forEach((v1) -> {
            r1.add(v1);
        });
        if (promise != null) {
            Future.all(arrayList2).onComplete(asyncResult -> {
                if (asyncResult.succeeded()) {
                    promise.complete((Object) null);
                } else {
                    promise.fail(asyncResult.cause());
                }
            });
        }
    }

    static Record createRecordForInstance(ServiceInstance<JsonObject> serviceInstance) {
        Record record = new Record();
        record.setName(serviceInstance.getName());
        JsonObject jsonObject = (JsonObject) serviceInstance.getPayload();
        record.setMetadata(jsonObject);
        record.getMetadata().put("zookeeper-service-type", serviceInstance.getServiceType().toString());
        record.getMetadata().put("zookeeper-address", serviceInstance.getAddress());
        record.getMetadata().put("zookeeper-registration-time", Long.valueOf(serviceInstance.getRegistrationTimeUTC()));
        record.getMetadata().put("zookeeper-port", serviceInstance.getPort());
        record.getMetadata().put("zookeeper-ssl-port", serviceInstance.getSslPort());
        record.getMetadata().put("zookeeper-id", serviceInstance.getId());
        record.setLocation(new JsonObject());
        if (serviceInstance.getUriSpec() != null) {
            record.getLocation().put("endpoint", serviceInstance.buildUriSpec());
        } else {
            record.getLocation().put("endpoint", serviceInstance.getSslPort() != null ? "http" + "s://" + serviceInstance.getAddress() + ":" + serviceInstance.getSslPort() : serviceInstance.getPort() != null ? "http" + "s://" + serviceInstance.getAddress() + ":" + serviceInstance.getPort() : "http" + "://" + serviceInstance.getAddress());
        }
        if (serviceInstance.getPort() != null) {
            record.getLocation().put("port", serviceInstance.getPort());
        }
        if (serviceInstance.getSslPort() != null) {
            record.getLocation().put("ssl-port", serviceInstance.getSslPort());
        }
        if (serviceInstance.getAddress() != null) {
            record.getLocation().put("address", serviceInstance.getAddress());
        }
        record.setType(jsonObject.getString("service-type", "unknown"));
        return record;
    }

    public void close(Handler<Void> handler) {
        Promise<Void> promise = Promise.promise();
        unregisterAllServices(promise);
        promise.future().onComplete(asyncResult -> {
            try {
                this.cache.close();
                this.discovery.close();
                this.client.close();
            } catch (Exception e) {
            }
            handler.handle((Object) null);
        });
    }

    public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) {
        if (treeCacheEvent.getType() == TreeCacheEvent.Type.INITIALIZED) {
            this.started = true;
        } else if (this.started) {
            compute(null);
        }
    }

    private synchronized void unregisterAllServices(Promise<Void> promise) {
        ArrayList arrayList = new ArrayList(this.registrations.size());
        new HashSet(this.registrations).forEach(registrationHolder -> {
            Promise promise2 = Promise.promise();
            this.publisher.unpublish(registrationHolder.record().getRegistration()).onComplete(promise2);
            arrayList.add(promise2.future());
        });
        this.registrations.clear();
        Future.all(arrayList).onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                promise.fail(asyncResult.cause());
            } else {
                promise.complete();
            }
        });
    }
}
