package com.uid2.shared.vertx;

import com.uid2.shared.Const;
import com.uid2.shared.cloud.CloudStorageException;
import com.uid2.shared.cloud.ICloudStorage;
import com.uid2.shared.health.HealthComponent;
import com.uid2.shared.health.HealthManager;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.WorkerExecutor;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import java.io.InputStream;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/uid2/shared/vertx/CloudSyncVerticle.class */
public class CloudSyncVerticle extends AbstractVerticle {
    private static final Logger LOGGER = LoggerFactory.getLogger(CloudSyncVerticle.class);
    private final HealthComponent healthComponent;
    private final Counter counterRefreshed;
    private final Counter counterRefreshSkipped;
    private final Counter counterRefreshFailures;
    private final Counter counterDownloaded;
    private final Counter counterUploaded;
    private final Counter counterDownloadFailures;
    private final Counter counterUploadFailures;
    private final Gauge gaugeConsecutiveRefreshFailures;
    private final String name;
    private final ICloudStorage cloudStorage;
    private final ICloudStorage localStorage;
    private final ICloudSync cloudSync;
    private final int downloadThreads;
    private final int uploadThreads;
    private final AtomicInteger storeRefreshIsFailing;
    private final String eventRefresh;
    private final String eventRefreshed;
    private final String eventUpload;
    private final String eventDownloaded;
    private final HashSet<String> pendingUpload;
    private final HashSet<String> pendingDownload;
    private WorkerExecutor downloadExecutor;
    private WorkerExecutor uploadExecutor;
    private boolean isRefreshing;

    public CloudSyncVerticle(String str, ICloudStorage iCloudStorage, ICloudStorage iCloudStorage2, ICloudSync iCloudSync, JsonObject jsonObject) {
        this(str, iCloudStorage, iCloudStorage2, iCloudSync, jsonObject.getInteger(Const.Config.CloudDownloadThreadsProp).intValue(), jsonObject.getInteger(Const.Config.CloudUploadThreadsProp).intValue());
    }

    public CloudSyncVerticle(String str, ICloudStorage iCloudStorage, ICloudStorage iCloudStorage2, ICloudSync iCloudSync, int i, int i2) {
        this.storeRefreshIsFailing = new AtomicInteger(0);
        this.pendingUpload = new HashSet<>();
        this.pendingDownload = new HashSet<>();
        this.downloadExecutor = null;
        this.uploadExecutor = null;
        this.isRefreshing = false;
        this.healthComponent = HealthManager.instance.registerComponent("cloudsync-" + str);
        this.healthComponent.setHealthStatus(false, "not started");
        this.name = str;
        this.cloudStorage = iCloudStorage;
        this.localStorage = iCloudStorage2;
        this.cloudSync = iCloudSync;
        this.downloadThreads = i;
        this.uploadThreads = i2;
        String str2 = "cloudsync." + this.name + ".";
        this.eventUpload = str2 + "upload";
        this.eventDownloaded = str2 + "downloaded";
        this.eventRefresh = str2 + "refresh";
        this.eventRefreshed = str2 + "refreshed";
        Gauge.builder("uid2.cloud_downloading", () -> {
            return Integer.valueOf(this.pendingDownload.size());
        }).tag("store", str).description("gauge for how many s3 files are pending download").register(Metrics.globalRegistry);
        Gauge.builder("uid2.cloud_uploading", () -> {
            return Integer.valueOf(this.pendingUpload.size());
        }).tag("store", str).description("gauge for how many s3 files are pending upload").register(Metrics.globalRegistry);
        this.counterRefreshed = Counter.builder("uid2.cloud_refreshed").tag("store", str).description("counter for how many times cloud storage files are refreshed").register(Metrics.globalRegistry);
        this.counterRefreshSkipped = Counter.builder("uid2.cloud_refresh_skipped").tag("store", str).description("counter for how many times cloud storage refresh events are skipped due to in-progress refreshing").register(Metrics.globalRegistry);
        this.counterRefreshFailures = Counter.builder("uid2.cloud_refresh_failures").tag("store", str).description("counter for number of " + str + " store refresh failures").register(Metrics.globalRegistry);
        this.counterDownloaded = Counter.builder("uid2.cloud_downloaded").tag("store", str).description("counter for how many cloud files are downloaded").register(Metrics.globalRegistry);
        this.counterUploaded = Counter.builder("uid2.cloud_uploaded").tag("store", str).description("counter for how many cloud files are uploaded").register(Metrics.globalRegistry);
        this.counterDownloadFailures = Counter.builder("uid2.cloud_download_failures").tag("store", str).description("counter for how many cloud files downloads have failed").register(Metrics.globalRegistry);
        this.counterUploadFailures = Counter.builder("uid2.cloud_upload_failures").tag("store", str).description("counter for how many cloud files uploads have failed").register(Metrics.globalRegistry);
        this.gaugeConsecutiveRefreshFailures = Gauge.builder("uid2.cloud_downloaded.consecutive_refresh_failures", () -> {
            return Integer.valueOf(this.storeRefreshIsFailing.get());
        }).tag("store", str).description("gauge for number of consecutive " + str + " store refresh failures").register(Metrics.globalRegistry);
    }

    public void start(Promise<Void> promise) {
        LOGGER.info("starting CloudSyncVerticle." + this.name);
        this.healthComponent.setHealthStatus(false, "still starting");
        this.downloadExecutor = this.vertx.createSharedWorkerExecutor("cloudsync-" + this.name + "-download-pool", this.downloadThreads);
        this.uploadExecutor = this.vertx.createSharedWorkerExecutor("cloudsync-" + this.name + "-upload-pool", this.uploadThreads);
        this.vertx.eventBus().consumer(this.eventRefresh, message -> {
            handleRefresh(message);
        });
        this.vertx.eventBus().consumer(this.eventUpload, message2 -> {
            handleUpload(message2);
        });
        cloudRefresh().onFailure(th -> {
            LOGGER.error("cloudRefresh failed: " + th.getMessage(), new Exception(th));
        }).onComplete(asyncResult -> {
            promise.handle(asyncResult);
        });
        promise.future().onSuccess(r4 -> {
            LOGGER.info("started CloudSyncVerticle." + this.name);
            this.healthComponent.setHealthStatus(true);
        }).onFailure(th2 -> {
            LOGGER.error("failed starting CloudSyncVerticle." + this.name, new Exception(th2));
            this.healthComponent.setHealthStatus(false, th2.getMessage());
        });
    }

    public void stop() {
        LOGGER.info("shutting down CloudSyncVerticle" + this.name);
    }

    public String eventRefresh() {
        return this.eventRefresh;
    }

    public String eventRefreshed() {
        return this.eventRefreshed;
    }

    public String eventUpload() {
        return this.eventUpload;
    }

    public String eventDownloaded() {
        return this.eventDownloaded;
    }

    private void handleRefresh(Message message) {
        cloudRefresh().onSuccess(r4 -> {
            this.storeRefreshIsFailing.set(0);
        }).onFailure(th -> {
            this.counterRefreshFailures.increment();
            this.storeRefreshIsFailing.set(1);
            LOGGER.error("handleRefresh error: " + th.getMessage(), new Exception(th));
        });
    }

    private Future<Void> cloudRefresh() {
        if (this.isRefreshing) {
            LOGGER.debug("existing s3 refresh in-progress, skipping this one");
            this.counterRefreshSkipped.increment();
            return Future.succeededFuture();
        }
        Promise promise = Promise.promise();
        this.isRefreshing = true;
        this.vertx.executeBlocking(promise2 -> {
            cloudRefreshEnsureInSync(promise, 0);
            promise2.complete();
        }, asyncResult -> {
        });
        return promise.future().onComplete(asyncResult2 -> {
            this.isRefreshing = false;
            emitRefreshedEvent();
        });
    }

    private void cloudRefreshEnsureInSync(Promise<Void> promise, int i) {
        try {
            ArrayList arrayList = new ArrayList();
            boolean refresh = this.cloudSync.refresh(Instant.now(), this.cloudStorage, this.localStorage, set -> {
                Iterator it = set.iterator();
                while (it.hasNext()) {
                    arrayList.add(cloudDownloadFile((String) it.next()));
                }
            }, set2 -> {
                Iterator it = set2.iterator();
                while (it.hasNext()) {
                    arrayList.add(localDelete((String) it.next()));
                }
            });
            CompositeFuture.all(arrayList).onFailure(th -> {
                promise.fail(new Exception(th));
            }).onSuccess(compositeFuture -> {
                if (refresh) {
                    promise.complete();
                } else {
                    if (i >= 19) {
                        promise.fail(new Exception("Cannot full sync in 20 refresh iterations"));
                        return;
                    }
                    if (i > 1) {
                        LOGGER.warn("Not synced in " + (i + 1) + " iterations, last iteration contains " + arrayList.size() + " download/delete jobs");
                    }
                    cloudRefreshEnsureInSync(promise, i + 1);
                }
            });
        } catch (CloudStorageException e) {
            promise.fail(new Exception(e));
        } catch (Exception e2) {
            promise.fail(new Exception("unexpected error in cloudRefresh(): " + e2.getMessage(), e2));
        }
    }

    private Future<Void> localDelete(String str) {
        boolean z;
        try {
            this.localStorage.delete(str);
            z = true;
        } catch (Exception e) {
            z = false;
        }
        LOGGER.info("delete " + str + ": " + z);
        return Future.succeededFuture();
    }

    private void emitRefreshedEvent() {
        int count = (int) this.counterRefreshed.count();
        this.counterRefreshed.increment();
        LOGGER.trace("cloudsync " + this.name + " refreshed " + count);
        this.vertx.eventBus().publish(this.eventRefreshed, Integer.valueOf(count));
    }

    private void handleUpload(Message<String> message) {
        String str = (String) message.body();
        if (str == null) {
            LOGGER.warn("Received null filename for s3 upload, likely snapshot/log produce failed");
            message.reply(false);
        } else if (this.pendingUpload.contains(str)) {
            LOGGER.warn("Skip due to upload pending: " + str);
            message.reply(false);
        } else {
            LOGGER.info("Uploading: " + str);
            this.pendingUpload.add(str);
            this.uploadExecutor.executeBlocking(promise -> {
                cloudUploadBlocking(promise, (String) message.body());
            }, asyncResult -> {
                this.pendingUpload.remove(str);
                handleAsyncResult(asyncResult);
                message.reply(Boolean.valueOf(asyncResult.succeeded()));
                if (asyncResult.succeeded()) {
                    this.counterUploaded.increment();
                } else {
                    this.counterUploadFailures.increment();
                }
                LOGGER.info("Upload result: " + asyncResult.succeeded() + ", " + str);
            });
        }
    }

    private void cloudUploadBlocking(Promise<Void> promise, String str) {
        try {
            String cloudPath = this.cloudSync.toCloudPath(str);
            InputStream download = this.localStorage.download(str);
            try {
                this.cloudStorage.upload(download, cloudPath);
                if (download != null) {
                    download.close();
                }
                promise.complete();
            } finally {
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            promise.fail(new Throwable(e));
        }
    }

    private Future<Void> cloudDownloadFile(String str) {
        if (str == null) {
            LOGGER.warn("Received null path for s3 download");
            return Future.succeededFuture();
        }
        if (this.pendingDownload.contains(str)) {
            LOGGER.warn("Skip due to download pending: " + this.cloudStorage.mask(str));
            return Future.succeededFuture();
        }
        LOGGER.trace("Downloading: " + this.cloudStorage.mask(str));
        this.pendingDownload.add(str);
        Promise promise = Promise.promise();
        this.downloadExecutor.executeBlocking(promise2 -> {
            cloudDownloadBlocking(promise2, str);
        }, asyncResult -> {
            this.pendingDownload.remove(str);
            handleAsyncResult(asyncResult);
            promise.complete();
            if (asyncResult.succeeded()) {
                this.vertx.eventBus().publish(this.eventDownloaded, this.cloudSync.toLocalPath(str));
                this.counterDownloaded.increment();
            } else {
                this.counterDownloadFailures.increment();
            }
            LOGGER.trace("Download result: " + asyncResult.succeeded() + ", " + this.cloudStorage.mask(str));
        });
        return promise.future();
    }

    private void cloudDownloadBlocking(Promise<Void> promise, String str) {
        try {
            String localPath = this.cloudSync.toLocalPath(str);
            InputStream download = this.cloudStorage.download(str);
            try {
                this.localStorage.upload(download, localPath);
                if (download != null) {
                    download.close();
                }
                promise.complete();
            } finally {
            }
        } catch (Exception e) {
            LOGGER.error("download error: " + e.getClass().getSimpleName());
            promise.fail(new Throwable(e));
        }
    }

    private void handleAsyncResult(AsyncResult asyncResult) {
        if (asyncResult.failed()) {
            Throwable cause = asyncResult.cause();
            LOGGER.error(cause.getMessage(), cause);
        }
    }
}
