package software.amazon.kinesis.coordinator;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.ThreadSafe;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.coordinator.MigrationAdaptiveLeaseAssignmentModeProvider;
import software.amazon.kinesis.coordinator.assignment.LeaseAssignmentManager;
import software.amazon.kinesis.coordinator.migration.ClientVersion;
import software.amazon.kinesis.leader.DynamoDBLockBasedLeaderDecider;
import software.amazon.kinesis.leader.MigrationAdaptiveLeaderDecider;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.worker.metricstats.WorkerMetricStatsDAO;
import software.amazon.kinesis.worker.metricstats.WorkerMetricStatsManager;
import software.amazon.kinesis.worker.metricstats.WorkerMetricStatsReporter;

@ThreadSafe
@KinesisClientInternalApi
/* loaded from: input_file:software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer.class */
public final class DynamicMigrationComponentsInitializer {
    private static final Logger log = LoggerFactory.getLogger(DynamicMigrationComponentsInitializer.class);
    // Upper bound, in seconds, that shutdown() waits for EACH thread pool to terminate
    // before calling shutdownNow() on it.
    private static final long SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS = 60;

    // ---- Collaborators injected through the constructor ----
    private final MetricsFactory metricsFactory;
    // Used by createGsi() to create / wait for the LeaseOwnerToLeaseKey index.
    private final LeaseRefresher leaseRefresher;
    // Stored at construction; not read by any method of this class.
    private final CoordinatorStateDAO coordinatorStateDAO;
    // Pool on which the periodic WorkerMetricStatsReporter task is scheduled.
    private final ScheduledExecutorService workerMetricsThreadPool;
    private final WorkerMetricStatsDAO workerMetricsDAO;
    // Started in initialize() and stopped in shutdown().
    private final WorkerMetricStatsManager workerMetricsManager;
    // Pool handed to lamCreator when the LeaseAssignmentManager is built.
    private final ScheduledExecutorService lamThreadPool;
    // Factory invoked once in initialize() with (lamThreadPool, leaderDecider).
    private final BiFunction<ScheduledExecutorService, LeaderDecider, LeaseAssignmentManager> lamCreator;
    // Factories for the leader decider flavours; each get() is called when that flavour is needed.
    private final Supplier<MigrationAdaptiveLeaderDecider> adaptiveLeaderDeciderCreator;
    private final Supplier<DeterministicShuffleShardSyncLeaderDecider> deterministicLeaderDeciderCreator;
    private final Supplier<DynamoDBLockBasedLeaderDecider> ddbLockBasedLeaderDeciderCreator;
    private final String workerIdentifier;
    private final LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig;
    // Twice workerMetricsReporterFreqInMillis(), converted to whole seconds (computed in the constructor).
    private final long workerMetricsExpirySeconds;
    // Receives the initial mode in initialize() and later mode flips via notifyLeaseAssignmentModeChange().
    private final MigrationAdaptiveLeaseAssignmentModeProvider leaseModeChangeConsumer;

    // ---- Mutable state; null/false until initialize() or a client-version transition assigns it ----
    private LeaderDecider leaderDecider;
    private LeaseAssignmentManager leaseAssignmentManager;
    // Non-null only while the periodic worker metrics reporter task is scheduled.
    private ScheduledFuture<?> workerMetricsReporterFuture;
    private MigrationAdaptiveLeaseAssignmentModeProvider.LeaseAssignmentMode currentAssignmentMode;
    // true when initialized for migration, false when initialized directly for 3x.
    private boolean dualMode;
    // Set to true as the last step of a successful initialize().
    private boolean initialized;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:software/amazon/kinesis/coordinator/DynamicMigrationComponentsInitializer$DynamicMigrationComponentsInitializerBuilder.class */
    /**
     * Fluent builder for {@link DynamicMigrationComponentsInitializer}.
     *
     * <p>Every setter stores its argument and returns this builder so calls can be chained.
     * No validation is performed here; {@link #build()} forwards the stored values, including
     * any that were never set (and are therefore {@code null}), to the constructor.
     */
    public static class DynamicMigrationComponentsInitializerBuilder {
        private MetricsFactory metricsFactory;
        private LeaseRefresher leaseRefresher;
        private CoordinatorStateDAO coordinatorStateDAO;
        private ScheduledExecutorService workerMetricsThreadPool;
        private WorkerMetricStatsDAO workerMetricsDAO;
        private WorkerMetricStatsManager workerMetricsManager;
        private ScheduledExecutorService lamThreadPool;
        private BiFunction<ScheduledExecutorService, LeaderDecider, LeaseAssignmentManager> lamCreator;
        private Supplier<MigrationAdaptiveLeaderDecider> adaptiveLeaderDeciderCreator;
        private Supplier<DeterministicShuffleShardSyncLeaderDecider> deterministicLeaderDeciderCreator;
        private Supplier<DynamoDBLockBasedLeaderDecider> ddbLockBasedLeaderDeciderCreator;
        private String workerIdentifier;
        private LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig;
        private MigrationAdaptiveLeaseAssignmentModeProvider leaseAssignmentModeProvider;

        /** Instances are obtained through {@link DynamicMigrationComponentsInitializer#builder()}. */
        DynamicMigrationComponentsInitializerBuilder() {
        }

        /** Sets the metrics factory and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder metricsFactory(MetricsFactory metricsFactory) {
            this.metricsFactory = metricsFactory;
            return this;
        }

        /** Sets the lease refresher and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder leaseRefresher(LeaseRefresher leaseRefresher) {
            this.leaseRefresher = leaseRefresher;
            return this;
        }

        /** Sets the coordinator state DAO and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder coordinatorStateDAO(CoordinatorStateDAO coordinatorStateDAO) {
            this.coordinatorStateDAO = coordinatorStateDAO;
            return this;
        }

        /** Sets the executor used for worker metrics reporting and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder workerMetricsThreadPool(ScheduledExecutorService scheduledExecutorService) {
            workerMetricsThreadPool = scheduledExecutorService;
            return this;
        }

        /** Sets the worker metric stats DAO and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder workerMetricsDAO(WorkerMetricStatsDAO workerMetricStatsDAO) {
            workerMetricsDAO = workerMetricStatsDAO;
            return this;
        }

        /** Sets the worker metric stats manager and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder workerMetricsManager(WorkerMetricStatsManager workerMetricStatsManager) {
            workerMetricsManager = workerMetricStatsManager;
            return this;
        }

        /** Sets the executor handed to the lease assignment manager factory and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder lamThreadPool(ScheduledExecutorService scheduledExecutorService) {
            lamThreadPool = scheduledExecutorService;
            return this;
        }

        /** Sets the lease assignment manager factory and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder lamCreator(BiFunction<ScheduledExecutorService, LeaderDecider, LeaseAssignmentManager> biFunction) {
            lamCreator = biFunction;
            return this;
        }

        /** Sets the supplier of the migration-adaptive leader decider and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder adaptiveLeaderDeciderCreator(Supplier<MigrationAdaptiveLeaderDecider> supplier) {
            adaptiveLeaderDeciderCreator = supplier;
            return this;
        }

        /** Sets the supplier of the deterministic-shuffle leader decider and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder deterministicLeaderDeciderCreator(Supplier<DeterministicShuffleShardSyncLeaderDecider> supplier) {
            deterministicLeaderDeciderCreator = supplier;
            return this;
        }

        /** Sets the supplier of the DynamoDB-lock-based leader decider and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder ddbLockBasedLeaderDeciderCreator(Supplier<DynamoDBLockBasedLeaderDecider> supplier) {
            ddbLockBasedLeaderDeciderCreator = supplier;
            return this;
        }

        /** Sets the worker identifier and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder workerIdentifier(String str) {
            workerIdentifier = str;
            return this;
        }

        /** Sets the worker-utilization-aware assignment configuration and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder workerUtilizationAwareAssignmentConfig(LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig) {
            this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
            return this;
        }

        /** Sets the lease assignment mode provider and returns this builder. */
        public DynamicMigrationComponentsInitializerBuilder leaseAssignmentModeProvider(MigrationAdaptiveLeaseAssignmentModeProvider migrationAdaptiveLeaseAssignmentModeProvider) {
            leaseAssignmentModeProvider = migrationAdaptiveLeaseAssignmentModeProvider;
            return this;
        }

        /**
         * Creates the initializer from the values collected so far.
         *
         * @return a new {@link DynamicMigrationComponentsInitializer}
         */
        public DynamicMigrationComponentsInitializer build() {
            // Arguments are positional; keep this order aligned with the constructor signature.
            return new DynamicMigrationComponentsInitializer(
                    metricsFactory,
                    leaseRefresher,
                    coordinatorStateDAO,
                    workerMetricsThreadPool,
                    workerMetricsDAO,
                    workerMetricsManager,
                    lamThreadPool,
                    lamCreator,
                    adaptiveLeaderDeciderCreator,
                    deterministicLeaderDeciderCreator,
                    ddbLockBasedLeaderDeciderCreator,
                    workerIdentifier,
                    workerUtilizationAwareAssignmentConfig,
                    leaseAssignmentModeProvider);
        }

        /** Renders every builder field as {@code name=value}, in declaration order. */
        @Override
        public String toString() {
            final StringBuilder text =
                    new StringBuilder("DynamicMigrationComponentsInitializer.DynamicMigrationComponentsInitializerBuilder(");
            text.append("metricsFactory=").append(metricsFactory);
            text.append(", leaseRefresher=").append(leaseRefresher);
            text.append(", coordinatorStateDAO=").append(coordinatorStateDAO);
            text.append(", workerMetricsThreadPool=").append(workerMetricsThreadPool);
            text.append(", workerMetricsDAO=").append(workerMetricsDAO);
            text.append(", workerMetricsManager=").append(workerMetricsManager);
            text.append(", lamThreadPool=").append(lamThreadPool);
            text.append(", lamCreator=").append(lamCreator);
            text.append(", adaptiveLeaderDeciderCreator=").append(adaptiveLeaderDeciderCreator);
            text.append(", deterministicLeaderDeciderCreator=").append(deterministicLeaderDeciderCreator);
            text.append(", ddbLockBasedLeaderDeciderCreator=").append(ddbLockBasedLeaderDeciderCreator);
            text.append(", workerIdentifier=").append(workerIdentifier);
            text.append(", workerUtilizationAwareAssignmentConfig=").append(workerUtilizationAwareAssignmentConfig);
            text.append(", leaseAssignmentModeProvider=").append(leaseAssignmentModeProvider);
            return text.append(")").toString();
        }
    }

    DynamicMigrationComponentsInitializer(MetricsFactory metricsFactory, LeaseRefresher leaseRefresher, CoordinatorStateDAO coordinatorStateDAO, ScheduledExecutorService scheduledExecutorService, WorkerMetricStatsDAO workerMetricStatsDAO, WorkerMetricStatsManager workerMetricStatsManager, ScheduledExecutorService scheduledExecutorService2, BiFunction<ScheduledExecutorService, LeaderDecider, LeaseAssignmentManager> biFunction, Supplier<MigrationAdaptiveLeaderDecider> supplier, Supplier<DeterministicShuffleShardSyncLeaderDecider> supplier2, Supplier<DynamoDBLockBasedLeaderDecider> supplier3, String str, LeaseManagementConfig.WorkerUtilizationAwareAssignmentConfig workerUtilizationAwareAssignmentConfig, MigrationAdaptiveLeaseAssignmentModeProvider migrationAdaptiveLeaseAssignmentModeProvider) {
        this.metricsFactory = metricsFactory;
        this.leaseRefresher = leaseRefresher;
        this.coordinatorStateDAO = coordinatorStateDAO;
        this.workerIdentifier = str;
        this.workerUtilizationAwareAssignmentConfig = workerUtilizationAwareAssignmentConfig;
        this.workerMetricsExpirySeconds = Duration.ofMillis(2 * workerUtilizationAwareAssignmentConfig.workerMetricsReporterFreqInMillis()).getSeconds();
        this.workerMetricsManager = workerMetricStatsManager;
        this.workerMetricsDAO = workerMetricStatsDAO;
        this.workerMetricsThreadPool = scheduledExecutorService;
        this.lamThreadPool = scheduledExecutorService2;
        this.lamCreator = biFunction;
        this.adaptiveLeaderDeciderCreator = supplier;
        this.deterministicLeaderDeciderCreator = supplier2;
        this.ddbLockBasedLeaderDeciderCreator = supplier3;
        this.leaseModeChangeConsumer = migrationAdaptiveLeaseAssignmentModeProvider;
    }

    /**
     * One-time setup of the leader decider, lease assignment manager and assignment mode provider.
     *
     * <p>Does nothing when a previous call already completed. Otherwise starts the worker metrics
     * manager, initializes either the 3x components or the migration components depending on
     * {@code clientVersion}, creates the lease assignment manager through the configured factory,
     * initializes the assignment mode provider and finally marks this instance as initialized.
     *
     * @param clientVersion version to start from; {@code CLIENT_VERSION_3X} selects the 3x-only
     *     path, any other value selects the migration (dual mode) path
     * @throws DependencyException declared by this method; none of the calls made here visibly
     *     throws it — confirm whether it is still needed
     */
    public void initialize(ClientVersion clientVersion) throws DependencyException {
        if (initialized) {
            log.info("Already initialized, nothing to do");
            return;
        }

        log.info("Start collection of WorkerMetricStats");
        workerMetricsManager.startManager();

        final boolean startsIn3x = clientVersion == ClientVersion.CLIENT_VERSION_3X;
        if (!startsIn3x) {
            initializeComponentsForMigration(clientVersion);
        } else {
            initializeComponentsFor3x();
        }
        log.info("Initialized dual mode {} current assignment mode {}", dualMode, currentAssignmentMode);

        log.info("Creating LAM");
        leaseAssignmentManager = lamCreator.apply(lamThreadPool, leaderDecider);

        log.info("Initializing {}", leaseModeChangeConsumer.getClass().getSimpleName());
        leaseModeChangeConsumer.initialize(dualMode, currentAssignmentMode);

        // Only flagged once every step above has succeeded.
        initialized = true;
    }

    /**
     * Configures this instance for 3x-only operation: dual mode off, worker-utilization-aware
     * assignment, and a freshly obtained and initialized DynamoDB-lock-based leader decider.
     */
    private void initializeComponentsFor3x() {
        log.info("Initializing for 3x functionality");

        dualMode = false;
        currentAssignmentMode =
                MigrationAdaptiveLeaseAssignmentModeProvider.LeaseAssignmentMode.WORKER_UTILIZATION_AWARE_ASSIGNMENT;
        log.info("Initializing dualMode {} assignmentMode {}", dualMode, currentAssignmentMode);

        final LeaderDecider lockBasedDecider = ddbLockBasedLeaderDeciderCreator.get();
        leaderDecider = lockBasedDecider;
        log.info("Initializing {}", lockBasedDecider.getClass().getSimpleName());
        lockBasedDecider.initialize();
    }

    /**
     * Configures this instance for migration (dual mode).
     *
     * <p>When starting from {@code CLIENT_VERSION_3X_WITH_ROLLBACK} the worker-utilization-aware
     * mode and a DynamoDB-lock-based decider are used; for every other version the default
     * lease-count-based mode and a deterministic-shuffle decider are used. The chosen decider is
     * installed as the delegate of a migration-adaptive leader decider, which becomes this
     * instance's leader decider.
     *
     * @param clientVersion version the migration state machine starts from
     */
    private void initializeComponentsForMigration(ClientVersion clientVersion) {
        log.info("Initializing for migration to 3x");
        dualMode = true;

        final boolean rollbackCapable3x = clientVersion == ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK;
        final LeaderDecider delegate;
        if (!rollbackCapable3x) {
            currentAssignmentMode =
                    MigrationAdaptiveLeaseAssignmentModeProvider.LeaseAssignmentMode.DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT;
            delegate = deterministicLeaderDeciderCreator.get();
        } else {
            currentAssignmentMode =
                    MigrationAdaptiveLeaseAssignmentModeProvider.LeaseAssignmentMode.WORKER_UTILIZATION_AWARE_ASSIGNMENT;
            delegate = ddbLockBasedLeaderDeciderCreator.get();
        }
        log.info("Initializing dualMode {} assignmentMode {}", dualMode, currentAssignmentMode);

        final MigrationAdaptiveLeaderDecider adaptiveDecider = adaptiveLeaderDeciderCreator.get();
        log.info("Initializing MigrationAdaptiveLeaderDecider with {}", delegate.getClass().getSimpleName());
        adaptiveDecider.updateLeaderDecider(delegate);
        leaderDecider = adaptiveDecider;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stops the managed components and shuts down both thread pools.
     *
     * <p>If {@code initialize} completed, the lease assignment manager, the worker metrics
     * reporter and the worker metrics manager are stopped first. Both pools are then asked to shut
     * down and each is given up to {@code SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS} to terminate before
     * being shut down forcefully.
     *
     * <p>If the calling thread is interrupted while waiting on either pool, that pool is shut down
     * forcefully and the thread's interrupt status is restored before this method returns, so the
     * caller can still observe the interruption.
     */
    public void shutdown() {
        log.info("Shutting down components");
        if (this.initialized) {
            log.info("Stopping LAM, LeaderDecider, workerMetrics reporting and collection");
            this.leaseAssignmentManager.stop();
            stopWorkerMetricsReporter();
            this.workerMetricsManager.stopManager();
        }
        log.info("Shutting down lamThreadPool and workerMetrics reporter thread pool");
        this.lamThreadPool.shutdown();
        this.workerMetricsThreadPool.shutdown();

        // Remember an interruption instead of re-asserting it right away: re-interrupting before
        // the second wait would make that wait fail immediately and skip its grace period.
        boolean interrupted = false;
        try {
            if (!this.lamThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                log.info("LamThreadPool did not shutdown in {}s, forcefully shutting down", SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS);
                this.lamThreadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            interrupted = true;
            log.warn("Interrupted while waiting for shutdown of LeaseAssignmentManager ThreadPool", e);
            this.lamThreadPool.shutdownNow();
        }
        try {
            if (!this.workerMetricsThreadPool.awaitTermination(SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                log.info("WorkerMetricsThreadPool did not shutdown in {}s, forcefully shutting down", SCHEDULER_SHUTDOWN_TIMEOUT_SECONDS);
                this.workerMetricsThreadPool.shutdownNow();
            }
        } catch (InterruptedException e2) {
            interrupted = true;
            log.warn("Interrupted while waiting for shutdown of WorkerMetricStatsManager ThreadPool", e2);
            this.workerMetricsThreadPool.shutdownNow();
        }
        if (interrupted) {
            // Restore the interrupt status that catching InterruptedException cleared.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Schedules the periodic worker metrics reporter unless one is already scheduled.
     *
     * <p>The worker metrics DAO is initialized first. The reporter then runs at a fixed rate of
     * {@code workerMetricsReporterFreqInMillis()} after an initial delay of twice
     * {@code inMemoryWorkerMetricsCaptureFrequencyMillis()}.
     *
     * @throws DependencyException declared by this method; presumably raised by the DAO
     *     initialization — confirm against {@code WorkerMetricStatsDAO}
     */
    private void startWorkerMetricsReporting() throws DependencyException {
        final boolean alreadyScheduled = workerMetricsReporterFuture != null;
        if (alreadyScheduled) {
            log.info("Worker metrics reporting is already running...");
            return;
        }

        log.info("Initializing WorkerMetricStats");
        workerMetricsDAO.initialize();

        log.info("Starting worker metrics reporter");
        final WorkerMetricStatsReporter reporter =
                new WorkerMetricStatsReporter(metricsFactory, workerIdentifier, workerMetricsManager, workerMetricsDAO);
        final long initialDelayMillis =
                workerUtilizationAwareAssignmentConfig.inMemoryWorkerMetricsCaptureFrequencyMillis() * 2;
        final long periodMillis = workerUtilizationAwareAssignmentConfig.workerMetricsReporterFreqInMillis();
        workerMetricsReporterFuture = workerMetricsThreadPool.scheduleAtFixedRate(
                reporter, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Cancels the scheduled worker metrics reporter, if any, without interrupting a run that is
     * in progress, and clears the stored future so reporting can be started again later.
     */
    private void stopWorkerMetricsReporter() {
        log.info("Stopping worker metrics reporter");
        final ScheduledFuture<?> scheduledReporter = workerMetricsReporterFuture;
        if (scheduledReporter == null) {
            return;
        }
        scheduledReporter.cancel(false);
        workerMetricsReporterFuture = null;
    }

    /**
     * Requests creation of the lease table's LeaseOwnerToLeaseKey index if it does not exist.
     *
     * @param waitForCreation when {@code true}, block until the lease refresher reports that the
     *     index exists, failing if it reports otherwise
     * @throws DependencyException wrapping an {@link IllegalStateException} when waiting for the
     *     index does not succeed
     */
    private void createGsi(boolean waitForCreation) throws DependencyException {
        log.info("Creating Lease table GSI if it does not exist");
        leaseRefresher.createLeaseOwnerToLeaseKeyIndexIfNotExists();
        if (!waitForCreation) {
            return;
        }

        log.info("Waiting for Lease table GSI creation");
        // NOTE(review): 10 and 600 look like poll interval and timeout in seconds — confirm
        // against LeaseRefresher#waitUntilLeaseOwnerToLeaseKeyIndexExists.
        final boolean indexExists = leaseRefresher.waitUntilLeaseOwnerToLeaseKeyIndexExists(10L, 600L);
        if (indexExists) {
            return;
        }
        throw new DependencyException(
                new IllegalStateException("Creating LeaseOwnerToLeaseKeyIndex on Lease table timed out"));
    }

    /**
     * Prepares components for the "upgrade from 2x" client version: requests creation of the
     * lease table GSI without waiting for it, then starts worker metrics reporting.
     *
     * @param clientVersion version being transitioned from; only logged
     * @throws DependencyException if GSI creation or worker metrics reporting start-up fails
     */
    public synchronized void initializeClientVersionForUpgradeFrom2x(ClientVersion clientVersion) throws DependencyException {
        log.info("Initializing KCL components for upgrade from 2x from {}", clientVersion);

        final boolean waitForGsi = false;
        createGsi(waitForGsi);

        startWorkerMetricsReporting();
    }

    /**
     * Prepares components for the 3x client version.
     *
     * <p>Work is only done when coming from {@code CLIENT_VERSION_INIT}: the lease table GSI is
     * created and waited for, worker metrics reporting is started and the lease assignment
     * manager is started. For any other source version only the log lines are emitted.
     *
     * @param clientVersion version being transitioned from
     * @throws DependencyException if GSI creation or worker metrics reporting start-up fails
     */
    public synchronized void initializeClientVersionFor3x(ClientVersion clientVersion) throws DependencyException {
        log.info("Initializing KCL components for 3x from {}", clientVersion);
        log.info("Initializing LeaseAssignmentManager, DDB-lock-based leader decider, WorkerMetricStats manager and creating the Lease table GSI if it does not exist");

        if (clientVersion != ClientVersion.CLIENT_VERSION_INIT) {
            return;
        }

        createGsi(true);
        startWorkerMetricsReporting();
        log.info("Starting LAM");
        leaseAssignmentManager.start();
    }

    /**
     * Prepares components for rollback to the 2x client version.
     *
     * <p>The worker metrics reporter is stopped unless coming from {@code CLIENT_VERSION_INIT}.
     * When coming from {@code CLIENT_VERSION_3X_WITH_ROLLBACK} the assignment mode is switched to
     * the default lease-count-based mode (and the mode provider notified), the lease assignment
     * manager is stopped, and the migration-adaptive leader decider is re-pointed at a
     * deterministic-shuffle decider.
     *
     * @param clientVersion version being transitioned from
     * @throws IllegalStateException if the current leader decider is not a
     *     {@link MigrationAdaptiveLeaderDecider}, or if the mode change is attempted outside dual mode
     */
    public synchronized void initializeClientVersionFor2x(ClientVersion clientVersion) {
        log.info("Initializing KCL components for rollback to 2x from {}", clientVersion);

        final boolean comingFromInit = clientVersion == ClientVersion.CLIENT_VERSION_INIT;
        if (!comingFromInit) {
            stopWorkerMetricsReporter();
        }

        if (clientVersion != ClientVersion.CLIENT_VERSION_3X_WITH_ROLLBACK) {
            return;
        }

        currentAssignmentMode =
                MigrationAdaptiveLeaseAssignmentModeProvider.LeaseAssignmentMode.DEFAULT_LEASE_COUNT_BASED_ASSIGNMENT;
        notifyLeaseAssignmentModeChange();

        log.info("Stopping LAM");
        leaseAssignmentManager.stop();

        final DeterministicShuffleShardSyncLeaderDecider replacement = deterministicLeaderDeciderCreator.get();
        if (leaderDecider instanceof MigrationAdaptiveLeaderDecider) {
            log.info("Updating LeaderDecider to {}", replacement.getClass().getSimpleName());
            final MigrationAdaptiveLeaderDecider adaptiveDecider = (MigrationAdaptiveLeaderDecider) leaderDecider;
            adaptiveDecider.updateLeaderDecider(replacement);
        } else {
            throw new IllegalStateException(String.format("Unexpected leader decider %s", leaderDecider));
        }
    }

    /**
     * Prepares components for the "3x with rollback" client version.
     *
     * <p>When coming from {@code CLIENT_VERSION_UPGRADE_FROM_2X} the assignment mode is switched
     * to worker-utilization-aware (and the mode provider notified) and the migration-adaptive
     * leader decider is re-pointed at a DynamoDB-lock-based decider. For any other source version
     * worker metrics reporting is started instead. In both cases the lease assignment manager is
     * then started.
     *
     * @param clientVersion version being transitioned from
     * @throws DependencyException if worker metrics reporting start-up fails
     * @throws IllegalStateException if the current leader decider is not a
     *     {@link MigrationAdaptiveLeaderDecider}, or if the mode change is attempted outside dual mode
     */
    public synchronized void initializeClientVersionFor3xWithRollback(ClientVersion clientVersion) throws DependencyException {
        log.info("Initializing KCL components for 3x with rollback from {}", clientVersion);
        if (clientVersion == ClientVersion.CLIENT_VERSION_UPGRADE_FROM_2X) {
            this.currentAssignmentMode = MigrationAdaptiveLeaseAssignmentModeProvider.LeaseAssignmentMode.WORKER_UTILIZATION_AWARE_ASSIGNMENT;
            notifyLeaseAssignmentModeChange();
            DynamoDBLockBasedLeaderDecider dynamoDBLockBasedLeaderDecider = this.ddbLockBasedLeaderDeciderCreator.get();
            // Same guard as the rollback-to-2x path: fail with a descriptive error instead of a
            // bare ClassCastException / NullPointerException if the decider is not swappable.
            if (!(this.leaderDecider instanceof MigrationAdaptiveLeaderDecider)) {
                throw new IllegalStateException(String.format("Unexpected leader decider %s", this.leaderDecider));
            }
            log.info("Updating LeaderDecider to {}", dynamoDBLockBasedLeaderDecider.getClass().getSimpleName());
            ((MigrationAdaptiveLeaderDecider) this.leaderDecider).updateLeaderDecider(dynamoDBLockBasedLeaderDecider);
        } else {
            startWorkerMetricsReporting();
        }
        log.info("Starting LAM");
        this.leaseAssignmentManager.start();
    }

    /**
     * Pushes the current assignment mode to the mode provider, if one is configured.
     *
     * <p>Any exception thrown by the provider is logged and swallowed so that a failing
     * consumer does not abort the caller's transition.
     *
     * @throws IllegalStateException if this instance is not in dual mode
     */
    private void notifyLeaseAssignmentModeChange() {
        if (!dualMode) {
            throw new IllegalStateException("Unexpected assignment mode change");
        }

        log.info("Notifying {} of {}", leaseModeChangeConsumer, currentAssignmentMode);
        if (leaseModeChangeConsumer == null) {
            return;
        }

        try {
            leaseModeChangeConsumer.updateLeaseAssignmentMode(currentAssignmentMode);
        } catch (Exception e) {
            // Deliberate best-effort notification.
            log.warn("LeaseAssignmentMode change consumer threw exception", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates an empty builder for this class.
     *
     * @return a new builder with no values set
     */
    public static DynamicMigrationComponentsInitializerBuilder builder() {
        final DynamicMigrationComponentsInitializerBuilder emptyBuilder =
                new DynamicMigrationComponentsInitializerBuilder();
        return emptyBuilder;
    }

    /**
     * @return the metrics factory supplied at construction
     */
    public MetricsFactory metricsFactory() {
        return metricsFactory;
    }

    /**
     * @return the lease refresher supplied at construction
     */
    public LeaseRefresher leaseRefresher() {
        return leaseRefresher;
    }

    /**
     * @return the worker metric stats DAO supplied at construction
     */
    public WorkerMetricStatsDAO workerMetricsDAO() {
        return workerMetricsDAO;
    }

    /**
     * @return the worker identifier supplied at construction
     */
    public String workerIdentifier() {
        return workerIdentifier;
    }

    /**
     * @return the worker metrics expiry in seconds, computed at construction as twice the
     *     configured reporter frequency
     */
    public long workerMetricsExpirySeconds() {
        return workerMetricsExpirySeconds;
    }

    /**
     * @return the current leader decider, or {@code null} if {@code initialize} has not yet
     *     assigned one
     */
    public LeaderDecider leaderDecider() {
        return leaderDecider;
    }
}
