package org.apache.pulsar.broker.service.persistent;

import java.time.Clock;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.shade.io.prometheus.client.Summary;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MarkersMessageIdData;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Markers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.class */
/**
 * Drives the creation of a single replicated-subscriptions snapshot for one topic.
 *
 * <p>A builder is identified by a random {@link UUID} string. Calling {@link #start()} writes a
 * snapshot-request marker through the controller; each remote answer is then fed to
 * {@link #receivedSnapshotResponse(Position, ReplicatedSubscriptionsSnapshotResponse)}. Once every
 * remote cluster has answered, the builder either starts a second request round (when there is
 * more than one remote cluster) or writes the final snapshot marker and notifies the controller.
 *
 * <p>Only {@code receivedSnapshotResponse} is {@code synchronized}; the other methods perform no
 * locking of their own.
 */
public class ReplicatedSubscriptionsSnapshotBuilder {
    private static final Logger log = LoggerFactory.getLogger(ReplicatedSubscriptionsSnapshotBuilder.class);
    private static final Summary snapshotMetric = Summary.build("pulsar_replicated_subscriptions_snapshot_ms", "Time taken to create a consistent snapshot across clusters").labelNames("topic").register();

    /** Unique identifier of this snapshot attempt. */
    private final String snapshotId = UUID.randomUUID().toString();

    /** Message id reported by each remote cluster, keyed by cluster name (first response wins). */
    private final Map<String, MarkersMessageIdData> responses = new TreeMap<>();

    private final ReplicatedSubscriptionsController controller;

    /** Remote clusters as passed to the constructor (the same list instance, not a copy). */
    private final List<String> remoteClusters;

    /** Clusters that have not yet answered in the current round. */
    private final Set<String> missingClusters;

    /** {@code true} when more than one remote cluster is involved. */
    private final boolean needTwoRounds;
    private boolean firstRoundComplete;

    /** Clock reading taken in {@link #start()}; {@code 0} until then. */
    private long startTimeMillis;
    private final long timeoutMillis;
    private final Clock clock;

    /**
     * Creates a builder for a new snapshot.
     *
     * @param replicatedSubscriptionsController controller used to write markers and to be notified
     *        on completion
     * @param list names of the remote clusters whose responses are required
     * @param serviceConfiguration source of the snapshot timeout (read in seconds)
     * @param clock time source for the start timestamp, the timeout check and the latency metric
     */
    public ReplicatedSubscriptionsSnapshotBuilder(ReplicatedSubscriptionsController replicatedSubscriptionsController, List<String> list, ServiceConfiguration serviceConfiguration, Clock clock) {
        this.controller = replicatedSubscriptionsController;
        this.remoteClusters = list;
        this.missingClusters = new TreeSet<>(list);
        this.clock = clock;
        this.timeoutMillis = TimeUnit.SECONDS.toMillis(serviceConfiguration.getReplicatedSubscriptionsSnapshotTimeoutSeconds());
        // With more than one remote cluster a second request round is performed before completing.
        this.needTwoRounds = list.size() > 1;
    }

    /**
     * Returns the identifier of this snapshot.
     *
     * <p>NOTE(review): decompiler output indicates this method was originally package-private.
     *
     * @return the snapshot id generated at construction time
     */
    public String getSnapshotId() {
        return this.snapshotId;
    }

    /**
     * Records the start time and writes the first snapshot-request marker.
     *
     * <p>NOTE(review): decompiler output indicates this method was originally package-private.
     */
    public void start() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Starting new snapshot {} - Clusters: {}", this.controller.topic().getName(), this.snapshotId, this.missingClusters);
        }
        this.startTimeMillis = this.clock.millis();
        this.controller.writeMarker(Markers.newReplicatedSubscriptionsSnapshotRequest(this.snapshotId, this.controller.localCluster()));
    }

    /**
     * Handles a snapshot response coming from a remote cluster.
     *
     * <p>The message id carried by the response is copied and stored only if no id is already
     * recorded for that cluster. When no cluster is missing any more, either a second request
     * round is started or the final snapshot marker is written, the controller is notified and the
     * elapsed time since {@link #start()} is recorded in the latency metric.
     *
     * <p>NOTE(review): decompiler output indicates this method was originally package-private.
     *
     * @param position position used for the ledger id / entry id of the final snapshot marker;
     *        it is cast to {@link PositionImpl} when the snapshot completes
     * @param replicatedSubscriptionsSnapshotResponse the response received from a remote cluster
     */
    public synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse replicatedSubscriptionsSnapshotResponse) {
        String cluster = replicatedSubscriptionsSnapshotResponse.getCluster().getCluster();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received response from {}", this.controller.topic().getName(), cluster);
        }

        // Keep the first message id seen for a cluster; later responses do not overwrite it.
        this.responses.putIfAbsent(cluster, new MarkersMessageIdData().copyFrom(replicatedSubscriptionsSnapshotResponse.getCluster().getMessageId()));
        this.missingClusters.remove(cluster);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Missing clusters {}", this.controller.topic().getName(), this.missingClusters);
        }

        if (!this.missingClusters.isEmpty()) {
            // Still waiting for at least one cluster in this round.
            return;
        }

        if (this.needTwoRounds && !this.firstRoundComplete) {
            // First round finished: expect every remote cluster again and send a new request.
            this.firstRoundComplete = true;
            this.missingClusters.addAll(this.remoteClusters);
            this.controller.writeMarker(Markers.newReplicatedSubscriptionsSnapshotRequest(this.snapshotId, this.controller.localCluster()));
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Snapshot is complete {}", this.controller.topic().getName(), this.snapshotId);
        }
        PositionImpl positionImpl = (PositionImpl) position;
        this.controller.writeMarker(Markers.newReplicatedSubscriptionsSnapshot(this.snapshotId, this.controller.localCluster(), positionImpl.getLedgerId(), positionImpl.getEntryId(), this.responses));
        this.controller.snapshotCompleted(this.snapshotId);
        snapshotMetric.labels(this.controller.topic().getName()).observe(this.clock.millis() - this.startTimeMillis);
    }

    /**
     * Tells whether the configured timeout has elapsed since {@link #start()}.
     *
     * <p>The start time is {@code 0} until {@code start()} has been called, so the comparison is
     * then made against the timeout alone.
     *
     * <p>NOTE(review): decompiler output indicates this method was originally package-private.
     *
     * @return {@code true} if the current clock reading is strictly later than start time plus
     *         timeout
     */
    public boolean isTimedOut() {
        return this.startTimeMillis + this.timeoutMillis < this.clock.millis();
    }

    /**
     * Returns the clock reading captured by {@link #start()}.
     *
     * <p>NOTE(review): decompiler output indicates this method was originally package-private.
     *
     * @return the start time in milliseconds, or {@code 0} if {@code start()} was never called
     */
    public long getStartTimeMillis() {
        return this.startTimeMillis;
    }
}
