package org.apache.pulsar.broker;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.EventLoopGroup;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/ManagedLedgerClientFactory.class */
public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);
    private ManagedLedgerFactory managedLedgerFactory;
    private BookKeeper defaultBkClient;
    private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper> bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync();
    private StatsProvider statsProvider = new NullStatsProvider();

    @Override // org.apache.pulsar.broker.storage.ManagedLedgerStorage
    public void initialize(ServiceConfiguration serviceConfiguration, MetadataStoreExtended metadataStoreExtended, BookKeeperClientFactory bookKeeperClientFactory, EventLoopGroup eventLoopGroup) throws Exception {
        ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
        managedLedgerFactoryConfig.setMaxCacheSize(serviceConfiguration.getManagedLedgerCacheSizeMB() * 1024 * 1024);
        managedLedgerFactoryConfig.setCacheEvictionWatermark(serviceConfiguration.getManagedLedgerCacheEvictionWatermark());
        managedLedgerFactoryConfig.setNumManagedLedgerSchedulerThreads(serviceConfiguration.getManagedLedgerNumSchedulerThreads());
        managedLedgerFactoryConfig.setCacheEvictionIntervalMs(serviceConfiguration.getManagedLedgerCacheEvictionIntervalMs());
        managedLedgerFactoryConfig.setCacheEvictionTimeThresholdMillis(serviceConfiguration.getManagedLedgerCacheEvictionTimeThresholdMillis());
        managedLedgerFactoryConfig.setCopyEntriesInCache(serviceConfiguration.isManagedLedgerCacheCopyEntries());
        long managedLedgerMaxReadsInFlightSizeInMB = serviceConfiguration.getManagedLedgerMaxReadsInFlightSizeInMB() * 1024 * 1024;
        if (managedLedgerMaxReadsInFlightSizeInMB > 0 && serviceConfiguration.getDispatcherMaxReadSizeBytes() > 0 && managedLedgerMaxReadsInFlightSizeInMB < serviceConfiguration.getDispatcherMaxReadSizeBytes()) {
            log.warn("Invalid configuration for managedLedgerMaxReadsInFlightSizeInMB: {}, dispatcherMaxReadSizeBytes: {}. managedLedgerMaxReadsInFlightSizeInMB in bytes should be greater than dispatcherMaxReadSizeBytes. You should set managedLedgerMaxReadsInFlightSizeInMB to at least {}", new Object[]{Long.valueOf(serviceConfiguration.getManagedLedgerMaxReadsInFlightSizeInMB()), Integer.valueOf(serviceConfiguration.getDispatcherMaxReadSizeBytes()), Long.valueOf((serviceConfiguration.getDispatcherMaxReadSizeBytes() / 1048576) + 1)});
        }
        managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightSize(managedLedgerMaxReadsInFlightSizeInMB);
        managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis(serviceConfiguration.getManagedLedgerMaxReadsInFlightPermitsAcquireTimeoutMillis());
        managedLedgerFactoryConfig.setManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize(serviceConfiguration.getManagedLedgerMaxReadsInFlightPermitsAcquireQueueSize());
        managedLedgerFactoryConfig.setPrometheusStatsLatencyRolloverSeconds(serviceConfiguration.getManagedLedgerPrometheusStatsLatencyRolloverSeconds());
        managedLedgerFactoryConfig.setTraceTaskExecution(serviceConfiguration.isManagedLedgerTraceTaskExecution());
        managedLedgerFactoryConfig.setCursorPositionFlushSeconds(serviceConfiguration.getManagedLedgerCursorPositionFlushSeconds());
        managedLedgerFactoryConfig.setManagedLedgerInfoCompressionType(serviceConfiguration.getManagedLedgerInfoCompressionType());
        managedLedgerFactoryConfig.setStatsPeriodSeconds(serviceConfiguration.getManagedLedgerStatsPeriodSeconds());
        managedLedgerFactoryConfig.setManagedCursorInfoCompressionType(serviceConfiguration.getManagedCursorInfoCompressionType());
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        if (serviceConfiguration.isBookkeeperClientExposeStatsToPrometheus()) {
            clientConfiguration.addProperty(PrometheusMetricsProvider.PROMETHEUS_STATS_LATENCY_ROLLOVER_SECONDS, Integer.valueOf(serviceConfiguration.getManagedLedgerPrometheusStatsLatencyRolloverSeconds()));
            clientConfiguration.addProperty(PrometheusMetricsProvider.CLUSTER_NAME, serviceConfiguration.getClusterName());
            this.statsProvider = new PrometheusMetricsProvider();
        }
        this.statsProvider.start(clientConfiguration);
        StatsLogger statsLogger = this.statsProvider.getStatsLogger("pulsar_managedLedger_client");
        this.defaultBkClient = bookKeeperClientFactory.create(serviceConfiguration, metadataStoreExtended, eventLoopGroup, Optional.empty(), null, statsLogger).get();
        this.managedLedgerFactory = new ManagedLedgerFactoryImpl(metadataStoreExtended, ensemblePlacementPolicyConfig -> {
            return (ensemblePlacementPolicyConfig == null || ensemblePlacementPolicyConfig.getPolicyClass() == null) ? CompletableFuture.completedFuture(this.defaultBkClient) : this.bkEnsemblePolicyToBkClientMap.get(ensemblePlacementPolicyConfig, (ensemblePlacementPolicyConfig, executor) -> {
                return bookKeeperClientFactory.create(serviceConfiguration, metadataStoreExtended, eventLoopGroup, Optional.ofNullable(ensemblePlacementPolicyConfig.getPolicyClass()), ensemblePlacementPolicyConfig.getProperties(), statsLogger);
            });
        }, managedLedgerFactoryConfig, statsLogger);
    }

    @Override // org.apache.pulsar.broker.storage.ManagedLedgerStorage
    public ManagedLedgerFactory getManagedLedgerFactory() {
        return this.managedLedgerFactory;
    }

    @Override // org.apache.pulsar.broker.storage.ManagedLedgerStorage
    public BookKeeper getBookKeeperClient() {
        return this.defaultBkClient;
    }

    @Override // org.apache.pulsar.broker.storage.ManagedLedgerStorage
    public StatsProvider getStatsProvider() {
        return this.statsProvider;
    }

    @VisibleForTesting
    public Map<EnsemblePlacementPolicyConfig, BookKeeper> getBkEnsemblePolicyToBookKeeperMap() {
        return this.bkEnsemblePolicyToBkClientMap.synchronous().asMap();
    }

    @Override // org.apache.pulsar.broker.storage.ManagedLedgerStorage, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            if (null != this.managedLedgerFactory) {
                this.managedLedgerFactory.shutdown();
                log.info("Closed managed ledger factory");
            }
            if (null != this.statsProvider) {
                this.statsProvider.stop();
            }
            try {
                if (null != this.defaultBkClient) {
                    this.defaultBkClient.close();
                }
            } catch (RejectedExecutionException e) {
                log.warn("Encountered exceptions on closing bookkeeper client", e);
            }
            this.bkEnsemblePolicyToBkClientMap.synchronous().asMap().forEach((ensemblePlacementPolicyConfig, bookKeeper) -> {
                if (bookKeeper != null) {
                    try {
                        bookKeeper.close();
                    } catch (Exception e2) {
                        log.warn("Failed to close bookkeeper-client for policy {}", ensemblePlacementPolicyConfig, e2);
                    }
                }
            });
            log.info("Closed BookKeeper client");
        } catch (Exception e2) {
            log.warn(e2.getMessage(), e2);
            throw new IOException(e2);
        }
    }
}
