package software.amazon.kinesis.lifecycle;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseCoordinator;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;

/**
 * Drives graceful hand-off of leases for which a shutdown has been requested.
 *
 * <p>{@link #enqueueShutdown(Lease)} asks the {@link ShardConsumer} currently registered for the lease's shard
 * to shut down gracefully and records a deadline of "now + shutdown timeout". A periodic task (started by
 * {@link #start()}) then watches every pending entry: entries whose consumer has finished, is no longer
 * registered, or whose lease is no longer held are dropped; entries that pass their deadline have their lease
 * assigned to {@code lease.leaseOwner()} through the lease refresher.
 */
@KinesisClientInternalApi
public class LeaseGracefulShutdownHandler {
    private static final Logger log = LoggerFactory.getLogger(LeaseGracefulShutdownHandler.class);

    /** Period, in milliseconds, between two runs of the pending-shutdown monitor task. */
    private static final long SHUTDOWN_CHECK_INTERVAL_MILLIS = 2000;

    /** How long a consumer is given to shut down gracefully before the lease transfer is attempted. */
    private final long shutdownTimeoutMillis;

    /** Consumers currently registered per shard; supplied by the caller and only read by this class. */
    private final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap;

    private final LeaseCoordinator leaseCoordinator;

    /** Source of "now" in epoch milliseconds; injectable so that time can be controlled. */
    private final Supplier<Long> currentTimeSupplier;

    /** Executor on which the periodic monitor task is scheduled. */
    private final ScheduledExecutorService executorService;

    /** Leases for which a graceful shutdown was initiated and which the monitor task still tracks. */
    private final ConcurrentMap<ShardInfo, LeasePendingShutdown> shardInfoLeasePendingShutdownMap =
            new ConcurrentHashMap<>();

    /** Set by {@link #start()}, cleared by {@link #stop()}; gates {@link #enqueueShutdown(Lease)}. */
    private volatile boolean isRunning = false;

    /**
     * Creates a handler with explicitly supplied collaborators.
     *
     * @param shutdownTimeoutMillis grace period, in milliseconds, before the lease transfer is attempted
     * @param shardInfoShardConsumerMap live map of shard to the consumer registered for it
     * @param leaseCoordinator coordinator used to look up held leases and to reach the lease refresher
     * @param currentTimeSupplier supplies the current time in milliseconds
     * @param executorService executor used to schedule the periodic monitor task
     */
    public LeaseGracefulShutdownHandler(
            long shutdownTimeoutMillis,
            ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap,
            LeaseCoordinator leaseCoordinator,
            Supplier<Long> currentTimeSupplier,
            ScheduledExecutorService executorService) {
        this.shutdownTimeoutMillis = shutdownTimeoutMillis;
        this.shardInfoShardConsumerMap = shardInfoShardConsumerMap;
        this.leaseCoordinator = leaseCoordinator;
        this.currentTimeSupplier = currentTimeSupplier;
        this.executorService = executorService;
    }

    /**
     * Creates a handler that uses the system clock and its own single daemon thread named
     * {@code LeaseGracefulShutdown-NNNN}.
     *
     * @param shutdownTimeoutMillis grace period, in milliseconds, before the lease transfer is attempted
     * @param shardInfoShardConsumerMap live map of shard to the consumer registered for it
     * @param leaseCoordinator coordinator used to look up held leases and to reach the lease refresher
     * @return a new handler; the monitor task is not scheduled until {@link #start()} is called
     */
    public static LeaseGracefulShutdownHandler create(
            long shutdownTimeoutMillis,
            ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap,
            LeaseCoordinator leaseCoordinator) {
        final ScheduledExecutorService monitorExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder()
                        .setNameFormat("LeaseGracefulShutdown-%04d")
                        .setDaemon(true)
                        .build());
        return new LeaseGracefulShutdownHandler(
                shutdownTimeoutMillis,
                shardInfoShardConsumerMap,
                leaseCoordinator,
                System::currentTimeMillis,
                monitorExecutor);
    }

    /**
     * Schedules the monitor task at a fixed rate, starting immediately, unless the handler is already running.
     *
     * <p>NOTE(review): the running flag is checked and set without a lock, so two concurrent callers could
     * both schedule the task; this presumably relies on a single controlling thread - confirm. Also note that
     * {@link #stop()} shuts the executor down, so calling this again after a stop submits to a shut-down
     * executor.
     */
    public void start() {
        if (isRunning) {
            log.info("Graceful lease handoff thread already running, no need to start.");
            return;
        }
        log.info("Starting graceful lease handoff thread.");
        executorService.scheduleAtFixedRate(
                this::monitorGracefulShutdownLeases, 0L, SHUTDOWN_CHECK_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
        isRunning = true;
    }

    /**
     * Shuts the executor down and marks the handler as stopped, unless it is already stopped.
     *
     * <p>This does not wait for an in-flight monitor run and does not clear the pending entries.
     */
    public void stop() {
        if (!isRunning) {
            log.info("Graceful lease handoff thread already stopped.");
            return;
        }
        log.info("Stopping graceful lease handoff thread.");
        executorService.shutdown();
        isRunning = false;
    }

    /**
     * Initiates a graceful shutdown for the given lease, if applicable.
     *
     * <p>Nothing happens unless the lease is non-null, reports {@code shutdownRequested()}, and the handler is
     * running. If no consumer is registered for the lease's shard, or that consumer is already shut down, any
     * pending entry for the shard is dropped. Otherwise, if the shard is not tracked yet, the consumer is asked
     * to shut down gracefully and a pending entry is recorded; repeated calls for a tracked shard are no-ops.
     *
     * @param lease the lease to hand off; may be {@code null}
     */
    public void enqueueShutdown(Lease lease) {
        if (lease == null || !lease.shutdownRequested() || !isRunning) {
            return;
        }
        final ShardInfo shardInfo = DynamoDBLeaseCoordinator.convertLeaseToAssignment(lease);
        final ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
        if (consumer == null || consumer.isShutdown()) {
            // Nothing left to shut down for this shard, so stop tracking it.
            shardInfoLeasePendingShutdownMap.remove(shardInfo);
            return;
        }
        // computeIfAbsent guarantees the shutdown is initiated at most once per tracked shard.
        shardInfoLeasePendingShutdownMap.computeIfAbsent(shardInfo, ignoredKey -> {
            log.info("Calling graceful shutdown for lease {}", lease.leaseKey());
            final LeasePendingShutdown pending = new LeasePendingShutdown(lease, consumer);
            initiateShutdown(pending);
            return pending;
        });
    }

    /**
     * Periodic task: sweeps all pending entries, dropping the finished ones and attempting the lease transfer
     * for those that have passed their deadline.
     *
     * <p>Each entry is processed in its own try/catch so that an unexpected failure on one lease is logged and
     * does not prevent the remaining leases from being processed in the same run. No {@link Exception} is
     * allowed to escape, because an exception thrown from a fixed-rate task suppresses all later runs.
     */
    private void monitorGracefulShutdownLeases() {
        for (Map.Entry<ShardInfo, LeasePendingShutdown> entry : shardInfoLeasePendingShutdownMap.entrySet()) {
            String leaseKey = null;
            try {
                final ShardInfo shardInfo = entry.getKey();
                final LeasePendingShutdown pending = entry.getValue();
                leaseKey = pending.lease.leaseKey();

                final boolean handoffFinished = pending.shardConsumer.isShutdown()
                        || shardInfoShardConsumerMap.get(shardInfo) == null
                        || leaseCoordinator.getCurrentlyHeldLease(leaseKey) == null;
                if (handoffFinished) {
                    logTimeoutMessage(pending);
                    shardInfoLeasePendingShutdownMap.remove(shardInfo);
                } else if (getCurrentTimeMillis() >= pending.timeoutTimestampMillis
                        && !pending.leaseTransferCalled) {
                    try {
                        log.info("Timeout {} millisecond reached waiting for lease {} to graceful handoff. Attempting to transfer the lease to {}", shutdownTimeoutMillis, leaseKey, pending.lease.leaseOwner());
                        transferLeaseIfOwner(pending);
                    } catch (DependencyException | InvalidStateException | ProvisionedThroughputException e) {
                        // leaseTransferCalled is still false here, so the next run attempts the transfer again.
                        log.warn("Failed to transfer lease for key {}. Will retry", leaseKey, e);
                    }
                }
            } catch (Exception e) {
                log.error("Error in graceful shutdown for lease {}", leaseKey, e);
            }
        }
    }

    /**
     * Asks the consumer to shut down gracefully (passing {@code null}) and stamps the entry with the deadline
     * "now + shutdown timeout".
     */
    private void initiateShutdown(LeasePendingShutdown leasePendingShutdown) {
        leasePendingShutdown.shardConsumer.gracefulShutdown(null);
        leasePendingShutdown.shutdownRequested = true;
        leasePendingShutdown.timeoutTimestampMillis = getCurrentTimeMillis() + shutdownTimeoutMillis;
    }

    /**
     * Logs how long the shutdown took, but only for entries whose lease transfer had already been attempted,
     * i.e. entries that ran past their deadline.
     */
    private void logTimeoutMessage(LeasePendingShutdown leasePendingShutdown) {
        if (!leasePendingShutdown.leaseTransferCalled) {
            return;
        }
        // deadline = initiation time + timeout, so this is the time elapsed since the shutdown was initiated.
        final long elapsedMillis =
                (getCurrentTimeMillis() - leasePendingShutdown.timeoutTimestampMillis) + shutdownTimeoutMillis;
        log.info("Lease {} took {} milliseconds to complete the shutdown. Consider tuning the GracefulLeaseHandoffTimeoutMillis to prevent timeouts, if necessary.", leasePendingShutdown.lease.leaseKey(), elapsedMillis);
    }

    /**
     * Assigns the lease to {@code lease.leaseOwner()} when this worker is the lease's checkpoint owner;
     * otherwise logs the mismatch.
     *
     * <p>In both cases the entry is marked as transfer-called so the attempt is not repeated. If the
     * assignment throws, the mark is not set and the caller's next run tries again.
     *
     * @throws ProvisionedThroughputException propagated from the lease refresher
     * @throws InvalidStateException propagated from the lease refresher
     * @throws DependencyException propagated from the lease refresher
     */
    private void transferLeaseIfOwner(LeasePendingShutdown leasePendingShutdown)
            throws ProvisionedThroughputException, InvalidStateException, DependencyException {
        final Lease lease = leasePendingShutdown.lease;
        if (leaseCoordinator.workerIdentifier().equals(lease.checkpointOwner())) {
            leaseCoordinator.leaseRefresher().assignLease(lease, lease.leaseOwner());
        } else {
            log.error("Lease {} checkpoint owner mismatch found {} but it should be {}", lease.leaseKey(), lease.checkpointOwner(), leaseCoordinator.workerIdentifier());
        }
        leasePendingShutdown.leaseTransferCalled = true;
    }

    /** Returns the current time in milliseconds as reported by the injected supplier. */
    private long getCurrentTimeMillis() {
        return currentTimeSupplier.get();
    }

    /**
     * Mutable bookkeeping record for one lease that is waiting for its consumer to shut down gracefully.
     *
     * <p>All five fields take part in {@link #equals(Object)} and {@link #hashCode()}, including the mutable
     * ones, so an instance's hash changes when its state is updated.
     */
    public static class LeasePendingShutdown {
        final Lease lease;
        final ShardConsumer shardConsumer;
        /** Deadline (same clock as the handler's time supplier) after which the lease transfer is attempted. */
        long timeoutTimestampMillis;
        boolean shutdownRequested = false;
        boolean leaseTransferCalled = false;

        public LeasePendingShutdown(Lease lease, ShardConsumer shardConsumer) {
            this.lease = lease;
            this.shardConsumer = shardConsumer;
        }

        public Lease getLease() {
            return lease;
        }

        public ShardConsumer getShardConsumer() {
            return shardConsumer;
        }

        public long getTimeoutTimestampMillis() {
            return timeoutTimestampMillis;
        }

        public boolean isShutdownRequested() {
            return shutdownRequested;
        }

        public boolean isLeaseTransferCalled() {
            return leaseTransferCalled;
        }

        public void setTimeoutTimestampMillis(long timeoutTimestampMillis) {
            this.timeoutTimestampMillis = timeoutTimestampMillis;
        }

        public void setShutdownRequested(boolean shutdownRequested) {
            this.shutdownRequested = shutdownRequested;
        }

        public void setLeaseTransferCalled(boolean leaseTransferCalled) {
            this.leaseTransferCalled = leaseTransferCalled;
        }

        /** Field-by-field equality over all five fields; subclasses may veto via {@link #canEqual(Object)}. */
        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof LeasePendingShutdown)) {
                return false;
            }
            final LeasePendingShutdown other = (LeasePendingShutdown) obj;
            if (!other.canEqual(this)) {
                return false;
            }
            if (getTimeoutTimestampMillis() != other.getTimeoutTimestampMillis()
                    || isShutdownRequested() != other.isShutdownRequested()
                    || isLeaseTransferCalled() != other.isLeaseTransferCalled()) {
                return false;
            }
            final Lease thisLease = getLease();
            final Lease otherLease = other.getLease();
            if (thisLease == null ? otherLease != null : !thisLease.equals(otherLease)) {
                return false;
            }
            final ShardConsumer thisConsumer = getShardConsumer();
            final ShardConsumer otherConsumer = other.getShardConsumer();
            return thisConsumer == null ? otherConsumer == null : thisConsumer.equals(otherConsumer);
        }

        /** Hook that lets a subclass refuse equality with instances of this type. */
        protected boolean canEqual(Object obj) {
            return obj instanceof LeasePendingShutdown;
        }

        /** Hash over the same five fields as {@link #equals(Object)}, combined with multiplier 59. */
        @Override
        public int hashCode() {
            final int prime = 59;
            int result = 1;
            result = result * prime + Long.hashCode(getTimeoutTimestampMillis());
            result = result * prime + (isShutdownRequested() ? 79 : 97);
            result = result * prime + (isLeaseTransferCalled() ? 79 : 97);
            final Lease currentLease = getLease();
            result = result * prime + (currentLease == null ? 43 : currentLease.hashCode());
            final ShardConsumer consumer = getShardConsumer();
            result = result * prime + (consumer == null ? 43 : consumer.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "LeaseGracefulShutdownHandler.LeasePendingShutdown(lease=" + getLease()
                    + ", shardConsumer=" + getShardConsumer()
                    + ", timeoutTimestampMillis=" + getTimeoutTimestampMillis()
                    + ", shutdownRequested=" + isShutdownRequested()
                    + ", leaseTransferCalled=" + isLeaseTransferCalled()
                    + ")";
        }
    }
}
