package org.apache.pulsar.bookie.rackawareness;

import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.DefaultBookieAddressResolver;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.meta.exceptions.Code;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.class */
public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping implements RackChangeNotifier {
    private static final Logger LOG = LoggerFactory.getLogger(BookieRackAffinityMapping.class);
    public static final String BOOKIE_INFO_ROOT_PATH = "/bookies";
    public static final String METADATA_STORE_INSTANCE = "METADATA_STORE_INSTANCE";
    private MetadataCache<BookiesRackConfiguration> bookieMappingCache = null;
    private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null;
    private List<BookieId> bookieAddressListLastTime = new ArrayList();
    private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
    private Map<String, BookieInfo> bookieInfoMap = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public static MetadataStore getMetadataStore(Configuration configuration) throws MetadataException {
        String replace;
        MetadataStore create;
        Object property = configuration.getProperty(METADATA_STORE_INSTANCE);
        if (property == null) {
            String castToString = ConfigurationStringUtil.castToString(configuration.getProperty("metadataServiceUri"));
            if (StringUtils.isNotBlank(castToString)) {
                try {
                    replace = castToString.replaceFirst("metadata-store:", "").replace(";", ",");
                } catch (Exception e) {
                    throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
                }
            } else {
                String castToString2 = ConfigurationStringUtil.castToString(configuration.getProperty("zkServers"));
                if (StringUtils.isBlank(castToString2)) {
                    throw new RuntimeException(String.format("Neither %s configuration set in the BK client configuration nor metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE));
                }
                replace = castToString2;
            }
            try {
                create = MetadataStoreExtended.create(replace, MetadataStoreConfig.builder().sessionTimeoutMillis(Integer.parseInt((String) configuration.getProperty("zkTimeout"))).build());
            } catch (MetadataStoreException e2) {
                throw new MetadataException(Code.METADATA_SERVICE_ERROR, e2);
            }
        } else {
            if (!(property instanceof MetadataStore)) {
                throw new RuntimeException("METADATA_STORE_INSTANCE is not an instance of MetadataStore");
            }
            create = (MetadataStore) property;
        }
        return create;
    }

    public synchronized void setConf(Configuration configuration) {
        super.setConf(configuration);
        try {
            MetadataStore metadataStore = getMetadataStore(configuration);
            this.bookieMappingCache = metadataStore.getMetadataCache(BookiesRackConfiguration.class);
            metadataStore.registerListener(this::handleUpdates);
            try {
                BookiesRackConfiguration bookiesRackConfiguration = (BookiesRackConfiguration) this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).thenApply(optional -> {
                    return (BookiesRackConfiguration) optional.orElseGet(BookiesRackConfiguration::new);
                }).get();
                Iterator it = bookiesRackConfiguration.values().iterator();
                while (it.hasNext()) {
                    Iterator it2 = ((Map) it.next()).keySet().iterator();
                    while (it2.hasNext()) {
                        this.bookieAddressListLastTime.add(BookieId.parse((String) it2.next()));
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("BookieRackAffinityMapping init, bookieAddressListLastTime {}", this.bookieAddressListLastTime);
                    }
                }
                updateRacksWithHost(bookiesRackConfiguration);
                watchAvailableBookies();
            } catch (InterruptedException | ExecutionException e) {
                LOG.error("Failed to update rack info. ", e);
                throw new RuntimeException(e);
            }
        } catch (MetadataException e2) {
            throw new RuntimeException("METADATA_STORE_INSTANCE failed to init BookieId list");
        }
    }

    private void watchAvailableBookies() {
        BookieAddressResolver bookieAddressResolver = getBookieAddressResolver();
        if (bookieAddressResolver instanceof DefaultBookieAddressResolver) {
            try {
                Field declaredField = DefaultBookieAddressResolver.class.getDeclaredField("registrationClient");
                declaredField.setAccessible(true);
                ((RegistrationClient) declaredField.get(bookieAddressResolver)).watchWritableBookies(versioned -> {
                    this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).thenApply(optional -> {
                        return (BookiesRackConfiguration) optional.orElseGet(BookiesRackConfiguration::new);
                    }).thenAccept(this::updateRacksWithHost).exceptionally(th -> {
                        LOG.error("Failed to update rack info. ", th);
                        return null;
                    });
                });
            } catch (IllegalAccessException | NoSuchFieldException e) {
                LOG.error("Failed watch available bookies.", e);
            }
        }
    }

    private synchronized void updateRacksWithHost(BookiesRackConfiguration bookiesRackConfiguration) {
        BookiesRackConfiguration bookiesRackConfiguration2 = new BookiesRackConfiguration();
        HashMap hashMap = new HashMap();
        bookiesRackConfiguration.forEach((str, map) -> {
            map.forEach((str, bookieInfo) -> {
                try {
                    BookieId parse = BookieId.parse(str);
                    BookieAddressResolver bookieAddressResolver = getBookieAddressResolver();
                    if (bookieAddressResolver == null) {
                        LOG.warn("Bookie address resolver not yet initialized, skipping resolution");
                    } else {
                        BookieSocketAddress resolve = bookieAddressResolver.resolve(parse);
                        bookiesRackConfiguration2.updateBookie(str, resolve.toString(), bookieInfo);
                        hashMap.put(resolve.getSocketAddress().getHostName(), bookieInfo);
                        InetAddress address = resolve.getSocketAddress().getAddress();
                        if (null != address) {
                            String hostAddress = address.getHostAddress();
                            if (null != hostAddress) {
                                hashMap.put(hostAddress, bookieInfo);
                            }
                        } else {
                            LOG.info("Network address for {} is unresolvable yet.", str);
                        }
                    }
                } catch (BookieAddressResolver.BookieIdNotResolvedException e) {
                    LOG.info("Network address for {} is unresolvable yet. error is {}", str, e);
                }
            });
        });
        this.racksWithHost = bookiesRackConfiguration2;
        this.bookieInfoMap = hashMap;
    }

    public synchronized List<String> resolve(List<String> list) {
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(getRack(it.next()));
        }
        return arrayList;
    }

    private String getRack(String str) {
        BookieInfo bookieInfo = this.bookieInfoMap.get(str);
        if (bookieInfo == null) {
            bookieInfo = (BookieInfo) this.racksWithHost.getBookie(str).orElse(null);
        }
        if (bookieInfo == null || StringUtils.isEmpty(bookieInfo.getRack()) || bookieInfo.getRack().trim().equals("/")) {
            return null;
        }
        String rack = bookieInfo.getRack();
        if (!rack.startsWith("/")) {
            rack = "/" + rack;
        }
        return rack;
    }

    public String toString() {
        return "zk based bookie rack affinity mapping";
    }

    public void reloadCachedMappings() {
    }

    private void handleUpdates(Notification notification) {
        if (notification.getPath().equals(BOOKIE_INFO_ROOT_PATH)) {
            this.bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).thenAccept(optional -> {
                HashSet hashSet = new HashSet();
                synchronized (this) {
                    LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", optional);
                    updateRacksWithHost((BookiesRackConfiguration) optional.orElseGet(BookiesRackConfiguration::new));
                    ArrayList arrayList = new ArrayList();
                    Iterator it = ((Collection) optional.map((v0) -> {
                        return v0.values();
                    }).orElse(Collections.emptyList())).iterator();
                    while (it.hasNext()) {
                        Iterator it2 = ((Map) it.next()).keySet().iterator();
                        while (it2.hasNext()) {
                            arrayList.add(BookieId.parse((String) it2.next()));
                        }
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Bookies with rack update from {} to {}", this.bookieAddressListLastTime, arrayList);
                    }
                    hashSet.addAll(arrayList);
                    hashSet.addAll(this.bookieAddressListLastTime);
                    this.bookieAddressListLastTime = arrayList;
                }
                if (this.rackawarePolicy != null) {
                    this.rackawarePolicy.onBookieRackChange(new ArrayList(hashSet));
                }
            });
        }
    }

    public void registerRackChangeListener(ITopologyAwareEnsemblePlacementPolicy<BookieNode> iTopologyAwareEnsemblePlacementPolicy) {
        this.rackawarePolicy = iTopologyAwareEnsemblePlacementPolicy;
    }
}
