package io.confluent.kafka.multitenant;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.confluent.kafka.clients.CloudAdmin;
import io.confluent.kafka.clients.DescribeTenantsOptions;
import io.confluent.kafka.multitenant.schema.TenantContext;
import io.confluent.kafka.multitenant.utils.Utils;
import io.confluent.kafka.server.plugins.policy.TopicPolicyConfig;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DeleteClusterLinksOptions;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.kafka.clients.admin.ListClusterLinksOptions;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.ClusterLinkDisabledException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/multitenant/TenantLifecycleManager.class */
public class TenantLifecycleManager {
    private static final int LIST_METADATA_TIMEOUT_MS = 30000;
    private Long deleteDelayMs;
    private int deleteBatchSize;
    private ExecutorService deletionExecutor;
    private CloudAdmin adminClient;
    private AtomicBoolean adminClientCreated;
    private final Time time;
    final Map<String, State> tenantLifecycleState;
    private final boolean isCellsEnabled;
    private static final Long CLOSE_TIMEOUT_MS = Long.valueOf(TimeUnit.SECONDS.toMillis(30));
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TenantLifecycleManager.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/kafka/multitenant/TenantLifecycleManager$State.class */
    public enum State {
        ACTIVE,
        DEACTIVATED,
        DELETE_IN_PROGRESS,
        DELETED
    }

    public TenantLifecycleManager(Map<String, ?> map, Time time) {
        this.adminClientCreated = new AtomicBoolean(false);
        this.tenantLifecycleState = new ConcurrentHashMap();
        this.time = time;
        Object obj = map.get("multitenant.tenant.delete.delay");
        if (obj == null) {
            this.deleteDelayMs = ConfluentConfigs.MULTITENANT_TENANT_DELETE_DELAY_MS_DEFAULT;
        } else {
            this.deleteDelayMs = Long.valueOf(((Long) obj).longValue());
        }
        Object obj2 = map.get("multitenant.tenant.delete.batch.size");
        if (obj2 == null) {
            this.deleteBatchSize = ConfluentConfigs.MULTITENANT_TENANT_DELETE_BATCH_SIZE_DEFAULT.intValue();
        } else {
            this.deleteBatchSize = ((Integer) obj2).intValue();
        }
        Object obj3 = map.get("confluent.cells.enable");
        if (obj3 == null) {
            this.isCellsEnabled = false;
        } else {
            this.isCellsEnabled = ((Boolean) obj3).booleanValue();
        }
    }

    public void createAdminClient(Map<String, Object> map) {
        this.adminClient = Utils.createCloudAdmin(map);
        if (this.adminClient == null) {
            LOG.error("We will mark clusters as deleted but will not be able to delete topics and ACLs because we failed to create admin client.");
        } else {
            this.adminClientCreated.compareAndSet(false, true);
        }
        this.deletionExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("tenant-topic-deletion-thread-%d").build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TenantLifecycleManager(long j, CloudAdmin cloudAdmin) {
        this(j, true, cloudAdmin, Time.SYSTEM);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TenantLifecycleManager(long j, boolean z, CloudAdmin cloudAdmin, Time time) {
        this.adminClientCreated = new AtomicBoolean(false);
        this.tenantLifecycleState = new ConcurrentHashMap();
        this.time = time;
        this.deleteDelayMs = Long.valueOf(j);
        this.isCellsEnabled = z;
        this.adminClient = cloudAdmin;
        if (this.adminClient != null) {
            this.adminClientCreated.compareAndSet(false, true);
        }
        this.deleteBatchSize = ConfluentConfigs.MULTITENANT_TENANT_DELETE_BATCH_SIZE_DEFAULT.intValue();
        this.deletionExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("tenant-topic-deletion-thread-%d").build());
    }

    public synchronized void updateTenantState(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata) {
        String logicalClusterId = kafkaLogicalClusterMetadata.logicalClusterId();
        if (kafkaLogicalClusterMetadata.isActive()) {
            State put = this.tenantLifecycleState.put(logicalClusterId, State.ACTIVE);
            if (put == State.DEACTIVATED) {
                LOG.info("Tenant {} was reactivated and will not be deleted", logicalClusterId);
                return;
            } else {
                if (put == State.DELETE_IN_PROGRESS || put == State.DELETED) {
                    LOG.warn("Attempted to reactive tenant {} but it was already deleted.", logicalClusterId);
                    return;
                }
                return;
            }
        }
        if (shouldDelete(kafkaLogicalClusterMetadata)) {
            this.tenantLifecycleState.put(logicalClusterId, State.DELETE_IN_PROGRESS);
            LOG.warn("Tenant {} was marked for immediate deletion.", logicalClusterId);
        } else if (shouldDeactivate(kafkaLogicalClusterMetadata)) {
            this.tenantLifecycleState.put(logicalClusterId, State.DEACTIVATED);
            LOG.warn("Tenant {} was deactivated and will be deleted in {}.", logicalClusterId, Duration.ofMillis(this.deleteDelayMs.longValue()));
        }
    }

    public boolean isClusterActive(String str) {
        return this.tenantLifecycleState.get(str) == State.ACTIVE;
    }

    public Set<String> inactiveClusters() {
        return (Set) this.tenantLifecycleState.entrySet().stream().filter(entry -> {
            return entry.getValue() != State.ACTIVE;
        }).map(entry2 -> {
            return (String) entry2.getKey();
        }).collect(Collectors.toSet());
    }

    public Set<String> deactivatedClusters() {
        return (Set) this.tenantLifecycleState.entrySet().stream().filter(entry -> {
            return entry.getValue() == State.DEACTIVATED;
        }).map(entry2 -> {
            return (String) entry2.getKey();
        }).collect(Collectors.toSet());
    }

    Set<String> deleteInProgressClusters() {
        return (Set) this.tenantLifecycleState.entrySet().stream().filter(entry -> {
            return entry.getValue() == State.DELETE_IN_PROGRESS;
        }).map(entry2 -> {
            return (String) entry2.getKey();
        }).collect(Collectors.toSet());
    }

    public Set<String> fullyDeletedClusters() {
        return (Set) this.tenantLifecycleState.entrySet().stream().filter(entry -> {
            return entry.getValue() == State.DELETED;
        }).map(entry2 -> {
            return (String) entry2.getKey();
        }).collect(Collectors.toSet());
    }

    public Set<String> deletedClusters() {
        return (Set) this.tenantLifecycleState.entrySet().stream().filter(entry -> {
            return entry.getValue() == State.DELETE_IN_PROGRESS || entry.getValue() == State.DELETED;
        }).map(entry2 -> {
            return (String) entry2.getKey();
        }).collect(Collectors.toSet());
    }

    public synchronized void deleteTenants() {
        if (this.adminClientCreated.get()) {
            Set<String> deleteInProgressClusters = deleteInProgressClusters();
            if (deleteInProgressClusters.isEmpty()) {
                return;
            }
            LOG.info("Deleting tenants in: {}", deleteInProgressClusters);
            Set<String> deleteClusterLinks = deleteClusterLinks(deleteInProgressClusters);
            Iterator<String> it = deleteTenantCellMetadata(Sets.intersection(deleteAcls(deleteClusterLinks), deleteTopics(deleteClusterLinks))).iterator();
            while (it.hasNext()) {
                this.tenantLifecycleState.put(it.next(), State.DELETED);
            }
        }
    }

    private Set<String> deleteTopics(Set<String> set) {
        Set hashSet = new HashSet();
        try {
            List list = (List) this.adminClient.listTopics(new ListTopicsOptions().timeoutMs((Integer) 30000)).names().get().stream().filter(str -> {
                return TenantContext.isTenantPrefixed(str) && set.contains(TenantContext.extractTenant(str));
            }).collect(Collectors.toList());
            Set set2 = (Set) list.stream().map(str2 -> {
                return TenantContext.extractTenant(str2);
            }).collect(Collectors.toSet());
            hashSet = Sets.difference(set, set2).immutableCopy();
            LOG.info("deleting topics {} because they belong to tenants {}", list, set2);
            this.deletionExecutor.execute(() -> {
                try {
                    for (List list2 : Lists.partition(list, this.deleteBatchSize)) {
                        try {
                            this.adminClient.deleteTopics(list2).all().get();
                            LOG.info("Successfully deleted topics {}", list2);
                        } catch (UnknownTopicOrPartitionException e) {
                        }
                    }
                } catch (InterruptedException e2) {
                    LOG.info("Deleting topics of deactivated tenants was interrupted");
                } catch (ExecutionException e3) {
                    LOG.error("Failed to delete topics for tenants {}. We'll try again next time", set, e3);
                }
            });
            return hashSet;
        } catch (Exception e) {
            LOG.error("Failed to get list of topics in cluster.", (Throwable) e);
            return hashSet;
        }
    }

    private Set<String> deleteTenantCellMetadata(Set<String> set) {
        if (!this.isCellsEnabled) {
            return set;
        }
        Set hashSet = new HashSet();
        try {
            Set set2 = (Set) ((List) this.adminClient.describeTenants(Collections.emptyList(), new DescribeTenantsOptions().timeoutMs(30000)).value().get()).stream().map((v0) -> {
                return v0.tenantId();
            }).collect(Collectors.toSet());
            hashSet = Sets.difference(set, set2).immutableCopy();
            LOG.info("deleting tenant cell metadata {} because they are empty", hashSet);
            Sets.SetView intersection = Sets.intersection(set, set2);
            this.deletionExecutor.execute(() -> {
                try {
                    for (List list : Lists.partition(new ArrayList(intersection), this.deleteBatchSize)) {
                        this.adminClient.deleteTenants(list).value().get();
                        LOG.info("Successfully deleted tenant cell metadata {}", list);
                    }
                } catch (InterruptedException e) {
                    LOG.info("Deleting tenant cell metadata of deactivated tenants was interrupted");
                } catch (ExecutionException e2) {
                    LOG.error("Failed to delete tenant cell metadata for tenants {}. We'll try again next time", intersection, e2.getCause());
                }
            });
            return hashSet;
        } catch (Exception e) {
            LOG.error("Failed to get list of tenants in cluster.", (Throwable) e);
            return hashSet;
        }
    }

    private Set<String> deleteClusterLinks(Set<String> set) {
        Set hashSet = new HashSet();
        try {
            List list = (List) ((Collection) this.adminClient.listClusterLinks(new ListClusterLinksOptions().includeTopics(false).timeoutMs(30000)).result().get()).stream().filter(clusterLinkListing -> {
                return TenantContext.isTenantPrefixed(clusterLinkListing.linkName()) && set.contains(TenantContext.extractTenant(clusterLinkListing.linkName()));
            }).map(clusterLinkListing2 -> {
                return clusterLinkListing2.linkName();
            }).collect(Collectors.toList());
            Set set2 = (Set) list.stream().map(str -> {
                return TenantContext.extractTenant(str);
            }).collect(Collectors.toSet());
            hashSet = Sets.difference(set, set2).immutableCopy();
            LOG.info("Deleting links {} because they belong to deleted tenants {}", list, set2);
            if (list.isEmpty()) {
                return hashSet;
            }
            this.deletionExecutor.execute(() -> {
                List<List> partition = Lists.partition(list, this.deleteBatchSize);
                DeleteClusterLinksOptions force = new DeleteClusterLinksOptions().force(true);
                try {
                    for (List list2 : partition) {
                        try {
                            this.adminClient.deleteClusterLinks(list2, force).all().get();
                            LOG.info("Successfully deleted links {}", list2);
                        } catch (ExecutionException e) {
                        }
                    }
                } catch (InterruptedException e2) {
                    LOG.info("Deleting links of deactivated tenants was interrupted");
                } catch (Exception e3) {
                    LOG.error("Failed to delete links for tenants {}. We'll try again next time", set, e3);
                }
            });
            return hashSet;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof ClusterLinkDisabledException) {
                LOG.info("Skip deleting cluster links since cluster link is disabled");
                return set;
            }
            LOG.error("Failed to get list of cluster links in cluster.", (Throwable) e);
            return hashSet;
        } catch (Exception e2) {
            LOG.error("Failed to get list of cluster links in cluster.", (Throwable) e2);
            return hashSet;
        }
    }

    private Set<String> deleteAcls(Set<String> set) {
        HashSet hashSet = new HashSet();
        LinkedList linkedList = new LinkedList();
        DescribeAclsOptions timeoutMs = new DescribeAclsOptions().timeoutMs((Integer) 30000);
        for (String str : set) {
            AclBindingFilter aclBindingFilter = new AclBindingFilter(new ResourcePatternFilter(ResourceType.ANY, str + "_", PatternType.CONFLUENT_ALL_TENANT_ANY), new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
            try {
                if (this.adminClient.describeAcls(aclBindingFilter, timeoutMs).values().get().isEmpty()) {
                    hashSet.add(str);
                } else {
                    linkedList.add(aclBindingFilter);
                }
            } catch (Exception e) {
                if (!(e.getCause() instanceof InvalidRequestException)) {
                    LOG.error("Failed to get ACLs for tenants {}. We'll try again next time", set, e);
                    return hashSet;
                }
                LOG.error("Failed to get ACLs for tenants {} because this operation isn't supporting on this physical cluster. We won't retry and will consider deletion of ACLs for all tenants in list complete.", set, e);
                hashSet.addAll(set);
                return hashSet;
            }
        }
        this.adminClient.deleteAcls(linkedList);
        return hashSet;
    }

    public void close() {
        if (this.deletionExecutor != null) {
            this.deletionExecutor.shutdownNow();
        }
        if (this.adminClient != null) {
            this.adminClient.close(Duration.ofMillis(CLOSE_TIMEOUT_MS.longValue()));
        }
    }

    private boolean shouldDeactivate(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata) {
        return kafkaLogicalClusterMetadata.lifecycleMetadata() != null && kafkaLogicalClusterMetadata.lifecycleMetadata().deletionDate() != null && kafkaLogicalClusterMetadata.lifecycleMetadata().deletionDate().getTime() < this.time.milliseconds() && this.tenantLifecycleState.getOrDefault(kafkaLogicalClusterMetadata.logicalClusterId(), State.ACTIVE) == State.ACTIVE;
    }

    private boolean shouldDelete(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata) {
        return (kafkaLogicalClusterMetadata.lifecycleMetadata() == null || kafkaLogicalClusterMetadata.lifecycleMetadata().deletionDate() == null || kafkaLogicalClusterMetadata.lifecycleMetadata().deletionDate().getTime() >= Long.valueOf(this.time.milliseconds() - this.deleteDelayMs.longValue()).longValue() || this.tenantLifecycleState.getOrDefault(kafkaLogicalClusterMetadata.logicalClusterId(), State.ACTIVE) == State.DELETED) ? false : true;
    }

    ExecutorService deletionExecutor() {
        return this.deletionExecutor;
    }

    public boolean updateMaxPartitionsIfNecessary(KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata, KafkaLogicalClusterMetadata kafkaLogicalClusterMetadata2) {
        if (kafkaLogicalClusterMetadata == null) {
            return true;
        }
        if (kafkaLogicalClusterMetadata2 == null) {
            throw new IllegalArgumentException("newMeta cannot be null");
        }
        if (kafkaLogicalClusterMetadata2.isHealthcheckLogicalCluster()) {
            return true;
        }
        Integer maxPartitions = kafkaLogicalClusterMetadata.maxPartitions();
        Integer maxPartitions2 = kafkaLogicalClusterMetadata2.maxPartitions();
        if (maxPartitions2 == null || maxPartitions2.equals(maxPartitions)) {
            return true;
        }
        if (!this.adminClientCreated.get()) {
            LOG.warn("Failed to update max partitions per tenant from {} to {} because AdminClient was not created yet. We'll try again next time", maxPartitions, maxPartitions2);
            return false;
        }
        try {
            this.adminClient.incrementalAlterConfigs(Collections.singletonMap(new ConfigResource(ConfigResource.Type.BROKER, ""), Collections.singleton(new AlterConfigOp(new ConfigEntry(TopicPolicyConfig.MAX_PARTITIONS_PER_TENANT_CONFIG, maxPartitions2.toString()), AlterConfigOp.OpType.SET)))).all().get();
            LOG.info("Updated max partitions per tenant from {} to {}", maxPartitions, maxPartitions2);
            return true;
        } catch (Exception e) {
            LOG.error("Failed to update max partitions per tenant from {} to {}. We'll try again next time", maxPartitions, maxPartitions2, e);
            return false;
        }
    }
}
