package org.apache.kafka.controller;

import io.confluent.kafka.link.ClusterLinkConfig;
import io.confluent.kafka.link.ClusterLinkUtils;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.CreateClusterLinksRequestData;
import org.apache.kafka.common.message.CreateClusterLinksResponseData;
import org.apache.kafka.common.message.DeleteClusterLinksRequestData;
import org.apache.kafka.common.message.DeleteClusterLinksResponseData;
import org.apache.kafka.common.metadata.ClusterLinkRecord;
import org.apache.kafka.common.metadata.RemoveClusterLinkRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.metadata.ClusterLink;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.interceptor.ClusterLinkInterceptor;
import org.apache.kafka.server.mutable.BoundedList;
import org.apache.kafka.server.policy.CreateClusterLinkPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/controller/ClusterLinkControlManager.class */
public class ClusterLinkControlManager {
    private final ConfigurationControlManager configManager;
    private final MirrorTopicControlManager mirrorTopicControl;
    private final FeatureControlManager featureControl;
    private final Consumer<Uuid> topicUnlinker;
    private final Consumer<Uuid> aclUnlinker;
    private final Logger log;
    private final TimelineHashMap<String, Uuid> linkNameToId;
    private final TimelineHashMap<Uuid, ClusterLink> clusterLinks;
    private final String localClusterId;
    private final Optional<CreateClusterLinkPolicy> createClusterLinkPolicy;

    /* loaded from: input_file:org/apache/kafka/controller/ClusterLinkControlManager$ClusterLinkControlState.class */
    public static class ClusterLinkControlState {
        public final Set<Uuid> links;
        public final Optional<Set<Uuid>> policyLinks;

        ClusterLinkControlState(Set<Uuid> set, Optional<Set<Uuid>> optional) {
            this.links = set;
            this.policyLinks = optional;
        }
    }

    public ClusterLinkControlManager(SnapshotRegistry snapshotRegistry, LogContext logContext, ConfigurationControlManager configurationControlManager, MirrorTopicControlManager mirrorTopicControlManager, FeatureControlManager featureControlManager, Consumer<Uuid> consumer, Consumer<Uuid> consumer2, String str, Optional<CreateClusterLinkPolicy> optional) {
        this.linkNameToId = new TimelineHashMap<>(snapshotRegistry, 0);
        this.clusterLinks = new TimelineHashMap<>(snapshotRegistry, 0);
        this.log = logContext.logger(ClusterLinkControlManager.class);
        this.configManager = configurationControlManager;
        this.mirrorTopicControl = mirrorTopicControlManager;
        this.featureControl = featureControlManager;
        this.topicUnlinker = consumer;
        this.aclUnlinker = consumer2;
        this.localClusterId = str;
        this.createClusterLinkPolicy = optional;
    }

    public ControllerResult<CreateClusterLinksResponseData> createClusterLinks(CreateClusterLinksRequestData createClusterLinksRequestData, KafkaPrincipal kafkaPrincipal) {
        BoundedList newArrayBacked = BoundedList.newArrayBacked(10000);
        CreateClusterLinksResponseData createClusterLinksResponseData = new CreateClusterLinksResponseData();
        createClusterLinksResponseData.setEntries(new ArrayList());
        createClusterLinksRequestData.entries().forEach(entryData -> {
            ResultOrError<Uuid> of;
            try {
                newArrayBacked.getClass();
                of = createClusterLink(entryData, (v1) -> {
                    r2.add(v1);
                }, kafkaPrincipal);
            } catch (Throwable th) {
                of = ResultOrError.of(ApiError.fromThrowable(th));
            }
            if (of.isResult()) {
                createClusterLinksResponseData.entries().add(new CreateClusterLinksResponseData.EntryData().setLinkName(entryData.linkName()).setLinkId(of.result()));
            } else {
                createClusterLinksResponseData.entries().add(new CreateClusterLinksResponseData.EntryData().setLinkName(entryData.linkName()).setLinkId(Uuid.ZERO_UUID).setErrorCode(of.error().error().code()).setErrorMessage(of.error().message()));
            }
        });
        return createClusterLinksRequestData.validateOnly() ? ControllerResult.of(Collections.emptyList(), createClusterLinksResponseData) : ControllerResult.atomicOf(newArrayBacked, createClusterLinksResponseData);
    }

    public ControllerResult<DeleteClusterLinksResponseData> deleteClusterLinks(DeleteClusterLinksRequestData deleteClusterLinksRequestData) {
        BoundedList newArrayBacked = BoundedList.newArrayBacked(10000);
        DeleteClusterLinksResponseData deleteClusterLinksResponseData = new DeleteClusterLinksResponseData();
        deleteClusterLinksResponseData.setEntries(new ArrayList());
        deleteClusterLinksRequestData.linkNames().forEach(str -> {
            newArrayBacked.getClass();
            ApiError deleteClusterLink = deleteClusterLink(str, (v1) -> {
                r2.add(v1);
            }, deleteClusterLinksRequestData.force());
            deleteClusterLinksResponseData.entries().add(new DeleteClusterLinksResponseData.EntryData().setLinkName(str).setErrorCode(deleteClusterLink.error().code()).setErrorMessage(deleteClusterLink.message()));
        });
        return deleteClusterLinksRequestData.validateOnly() ? ControllerResult.atomicOf(Collections.emptyList(), deleteClusterLinksResponseData) : ControllerResult.atomicOf(newArrayBacked, deleteClusterLinksResponseData);
    }

    public Optional<Uuid> getClusterLinkId(String str) {
        return Optional.ofNullable(this.linkNameToId.get(str));
    }

    public Optional<ClusterLink> getClusterLink(String str) {
        try {
            return Optional.ofNullable(this.clusterLinks.get(Uuid.fromString(str)));
        } catch (IllegalArgumentException e) {
            return Optional.empty();
        }
    }

    ResultOrError<Uuid> createClusterLink(CreateClusterLinksRequestData.EntryData entryData, Consumer<ApiMessageAndVersion> consumer, KafkaPrincipal kafkaPrincipal) {
        String linkName = entryData.linkName();
        Map<String, String> configMap = toConfigMap(entryData.configs());
        ApiError validateLinkName = validateLinkName(linkName);
        if (validateLinkName != ApiError.NONE) {
            return ResultOrError.of(validateLinkName);
        }
        if (this.linkNameToId.containsKey(linkName)) {
            return ResultOrError.of(new ApiError(Errors.CLUSTER_LINK_EXISTS, "Cluster Link " + linkName + " already exists."));
        }
        if (entryData.clusterId() == null) {
            return ResultOrError.of(new ApiError(Errors.INVALID_REQUEST, "Source cluster ID cannot be null."));
        }
        if (this.localClusterId.equals(entryData.clusterId())) {
            return ResultOrError.of(new ApiError(Errors.INVALID_REQUEST, "Source cluster ID matches local cluster ID " + this.localClusterId + ". Cannot create cluster link to self."));
        }
        ClusterLinkConfig.LinkMode linkMode = getLinkMode(configMap);
        Uuid validateAndGetLinkIdForClusterLink = validateAndGetLinkIdForClusterLink(entryData, linkMode, configMap);
        if (this.clusterLinks.containsKey(validateAndGetLinkIdForClusterLink)) {
            return ResultOrError.of(new ApiError(Errors.CLUSTER_LINK_EXISTS, "Cluster Link " + validateAndGetLinkIdForClusterLink + " already exists."));
        }
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, validateAndGetLinkIdForClusterLink.toString());
        ControllerResult<Map<ConfigResource, ApiError>> incrementalAlterConfigs = this.configManager.incrementalAlterConfigs(Collections.singletonMap(configResource, toAlterConfigs(entryData.configs())), true, kafkaPrincipal);
        ApiError apiError = incrementalAlterConfigs.response().get(configResource);
        if (apiError != ApiError.NONE) {
            return ResultOrError.of(apiError);
        }
        validateClusterLinkPolicy(entryData.tenantPrefix(), linkMode.name(), configMap);
        ClusterLinkRecord tenantPrefix = new ClusterLinkRecord().setClusterLinkName(entryData.linkName()).setClusterLinkId(validateAndGetLinkIdForClusterLink).setRemoteClusterId(entryData.clusterId()).setTenantPrefix(entryData.tenantPrefix());
        if (!this.featureControl.metadataVersion().isClusterLinkModeSupported() && linkMode != ClusterLinkConfig.LinkMode.DESTINATION) {
            throw new UnsupportedVersionException("Attempted to write non-default link mode at unsupported version.");
        }
        consumer.accept(new ApiMessageAndVersion(tenantPrefix.setLinkMode(linkMode.toString()), this.featureControl.metadataVersion().clusterLinkRecordVersion()));
        incrementalAlterConfigs.records().forEach(consumer);
        return ResultOrError.of(validateAndGetLinkIdForClusterLink);
    }

    private void validateClusterLinkPolicy(String str, String str2, Map<String, String> map) {
        Optional filter = Optional.ofNullable(str).filter(str3 -> {
            return !str3.isEmpty();
        });
        this.createClusterLinkPolicy.ifPresent(createClusterLinkPolicy -> {
            if (!filter.isPresent()) {
                throw new InvalidRequestException("Tenant prefix cannot be null if a CreateClusterLinkPolicy is set.");
            }
            this.createClusterLinkPolicy.get().validate(filter, str2, map);
        });
    }

    ApiError deleteClusterLink(String str, Consumer<ApiMessageAndVersion> consumer, boolean z) {
        ApiError validateLinkName = validateLinkName(str);
        if (validateLinkName != ApiError.NONE) {
            return validateLinkName;
        }
        Optional<Uuid> clusterLinkId = getClusterLinkId(str);
        if (!clusterLinkId.isPresent()) {
            return new ApiError(Errors.CLUSTER_LINK_NOT_FOUND, "Cluster Link " + str + " does not exist.");
        }
        Set<String> set = this.mirrorTopicControl.topicsInUse(clusterLinkId.get());
        if (!set.isEmpty() && !z) {
            return new ApiError(Errors.CLUSTER_LINK_IN_USE, "Cluster link " + str + " with ID " + clusterLinkId.get() + " in use by topics: " + set);
        }
        consumer.accept(new ApiMessageAndVersion(new RemoveClusterLinkRecord().setClusterLinkName(str).setClusterLinkId(clusterLinkId.get()), (short) 0));
        return ApiError.NONE;
    }

    public void replay(ClusterLinkRecord clusterLinkRecord) {
        this.log.info("Created cluster link {} with ID {}", clusterLinkRecord.clusterLinkName(), clusterLinkRecord.clusterLinkId());
        this.linkNameToId.put(clusterLinkRecord.clusterLinkName(), clusterLinkRecord.clusterLinkId());
        this.clusterLinks.put(clusterLinkRecord.clusterLinkId(), new ClusterLink(clusterLinkRecord));
        this.createClusterLinkPolicy.filter(createClusterLinkPolicy -> {
            return createClusterLinkPolicy instanceof ClusterLinkInterceptor;
        }).ifPresent(createClusterLinkPolicy2 -> {
            ((ClusterLinkInterceptor) createClusterLinkPolicy2).linkAdded(clusterLinkRecord.clusterLinkId(), Optional.of(clusterLinkRecord.tenantPrefix()), clusterLinkRecord.linkMode());
        });
    }

    public void replay(RemoveClusterLinkRecord removeClusterLinkRecord) {
        ClusterLink remove = this.clusterLinks.remove(removeClusterLinkRecord.clusterLinkId());
        if (remove == null) {
            this.log.error("Trying to remove cluster link {} with ID {}, but we have no record of it.", removeClusterLinkRecord.clusterLinkName(), removeClusterLinkRecord.clusterLinkId());
            throw new IllegalStateException();
        }
        this.log.info("Deleting cluster link {} with ID {}", removeClusterLinkRecord.clusterLinkName(), removeClusterLinkRecord.clusterLinkId());
        String linkName = remove.linkName();
        Uuid linkId = remove.linkId();
        this.linkNameToId.remove(linkName);
        this.configManager.deleteClusterLinkConfigs(linkId.toString());
        this.mirrorTopicControl.topicIdsForClusterLinkId(linkId, false).forEach(this.topicUnlinker);
        this.mirrorTopicControl.unLinkMirrorTopics(linkId, linkName);
        this.createClusterLinkPolicy.filter(createClusterLinkPolicy -> {
            return createClusterLinkPolicy instanceof ClusterLinkInterceptor;
        }).ifPresent(createClusterLinkPolicy2 -> {
            ((ClusterLinkInterceptor) createClusterLinkPolicy2).linkDeleted(linkId);
        });
        this.aclUnlinker.accept(linkId);
    }

    public Iterator<List<ApiMessageAndVersion>> iterator(long j) {
        return this.clusterLinks.entrySet(j).stream().map(entry -> {
            return Collections.singletonList(new ApiMessageAndVersion(((ClusterLink) entry.getValue()).toRecord(), (short) 1));
        }).iterator();
    }

    static ApiError validateLinkName(String str) {
        try {
            ClusterLinkUtils.validateLinkNameOrThrow(str);
            return ApiError.NONE;
        } catch (Throwable th) {
            return ApiError.fromThrowable(th);
        }
    }

    private static Uuid validateAndGetLinkIdForClusterLink(CreateClusterLinksRequestData.EntryData entryData, ClusterLinkConfig.LinkMode linkMode, Map<String, String> map) {
        boolean z = !entryData.linkId().equals(Uuid.ZERO_UUID);
        switch (linkMode) {
            case SOURCE:
                if (!z) {
                    throw new InvalidRequestException("Cluster link id should be provided in source initiated cluster link.");
                }
                break;
            case DESTINATION:
                if (z) {
                    throw new InvalidRequestException("Unexpected cluster link id " + entryData.linkId() + ". Should not be provided for destination initiated cluster link.");
                }
                break;
            case BIDIRECTIONAL:
                String str = map.get(ClusterLinkConfig.CONNECTION_MODE_PROP);
                boolean z2 = str != null && str.equalsIgnoreCase(ClusterLinkDescription.ConnectionMode.INBOUND.name().toLowerCase(Locale.ROOT));
                if (z && z2) {
                    throw new InvalidRequestException("Unexpected cluster link id " + entryData.linkId() + ". Should not be provided for bi-directional cluster link with inbound connections.");
                }
                break;
            default:
                throw new InvalidRequestException("Unknown link mode " + linkMode);
        }
        return !entryData.linkId().equals(Uuid.ZERO_UUID) ? entryData.linkId() : Uuid.randomUuid();
    }

    static Map<String, Map.Entry<AlterConfigOp.OpType, String>> toAlterConfigs(List<CreateClusterLinksRequestData.ConfigData> list) {
        HashMap hashMap = new HashMap();
        list.forEach(configData -> {
        });
        return hashMap;
    }

    static Map<String, String> toConfigMap(List<CreateClusterLinksRequestData.ConfigData> list) {
        return (Map) list.stream().collect(Collectors.toMap((v0) -> {
            return v0.key();
        }, (v0) -> {
            return v0.value();
        }));
    }

    static ClusterLinkConfig.LinkMode getLinkMode(Map<String, String> map) {
        return ClusterLinkConfig.LinkMode.fromString(map.getOrDefault(ClusterLinkConfig.LINK_MODE_PROP, ClusterLinkConfig.LinkMode.DESTINATION.name()));
    }

    public ClusterLinkControlState getClusterLinkControlState() {
        return new ClusterLinkControlState(Collections.unmodifiableSet(this.clusterLinks.keySet()), this.createClusterLinkPolicy.filter(createClusterLinkPolicy -> {
            return createClusterLinkPolicy instanceof ClusterLinkInterceptor;
        }).map(createClusterLinkPolicy2 -> {
            return ((ClusterLinkInterceptor) createClusterLinkPolicy2).links();
        }));
    }

    public boolean isValidLinkId(Uuid uuid) {
        return this.clusterLinks.containsKey(uuid);
    }
}
