package software.amazon.kinesis.coordinator.assignment;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.coordinator.LeaderDecider;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.MetricsLevel;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.metrics.NullMetricsScope;
import software.amazon.kinesis.worker.metricstats.WorkerMetricStats;
import software.amazon.kinesis.worker.metricstats.WorkerMetricStatsDAO;

@KinesisClientInternalApi
/* loaded from: input_file:software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager.class */
public final class LeaseAssignmentManager {
    private static final int DEFAULT_FAILURE_COUNT_TO_SWITCH_LEADER = 3;
    private static final int DEFAULT_LEASE_ASSIGNMENT_MANAGER_FREQ_MULTIPLIER = 2;
    private static final String FORCE_LEADER_RELEASE_METRIC_NAME = "ForceLeaderRelease";
    private static final int DDB_LOAD_RETRY_ATTEMPT = 1;
    private static final String METRICS_LEASE_ASSIGNMENT_MANAGER = "LeaseAssignmentManager";
    private static final String METRICS_INCOMPLETE_EXPIRED_LEASES_ASSIGNMENT = "LeaseAssignmentManager.IncompleteExpiredLeasesAssignment";
    public static final int DEFAULT_NO_OF_SKIP_STAT_FOR_DEAD_WORKER_THRESHOLD = 2;
    private final LeaseRefresher leaseRefresher;
    private final WorkerMetricStatsDAO workerMetricsDAO;
    private final LeaderDecider leaderDecider;
    private final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig config;
    private final String currentWorkerId;
    private final Long leaseDurationMillis;
    private final MetricsFactory metricsFactory;
    private final ScheduledExecutorService executorService;
    private final Supplier<Long> nanoTimeProvider;
    private final int maxLeasesForWorker;
    private final LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig;
    private Future<?> managerFuture;
    private static final Logger log = LoggerFactory.getLogger(LeaseAssignmentManager.class);
    private static final ExecutorService LEASE_ASSIGNMENT_CALL_THREAD_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private boolean tookOverLeadershipInThisRun = false;
    private final Map<String, Lease> prevRunLeasesState = new HashMap();
    private int noOfContinuousFailedAttempts = 0;
    private int lamRunCounter = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:software/amazon/kinesis/coordinator/assignment/LeaseAssignmentManager$InMemoryStorageView.class */
    public class InMemoryStorageView {
        private List<Lease> leaseList;
        private List<WorkerMetricStats> activeWorkerMetrics;
        private List<WorkerMetricStats> workerMetricsList;
        private Set<String> activeWorkerIdSet;
        private double targetAverageThroughput;
        private final Map<String, Set<Lease>> workerToLeasesMap = new HashMap();
        private final Map<String, Double> workerToTotalAssignedThroughputMap = new HashMap();
        private final Map<Lease, String> leaseToNewAssignedWorkerMap = new HashMap();
        private long leaseTableScanTime = 0;

        InMemoryStorageView() {
        }

        public void performLeaseAssignment(Lease lease, String str) {
            String actualOwner = lease.actualOwner();
            this.workerToLeasesMap.get(actualOwner).remove(lease);
            this.workerToLeasesMap.computeIfAbsent(str, str2 -> {
                return new HashSet();
            }).add(lease);
            updateWorkerThroughput(str, lease.throughputKBps().doubleValue());
            updateWorkerThroughput(actualOwner, -lease.throughputKBps().doubleValue());
            this.leaseToNewAssignedWorkerMap.put(lease, str);
        }

        public void loadInMemoryStorageView(MetricsScope metricsScope) throws Exception {
            CompletableFuture<Map.Entry<List<Lease>, List<String>>> loadLeaseListAsync = loadLeaseListAsync();
            List<WorkerMetricStats> join = loadWorkerMetricStats().join();
            List list = (List) join.stream().filter(workerMetricStats -> {
                return !workerMetricStats.isValidWorkerMetric();
            }).map((v0) -> {
                return v0.getWorkerId();
            }).collect(Collectors.toList());
            if (!list.isEmpty()) {
                LeaseAssignmentManager.log.warn("List of workerIds with invalid entries : {}", list);
                metricsScope.addData("NumWorkersWithInvalidEntry", list.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
            }
            this.workerMetricsList = (List) join.stream().filter((v0) -> {
                return v0.isValidWorkerMetric();
            }).collect(Collectors.toList());
            LeaseAssignmentManager.log.info("Total WorkerMetricStats available : {}", Integer.valueOf(this.workerMetricsList.size()));
            long computeWorkerExpiryThresholdInSecond = computeWorkerExpiryThresholdInSecond();
            long count = this.workerMetricsList.stream().filter((v0) -> {
                return v0.isAnyWorkerMetricFailing();
            }).count();
            if (count != 0) {
                metricsScope.addData("NumWorkersWithFailingWorkerMetric", count, StandardUnit.COUNT, MetricsLevel.SUMMARY);
            }
            Map.Entry<List<Lease>, List<String>> join2 = loadLeaseListAsync.join();
            this.leaseList = join2.getKey();
            if (!join2.getValue().isEmpty()) {
                LeaseAssignmentManager.log.warn("Leases that failed deserialization : {}", join2.getValue());
                MetricsUtil.addCount(metricsScope, "LeaseDeserializationFailureCount", join2.getValue().size(), MetricsLevel.SUMMARY);
            }
            this.leaseTableScanTime = ((Long) LeaseAssignmentManager.this.nanoTimeProvider.get()).longValue();
            LeaseAssignmentManager.log.info("Total Leases available : {}", Integer.valueOf(this.leaseList.size()));
            double orElse = this.leaseList.stream().filter(lease -> {
                return Objects.nonNull(lease.throughputKBps());
            }).mapToDouble((v0) -> {
                return v0.throughputKBps();
            }).average().orElse(0.0d);
            this.activeWorkerMetrics = (List) this.workerMetricsList.stream().filter(workerMetricStats2 -> {
                return workerMetricStats2.getLastUpdateTime().longValue() >= computeWorkerExpiryThresholdInSecond && !workerMetricStats2.isAnyWorkerMetricFailing();
            }).collect(Collectors.toList());
            LeaseAssignmentManager.log.info("activeWorkerMetrics : {}", Integer.valueOf(this.activeWorkerMetrics.size()));
            this.targetAverageThroughput = (orElse * this.leaseList.size()) / Math.max(1, this.activeWorkerMetrics.size());
            this.leaseList.forEach(lease2 -> {
                if (Objects.isNull(lease2.throughputKBps())) {
                    lease2.throughputKBps(orElse);
                }
                this.workerToLeasesMap.computeIfAbsent(lease2.actualOwner(), str -> {
                    return new HashSet();
                }).add(lease2);
                updateWorkerThroughput(lease2.actualOwner(), lease2.throughputKBps().doubleValue());
            });
            this.activeWorkerIdSet = new HashSet();
            this.activeWorkerMetrics.forEach(workerMetricStats3 -> {
                this.activeWorkerIdSet.add(workerMetricStats3.getWorkerId());
                workerMetricStats3.setEmaAlpha(LeaseAssignmentManager.this.config.workerMetricsEMAAlpha());
                if (workerMetricStats3.isUsingDefaultWorkerMetric()) {
                    setOperatingRangeAndWorkerMetricsDataForDefaultWorker(workerMetricStats3, Double.valueOf(getTotalAssignedThroughput(workerMetricStats3.getWorkerId()).doubleValue() / this.targetAverageThroughput));
                }
            });
        }

        private void updateWorkerThroughput(String str, double d) {
            this.workerToTotalAssignedThroughputMap.put(str, Double.valueOf(this.workerToTotalAssignedThroughputMap.computeIfAbsent(str, str2 -> {
                return Double.valueOf(0.0d);
            }).doubleValue() + d));
        }

        private void setOperatingRangeAndWorkerMetricsDataForDefaultWorker(WorkerMetricStats workerMetricStats, Double d) {
            LeaseAssignmentManager.log.info("Worker [{}] is using default WorkerMetricStats, setting initial utilization ratio to [{}].", workerMetricStats.getWorkerId(), d);
            workerMetricStats.setOperatingRange(ImmutableMap.of("T", ImmutableList.of(100L)));
            workerMetricStats.setMetricStats(ImmutableMap.of("T", ImmutableList.of(Double.valueOf(d.doubleValue() * 100.0d), Double.valueOf(d.doubleValue() * 100.0d))));
        }

        private long computeWorkerExpiryThresholdInSecond() {
            long seconds = Duration.ofMillis(System.currentTimeMillis() - (2 * LeaseAssignmentManager.this.config.workerMetricsReporterFreqInMillis())).getSeconds();
            LeaseAssignmentManager.log.info("WorkerMetricStats expiry time in seconds : {}", Long.valueOf(seconds));
            return seconds;
        }

        public boolean isWorkerTotalThroughputLessThanMaxThroughput(String str) {
            return getTotalAssignedThroughput(str).doubleValue() <= LeaseAssignmentManager.this.config.maxThroughputPerHostKBps();
        }

        public boolean isWorkerAssignedLeasesLessThanMaxLeases(String str) {
            Set<Lease> set = this.workerToLeasesMap.get(str);
            return CollectionUtils.isEmpty(set) || set.size() < LeaseAssignmentManager.this.maxLeasesForWorker;
        }

        public Double getTotalAssignedThroughput(String str) {
            return this.workerToTotalAssignedThroughputMap.getOrDefault(str, Double.valueOf(0.0d));
        }

        private CompletableFuture<List<WorkerMetricStats>> loadWorkerMetricStats() {
            return CompletableFuture.supplyAsync(() -> {
                WorkerMetricStatsDAO workerMetricStatsDAO = LeaseAssignmentManager.this.workerMetricsDAO;
                Objects.requireNonNull(workerMetricStatsDAO);
                return (List) loadWithRetry(workerMetricStatsDAO::getAllWorkerMetricStats);
            });
        }

        private CompletableFuture<Map.Entry<List<Lease>, List<String>>> loadLeaseListAsync() {
            return CompletableFuture.supplyAsync(() -> {
                return (Map.Entry) loadWithRetry(() -> {
                    return LeaseAssignmentManager.this.leaseRefresher.listLeasesParallely(LeaseAssignmentManager.LEASE_ASSIGNMENT_CALL_THREAD_POOL, 0);
                });
            });
        }

        private <T> T loadWithRetry(Callable<T> callable) {
            int i = 0;
            while (true) {
                try {
                    return callable.call();
                } catch (Exception e) {
                    if (i >= 1) {
                        throw new CompletionException(e);
                    }
                    LeaseAssignmentManager.log.warn("Failed to load : {}, retrying", callable.getClass().getName(), e);
                    i++;
                }
            }
        }

        public Map<String, Set<Lease>> getWorkerToLeasesMap() {
            return this.workerToLeasesMap;
        }

        public Map<String, Double> getWorkerToTotalAssignedThroughputMap() {
            return this.workerToTotalAssignedThroughputMap;
        }

        public Map<Lease, String> getLeaseToNewAssignedWorkerMap() {
            return this.leaseToNewAssignedWorkerMap;
        }

        public List<Lease> getLeaseList() {
            return this.leaseList;
        }

        public List<WorkerMetricStats> getActiveWorkerMetrics() {
            return this.activeWorkerMetrics;
        }

        public List<WorkerMetricStats> getWorkerMetricsList() {
            return this.workerMetricsList;
        }

        public Set<String> getActiveWorkerIdSet() {
            return this.activeWorkerIdSet;
        }

        public long getLeaseTableScanTime() {
            return this.leaseTableScanTime;
        }

        public double getTargetAverageThroughput() {
            return this.targetAverageThroughput;
        }
    }

    public synchronized void start() {
        if (!Objects.isNull(this.managerFuture)) {
            log.info("LeaseAssignmentManager already running...");
            return;
        }
        this.tookOverLeadershipInThisRun = false;
        this.managerFuture = this.executorService.scheduleWithFixedDelay(this::performAssignment, 0L, this.leaseDurationMillis.longValue() * 2, TimeUnit.MILLISECONDS);
        log.info("Started LeaseAssignmentManager");
    }

    public synchronized void stop() {
        if (!Objects.nonNull(this.managerFuture)) {
            log.info("LeaseAssignmentManager is not running...");
            return;
        }
        log.info("Completed shutdown of LeaseAssignmentManager");
        this.managerFuture.cancel(true);
        this.managerFuture = null;
    }

    private MetricsScope createMetricsScope(String str) {
        try {
            return MetricsUtil.createMetricsWithOperation(this.metricsFactory, str);
        } catch (Exception e) {
            log.error("Failed to create metrics scope defaulting to no metrics.", e);
            return new NullMetricsScope();
        }
    }

    private void performAssignment() {
        MetricsScope createMetricsScope = createMetricsScope(METRICS_LEASE_ASSIGNMENT_MANAGER);
        long currentTimeMillis = System.currentTimeMillis();
        try {
            try {
                if (!this.leaderDecider.isLeader(this.currentWorkerId).booleanValue()) {
                    log.info("Current worker {} is not a leader, ignore", this.currentWorkerId);
                    this.tookOverLeadershipInThisRun = false;
                    MetricsUtil.addSuccessAndLatency(createMetricsScope, true, currentTimeMillis, MetricsLevel.SUMMARY);
                    MetricsUtil.endScope(createMetricsScope);
                    return;
                }
                if (!this.tookOverLeadershipInThisRun) {
                    this.tookOverLeadershipInThisRun = true;
                    this.lamRunCounter = 0;
                    prepareAfterLeaderSwitch();
                }
                log.info("Current worker {} is a leader, performing assignment", this.currentWorkerId);
                InMemoryStorageView inMemoryStorageView = new InMemoryStorageView();
                long currentTimeMillis2 = System.currentTimeMillis();
                inMemoryStorageView.loadInMemoryStorageView(createMetricsScope);
                MetricsUtil.addLatency(createMetricsScope, "LeaseAndWorkerMetricsLoad", currentTimeMillis2, MetricsLevel.DETAILED);
                publishLeaseAndWorkerCountMetrics(createMetricsScope, inMemoryStorageView);
                VarianceBasedLeaseAssignmentDecider varianceBasedLeaseAssignmentDecider = new VarianceBasedLeaseAssignmentDecider(inMemoryStorageView, this.config.dampeningPercentage(), this.config.reBalanceThresholdPercentage(), this.config.allowThroughputOvershoot());
                updateLeasesLastCounterIncrementNanosAndLeaseShutdownTimeout(inMemoryStorageView.getLeaseList(), Long.valueOf(inMemoryStorageView.getLeaseTableScanTime()));
                List<Lease> list = (List) inMemoryStorageView.getLeaseList().stream().filter(lease -> {
                    return lease.isExpired(TimeUnit.MILLISECONDS.toNanos(this.leaseDurationMillis.longValue()), inMemoryStorageView.getLeaseTableScanTime()) || Objects.isNull(lease.actualOwner());
                }).map(lease2 -> {
                    return lease2.isExpiredOrUnassigned(true);
                }).collect(Collectors.toList());
                log.info("Total expiredOrUnassignedLeases count : {}", Integer.valueOf(list.size()));
                createMetricsScope.addData("ExpiredLeases", list.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
                long currentTimeMillis3 = System.currentTimeMillis();
                varianceBasedLeaseAssignmentDecider.assignExpiredOrUnassignedLeases(list);
                MetricsUtil.addLatency(createMetricsScope, "AssignExpiredOrUnassignedLeases", currentTimeMillis3, MetricsLevel.DETAILED);
                if (!list.isEmpty()) {
                    log.warn("Not able to assign all expiredOrUnAssignedLeases");
                    createMetricsScope.addData("LeaseSpillover", list.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
                }
                if (shouldRunVarianceBalancing()) {
                    long currentTimeMillis4 = System.currentTimeMillis();
                    int size = inMemoryStorageView.leaseToNewAssignedWorkerMap.size();
                    varianceBasedLeaseAssignmentDecider.balanceWorkerVariance();
                    MetricsUtil.addLatency(createMetricsScope, "BalanceWorkerVariance", currentTimeMillis4, MetricsLevel.DETAILED);
                    createMetricsScope.addData("NumOfLeasesReassignment", inMemoryStorageView.leaseToNewAssignedWorkerMap.size() - size, StandardUnit.COUNT, MetricsLevel.SUMMARY);
                }
                if (inMemoryStorageView.leaseToNewAssignedWorkerMap.isEmpty()) {
                    log.info("No new lease assignment performed in this iteration");
                }
                parallelyAssignLeases(inMemoryStorageView, createMetricsScope);
                printPerWorkerLeases(inMemoryStorageView);
                deleteStaleWorkerMetricsEntries(inMemoryStorageView, createMetricsScope);
                this.noOfContinuousFailedAttempts = 0;
                MetricsUtil.addSuccessAndLatency(createMetricsScope, true, currentTimeMillis, MetricsLevel.SUMMARY);
                MetricsUtil.endScope(createMetricsScope);
            } catch (Exception e) {
                log.error("LeaseAssignmentManager failed to perform lease assignment.", e);
                this.noOfContinuousFailedAttempts++;
                if (this.noOfContinuousFailedAttempts >= 3) {
                    log.error("Failed to perform assignment {} times in a row, releasing leadership from worker : {}", 3, this.currentWorkerId);
                    MetricsUtil.addCount(createMetricsScope, FORCE_LEADER_RELEASE_METRIC_NAME, 1L, MetricsLevel.SUMMARY);
                    this.leaderDecider.releaseLeadershipIfHeld();
                }
                MetricsUtil.addSuccessAndLatency(createMetricsScope, false, currentTimeMillis, MetricsLevel.SUMMARY);
                MetricsUtil.endScope(createMetricsScope);
            }
        } catch (Throwable th) {
            MetricsUtil.addSuccessAndLatency(createMetricsScope, false, currentTimeMillis, MetricsLevel.SUMMARY);
            MetricsUtil.endScope(createMetricsScope);
            throw th;
        }
    }

    private boolean shouldRunVarianceBalancing() {
        boolean z = this.lamRunCounter == 0;
        this.lamRunCounter = (this.lamRunCounter + 1) % this.config.varianceBalancingFrequency();
        return z;
    }

    private void deleteStaleWorkerMetricsEntries(InMemoryStorageView inMemoryStorageView, MetricsScope metricsScope) {
        long currentTimeMillis = System.currentTimeMillis();
        try {
            List list = (List) inMemoryStorageView.getWorkerMetricsList().stream().filter(this::isWorkerMetricsEntryStale).collect(Collectors.toList());
            MetricsUtil.addCount(metricsScope, "TotalStaleWorkerMetricsEntry", list.size(), MetricsLevel.DETAILED);
            log.info("Number of stale workerMetrics entries : {}", Integer.valueOf(list.size()));
            log.info("Stale workerMetrics list : {}", list);
            CompletableFuture.allOf((CompletableFuture[]) ((List) list.stream().map(workerMetricStats -> {
                return CompletableFuture.supplyAsync(() -> {
                    return Boolean.valueOf(this.workerMetricsDAO.deleteMetrics(workerMetricStats));
                }, LEASE_ASSIGNMENT_CALL_THREAD_POOL);
            }).collect(Collectors.toList())).toArray(new CompletableFuture[0])).join();
            MetricsUtil.addLatency(metricsScope, "StaleWorkerMetricsCleanup", currentTimeMillis, MetricsLevel.DETAILED);
        } catch (Throwable th) {
            MetricsUtil.addLatency(metricsScope, "StaleWorkerMetricsCleanup", currentTimeMillis, MetricsLevel.DETAILED);
            throw th;
        }
    }

    private boolean isWorkerMetricsEntryStale(WorkerMetricStats workerMetricStats) {
        return Duration.between(Instant.ofEpochSecond(workerMetricStats.getLastUpdateTime().longValue()), Instant.now()).toMillis() > this.config.staleWorkerMetricsEntryCleanupDuration().toMillis();
    }

    private void printPerWorkerLeases(InMemoryStorageView inMemoryStorageView) {
        inMemoryStorageView.getActiveWorkerIdSet().forEach(str -> {
            log.info("Worker : {} and total leases : {} and totalThroughput : {}", new Object[]{str, Integer.valueOf(((Set) Optional.ofNullable(inMemoryStorageView.getWorkerToLeasesMap().get(str)).orElse(Collections.EMPTY_SET)).size()), inMemoryStorageView.getWorkerToTotalAssignedThroughputMap().get(str)});
        });
    }

    private void parallelyAssignLeases(InMemoryStorageView inMemoryStorageView, MetricsScope metricsScope) {
        AtomicInteger atomicInteger = new AtomicInteger(0);
        long currentTimeMillis = System.currentTimeMillis();
        boolean z = false;
        try {
            CompletableFuture.allOf((CompletableFuture[]) inMemoryStorageView.getLeaseToNewAssignedWorkerMap().entrySet().stream().filter(entry -> {
                return !((Lease) entry.getKey()).blockedOnPendingCheckpoint(getNanoTimeMillis());
            }).map(entry2 -> {
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        Lease lease = (Lease) entry2.getKey();
                        return (this.gracefulLeaseHandoffConfig.isGracefulLeaseHandoffEnabled() && lease.isEligibleForGracefulShutdown()) ? Boolean.valueOf(handleGracefulLeaseHandoff(lease, (String) entry2.getValue(), atomicInteger)) : Boolean.valueOf(handleRegularLeaseAssignment(lease, (String) entry2.getValue(), atomicInteger));
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                }, LEASE_ASSIGNMENT_CALL_THREAD_POOL);
            }).toArray(i -> {
                return new CompletableFuture[i];
            })).join();
            z = true;
            MetricsUtil.addCount(metricsScope, "FailedAssignmentCount", atomicInteger.get(), MetricsLevel.DETAILED);
            MetricsUtil.addSuccessAndLatency(metricsScope, "ParallelyAssignLeases", true, currentTimeMillis, MetricsLevel.DETAILED);
        } catch (Throwable th) {
            MetricsUtil.addCount(metricsScope, "FailedAssignmentCount", atomicInteger.get(), MetricsLevel.DETAILED);
            MetricsUtil.addSuccessAndLatency(metricsScope, "ParallelyAssignLeases", z, currentTimeMillis, MetricsLevel.DETAILED);
            throw th;
        }
    }

    private boolean handleGracefulLeaseHandoff(Lease lease, String str, AtomicInteger atomicInteger) throws ProvisionedThroughputException, InvalidStateException, DependencyException {
        boolean initiateGracefulLeaseHandoff = this.leaseRefresher.initiateGracefulLeaseHandoff(lease, str);
        if (initiateGracefulLeaseHandoff) {
            lease.checkpointOwnerTimeoutTimestampMillis(Long.valueOf(getCheckpointOwnerTimeoutTimestampMillis()));
        } else {
            atomicInteger.incrementAndGet();
        }
        return initiateGracefulLeaseHandoff;
    }

    private boolean handleRegularLeaseAssignment(Lease lease, String str, AtomicInteger atomicInteger) throws ProvisionedThroughputException, InvalidStateException, DependencyException {
        boolean assignLease = this.leaseRefresher.assignLease(lease, str);
        if (assignLease) {
            lease.lastCounterIncrementNanos(this.nanoTimeProvider.get());
        } else {
            atomicInteger.incrementAndGet();
        }
        return assignLease;
    }

    private void publishLeaseAndWorkerCountMetrics(MetricsScope metricsScope, InMemoryStorageView inMemoryStorageView) {
        metricsScope.addData("TotalLeases", inMemoryStorageView.leaseList.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
        metricsScope.addData("NumWorkers", inMemoryStorageView.activeWorkerMetrics.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
    }

    private void updateLeasesLastCounterIncrementNanosAndLeaseShutdownTimeout(List<Lease> list, Long l) {
        for (Lease lease : list) {
            Lease lease2 = this.prevRunLeasesState.get(lease.leaseKey());
            if (lease.shutdownRequested()) {
                if (!Objects.isNull(lease2) && lease2.shutdownRequested() && isSameOwners(lease, lease2)) {
                    lease.checkpointOwnerTimeoutTimestampMillis(lease2.checkpointOwnerTimeoutTimestampMillis());
                } else {
                    lease.checkpointOwnerTimeoutTimestampMillis(Long.valueOf(getCheckpointOwnerTimeoutTimestampMillis()));
                }
            }
            if (Objects.isNull(lease2)) {
                lease.lastCounterIncrementNanos(Long.valueOf(Objects.isNull(lease.actualOwner()) ? 0L : l.longValue()));
            } else {
                lease.lastCounterIncrementNanos(lease.leaseCounter().longValue() > lease2.leaseCounter().longValue() ? l : lease2.lastCounterIncrementNanos());
            }
        }
        this.prevRunLeasesState.clear();
        this.prevRunLeasesState.putAll((Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.leaseKey();
        }, Function.identity())));
    }

    private void prepareAfterLeaderSwitch() {
        this.prevRunLeasesState.clear();
        this.noOfContinuousFailedAttempts = 0;
    }

    private long getCheckpointOwnerTimeoutTimestampMillis() {
        return getNanoTimeMillis() + this.gracefulLeaseHandoffConfig.gracefulLeaseHandoffTimeoutMillis() + this.leaseDurationMillis.longValue();
    }

    private long getNanoTimeMillis() {
        return TimeUnit.NANOSECONDS.toMillis(this.nanoTimeProvider.get().longValue());
    }

    private static boolean isSameOwners(Lease lease, Lease lease2) {
        return Objects.equals(lease.leaseOwner(), lease2.leaseOwner()) && Objects.equals(lease.checkpointOwner(), lease2.checkpointOwner());
    }

    public LeaseAssignmentManager(LeaseRefresher leaseRefresher, WorkerMetricStatsDAO workerMetricStatsDAO, LeaderDecider leaderDecider, LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, String str, Long l, MetricsFactory metricsFactory, ScheduledExecutorService scheduledExecutorService, Supplier<Long> supplier, int i, LeaseManagementConfig.GracefulLeaseHandoffConfig gracefulLeaseHandoffConfig) {
        this.leaseRefresher = leaseRefresher;
        this.workerMetricsDAO = workerMetricStatsDAO;
        this.leaderDecider = leaderDecider;
        this.config = workerUtilizationAwareAssignmentConfig;
        this.currentWorkerId = str;
        this.leaseDurationMillis = l;
        this.metricsFactory = metricsFactory;
        this.executorService = scheduledExecutorService;
        this.nanoTimeProvider = supplier;
        this.maxLeasesForWorker = i;
        this.gracefulLeaseHandoffConfig = gracefulLeaseHandoffConfig;
    }
}
