package io.confluent.kafka.server.plugins.policy;

import io.confluent.kafka.multitenant.MultiTenantConfigRestrictions;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkUtils;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$Inbound$;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.config.ReplicationConfigs;

/* loaded from: input_file:io/confluent/kafka/server/plugins/policy/ClusterLinkPolicyConfig.class */
public class ClusterLinkPolicyConfig extends AbstractPolicyConfig {
    // Prefix with which every configuration key declared in this class begins.
    public static final String BASE_PREFIX = "confluent.plugins.";
    // Named default bounds; the values match the defaults registered for the
    // corresponding ".min"/".max" policy keys in CONFIG.
    public static final int DEFAULT_ACL_SYNC_MS_MIN = 1000;
    public static final int DEFAULT_ACL_SYNC_MS_MAX = 300000;
    public static final int DEFAULT_CONSUMER_OFFSET_SYNC_MS_MIN = 1000;
    public static final int DEFAULT_CONSUMER_OFFSET_SYNC_MS_MAX = 300000;
    public static final int DEFAULT_TOPIC_CONFIG_SYNC_MS_MIN = 1000;
    public static final int DEFAULT_TOPIC_CONFIG_SYNC_MS_MAX = 300000;
    public static final int DEFAULT_REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN = 32768;
    public static final int DEFAULT_REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX = 1048576;
    public static final int DEFAULT_AVAILABILITY_CHECK_MS_MIN = 3000;
    public static final int DEFAULT_AVAILABILITY_CHECK_MS_MAX = 60000;
    // Per-instance state, populated in the constructor from the parsed policy settings.
    // maxMessageSize is read from TopicPolicyConfig.MAX_MESSAGE_BYTES_MAX_CONFIG and is
    // the ceiling enforced by validateMaxMessageSize.
    private final int maxMessageSize;
    // Copies of the three "*.allowed" list settings, consulted by the SASL validators.
    private final Set<String> allowedSaslMechanisms;
    private final Set<String> allowedSaslLoginModules;
    private final Set<String> allowedSaslLoginCallbackHandlers;
    // Prefix of the cluster link policy keys; each bounded property gets a ".min" and a
    // ".max" key built from this prefix plus the cluster link property name.
    public static final String CLUSTER_LINK_PREFIX = "confluent.plugins.link.policy.";
    public static final String ACL_SYNC_MS_MIN_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.AclSyncMsProp() + ".min";
    // Building blocks for the documentation strings of the ".min"/".max" keys.
    public static final String DOC_MIN_PREFIX = "The minimum allowed value for the ";
    public static final String DOC_SUFFIX = " cluster link config property.";
    protected static final String ACL_SYNC_MS_MIN_CONFIG_DOC = DOC_MIN_PREFIX + ClusterLinkConfig.AclSyncMsProp() + DOC_SUFFIX;
    public static final String ACL_SYNC_MS_MAX_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.AclSyncMsProp() + ".max";
    public static final String DOC_MAX_PREFIX = "The maximum allowed value for the ";
    protected static final String ACL_SYNC_MS_MAX_CONFIG_DOC = DOC_MAX_PREFIX + ClusterLinkConfig.AclSyncMsProp() + DOC_SUFFIX;
    public static final String CONSUMER_OFFSET_SYNC_MS_MIN_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.ConsumerOffsetSyncMsProp() + ".min";
    protected static final String CONSUMER_OFFSET_SYNC_MS_MIN_CONFIG_DOC = DOC_MIN_PREFIX + ClusterLinkConfig.ConsumerOffsetSyncMsProp() + DOC_SUFFIX;
    public static final String CONSUMER_OFFSET_SYNC_MS_MAX_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.ConsumerOffsetSyncMsProp() + ".max";
    protected static final String CONSUMER_OFFSET_SYNC_MS_MAX_CONFIG_DOC = DOC_MAX_PREFIX + ClusterLinkConfig.ConsumerOffsetSyncMsProp() + DOC_SUFFIX;
    public static final String TOPIC_CONFIG_SYNC_MS_MIN_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.TopicConfigSyncMsProp() + ".min";
    protected static final String TOPIC_CONFIG_SYNC_MS_MIN_CONFIG_DOC = DOC_MIN_PREFIX + ClusterLinkConfig.TopicConfigSyncMsProp() + DOC_SUFFIX;
    public static final String TOPIC_CONFIG_SYNC_MS_MAX_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.TopicConfigSyncMsProp() + ".max";
    protected static final String TOPIC_CONFIG_SYNC_MS_MAX_CONFIG_DOC = DOC_MAX_PREFIX + ClusterLinkConfig.TopicConfigSyncMsProp() + DOC_SUFFIX;
    // Default contents of the three SASL allow-lists (mechanisms, JAAS login modules,
    // login callback handler classes); used as the defaults of the "*.allowed" keys.
    public static final List<String> DEFAULT_SASL_MECHANISMS_ALLOWED = Arrays.asList("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", OAuthBearerLoginModule.OAUTHBEARER_MECHANISM);
    public static final List<String> DEFAULT_SASL_LOGIN_MODULES_ALLOWED = Arrays.asList(PlainLoginModule.class.getName(), ScramLoginModule.class.getName(), OAuthBearerLoginModule.class.getName());
    public static final List<String> DEFAULT_SASL_LOGIN_CALLBACK_HANDLER_ALLOWED = Collections.singletonList(OAuthBearerLoginCallbackHandler.class.getName());
    public static final String AVAILABILITY_CHECK_MS_MIN_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.AvailabilityCheckMsProp() + ".min";
    public static final String AVAILABILITY_CHECK_MS_MIN_DOC = DOC_MIN_PREFIX + ClusterLinkConfig.AvailabilityCheckMsProp() + DOC_SUFFIX;
    public static final String AVAILABILITY_CHECK_MS_MAX_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.AvailabilityCheckMsProp() + ".max";
    public static final String AVAILABILITY_CHECK_MS_MAX_DOC = DOC_MAX_PREFIX + ClusterLinkConfig.AvailabilityCheckMsProp() + DOC_SUFFIX;
    // Security protocols that validateSecurityProtocol accepts when the outbound
    // bootstrap servers are recognised as Confluent Cloud hosts (SASL_SSL only).
    private static final Set<String> ALLOWED_CCLOUD_TO_CCLOUD_SECURITY_PROTOCOLS = Utils.mkSet(SecurityProtocol.SASL_SSL.name);
    // Keys and documentation strings of the SASL allow-list settings.
    public static final String SASL_MECHANISMS_ALLOWED_CONFIG = "confluent.plugins.link.policy.sasl.mechanism.allowed";
    protected static final String SASL_MECHANISMS_ALLOWED_CONFIG_DOC = "The allowed values for the sasl.mechanism cluster link config property.";
    public static final String SASL_LOGIN_MODULES_ALLOWED_CONFIG = "confluent.plugins.link.policy.sasl.login.module.allowed";
    protected static final String SASL_LOGIN_MODULES_ALLOWED_CONFIG_DOC = "The allowed values for the login module specified in sasl.jaas.config cluster link config property.";
    public static final String SASL_LOGIN_CALLBACK_HANDLER_ALLOWED_CONFIG = "confluent.plugins.link.policy.sasl.login.callback.handler.class.allowed";
    protected static final String SASL_LOGIN_CALLBACK_HANDLER_ALLOWED_CONFIG_DOC = "The allowed values for the login callback handler specified in sasl.login.callback.handler.class cluster link config property.";
    // Bounds for replica.socket.receive.buffer.bytes; these keys are written out in full
    // rather than composed from CLUSTER_LINK_PREFIX.
    public static final String REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN_CONFIG = "confluent.plugins.link.policy.replica.socket.receive.buffer.bytes.min";
    protected static final String REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN_DOC = "The minimum allowed value for the replica.socket.receive.buffer.bytes cluster link config property.";
    public static final String REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX_CONFIG = "confluent.plugins.link.policy.replica.socket.receive.buffer.bytes.max";
    protected static final String REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX_DOC = "The maximum allowed value for the replica.socket.receive.buffer.bytes cluster link config property.";
    // Per-tenant link limits. Note the different key prefix
    // ("confluent.plugins.cluster.link.policy." instead of CLUSTER_LINK_PREFIX).
    // These keys are only registered in CONFIG here; no method in this class reads them.
    public static final String MAX_DEST_LINKS_PER_TENANT_CONFIG = "confluent.plugins.cluster.link.policy.max.destination.links.per.tenant";
    public static final String MAX_SOURCE_LINKS_PER_TENANT_CONFIG = "confluent.plugins.cluster.link.policy.max.source.links.per.tenant";
    private static final ConfigDef CONFIG = new ConfigDef().define(ACL_SYNC_MS_MIN_CONFIG, ConfigDef.Type.INT, 1000, ConfigDef.Importance.LOW, ACL_SYNC_MS_MIN_CONFIG_DOC).define(ACL_SYNC_MS_MAX_CONFIG, ConfigDef.Type.INT, 300000, ConfigDef.Importance.LOW, ACL_SYNC_MS_MAX_CONFIG_DOC).define(CONSUMER_OFFSET_SYNC_MS_MIN_CONFIG, ConfigDef.Type.INT, 1000, ConfigDef.Importance.LOW, CONSUMER_OFFSET_SYNC_MS_MIN_CONFIG_DOC).define(CONSUMER_OFFSET_SYNC_MS_MAX_CONFIG, ConfigDef.Type.INT, 300000, ConfigDef.Importance.LOW, CONSUMER_OFFSET_SYNC_MS_MAX_CONFIG_DOC).define(TOPIC_CONFIG_SYNC_MS_MIN_CONFIG, ConfigDef.Type.INT, 1000, ConfigDef.Importance.LOW, TOPIC_CONFIG_SYNC_MS_MIN_CONFIG_DOC).define(TOPIC_CONFIG_SYNC_MS_MAX_CONFIG, ConfigDef.Type.INT, 300000, ConfigDef.Importance.LOW, TOPIC_CONFIG_SYNC_MS_MAX_CONFIG_DOC).define(SASL_MECHANISMS_ALLOWED_CONFIG, ConfigDef.Type.LIST, DEFAULT_SASL_MECHANISMS_ALLOWED, ConfigDef.Importance.MEDIUM, SASL_MECHANISMS_ALLOWED_CONFIG_DOC).define(SASL_LOGIN_MODULES_ALLOWED_CONFIG, ConfigDef.Type.LIST, DEFAULT_SASL_LOGIN_MODULES_ALLOWED, ConfigDef.Importance.MEDIUM, SASL_LOGIN_MODULES_ALLOWED_CONFIG_DOC).define(SASL_LOGIN_CALLBACK_HANDLER_ALLOWED_CONFIG, ConfigDef.Type.LIST, DEFAULT_SASL_LOGIN_CALLBACK_HANDLER_ALLOWED, ConfigDef.Importance.MEDIUM, SASL_LOGIN_CALLBACK_HANDLER_ALLOWED_CONFIG_DOC).define(REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN_CONFIG, ConfigDef.Type.INT, 32768, ConfigDef.Importance.LOW, REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN_DOC).define(REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX_CONFIG, ConfigDef.Type.INT, 1048576, ConfigDef.Importance.LOW, REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX_DOC).define(AVAILABILITY_CHECK_MS_MIN_CONFIG, ConfigDef.Type.INT, 3000, ConfigDef.Importance.LOW, AVAILABILITY_CHECK_MS_MIN_DOC).define(AVAILABILITY_CHECK_MS_MAX_CONFIG, ConfigDef.Type.INT, 60000, ConfigDef.Importance.LOW, AVAILABILITY_CHECK_MS_MAX_DOC).define(MAX_DEST_LINKS_PER_TENANT_CONFIG, ConfigDef.Type.INT, 10, ConfigDef.Importance.LOW, "The maximum 
destination cluster links per tenant.").define(MAX_SOURCE_LINKS_PER_TENANT_CONFIG, ConfigDef.Type.INT, 10, ConfigDef.Importance.LOW, "The maximum source cluster links per tenant. This limit is only applicable to source initiated links.").define(TopicPolicyConfig.MAX_MESSAGE_BYTES_MAX_CONFIG, ConfigDef.Type.INT, Integer.valueOf(ClusterLinkConfig.MaxMessageSizeUnlimited()), ConfigDef.Importance.LOW, "The maximum allowed value for the max.message.bytes topic config property.");

    /**
     * Creates the policy configuration from raw settings.
     *
     * <p>The settings are handed to the superclass together with {@code CONFIG}. The
     * maximum message size and the three SASL allow-lists are then read back once and
     * kept in fields for the validators.
     *
     * @param map raw policy settings keyed by configuration name
     */
    public ClusterLinkPolicyConfig(Map<String, ?> map) {
        super(CONFIG, map);
        this.maxMessageSize = getInt(TopicPolicyConfig.MAX_MESSAGE_BYTES_MAX_CONFIG).intValue();
        // Copy each configured list into its own mutable set so lookups are by membership.
        this.allowedSaslMechanisms = new HashSet<>(getList(SASL_MECHANISMS_ALLOWED_CONFIG));
        this.allowedSaslLoginModules = new HashSet<>(getList(SASL_LOGIN_MODULES_ALLOWED_CONFIG));
        this.allowedSaslLoginCallbackHandlers = new HashSet<>(getList(SASL_LOGIN_CALLBACK_HANDLER_ALLOWED_CONFIG));
    }

    /**
     * Prints the reStructuredText rendering of the policy configuration definition
     * to standard output.
     *
     * @param strArr command line arguments; not used
     */
    public static void main(String[] strArr) {
        final String rstDocumentation = CONFIG.toRst();
        System.out.println(rstDocumentation);
    }

    /**
     * Checks the bounded cluster link properties against their policy minimum and
     * maximum, then verifies that the topic-config sync include list only names
     * topic configs that {@code MultiTenantConfigRestrictions} reports as updatable.
     *
     * <p>The bound checks are delegated to the inherited {@code checkPolicyMin} and
     * {@code checkPolicyMax} helpers, which are defined outside this file.
     *
     * @param map cluster link configs being validated
     * @throws PolicyViolationException if the include list names a non-updatable topic config
     */
    private void validateConfigsAreInRange(Map<String, String> map) {
        final String aclSyncMs = ClusterLinkConfig.AclSyncMsProp();
        checkPolicyMin(map, ACL_SYNC_MS_MIN_CONFIG, aclSyncMs);
        checkPolicyMax(map, ACL_SYNC_MS_MAX_CONFIG, aclSyncMs);

        final String consumerOffsetSyncMs = ClusterLinkConfig.ConsumerOffsetSyncMsProp();
        checkPolicyMin(map, CONSUMER_OFFSET_SYNC_MS_MIN_CONFIG, consumerOffsetSyncMs);
        checkPolicyMax(map, CONSUMER_OFFSET_SYNC_MS_MAX_CONFIG, consumerOffsetSyncMs);

        final String topicConfigSyncMs = ClusterLinkConfig.TopicConfigSyncMsProp();
        checkPolicyMin(map, TOPIC_CONFIG_SYNC_MS_MIN_CONFIG, topicConfigSyncMs);
        checkPolicyMax(map, TOPIC_CONFIG_SYNC_MS_MAX_CONFIG, topicConfigSyncMs);

        checkPolicyMin(map, REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN_CONFIG, ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG);
        checkPolicyMax(map, REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX_CONFIG, ReplicationConfigs.REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_CONFIG);

        final String availabilityCheckMs = ClusterLinkConfig.AvailabilityCheckMsProp();
        checkPolicyMin(map, AVAILABILITY_CHECK_MS_MIN_CONFIG, availabilityCheckMs);
        checkPolicyMax(map, AVAILABILITY_CHECK_MS_MAX_CONFIG, availabilityCheckMs);

        final List<String> includedTopicConfigs = parseList(map, ClusterLinkConfig.TopicConfigSyncIncludeProp());
        if (includedTopicConfigs == null) {
            // No include list supplied: nothing further to check.
            return;
        }
        final Set<String> notUpdatable = includedTopicConfigs.stream()
                .filter(topicConfig -> !MultiTenantConfigRestrictions.updatableTopicConfig(topicConfig, false))
                .collect(Collectors.toSet());
        if (!notUpdatable.isEmpty()) {
            throw new PolicyViolationException(ClusterLinkConfig.TopicConfigSyncIncludeProp() + " should not include " + notUpdatable);
        }
    }

    /**
     * Entry point for validating a set of cluster link configs against this policy.
     *
     * <p>Runs, in order: the updatable-config check, the range checks, the security
     * checks, the bootstrap server checks and the maximum message size check. A
     * {@code null} map is accepted and skips all validation.
     *
     * @param map cluster link configs to validate, or {@code null}
     */
    public void validateClusterLinkConfigs(Map<String, String> map) {
        if (map != null) {
            // A config may be set only if it appears in one of the two updatable sets.
            PolicyUtils.validateConfigsAreUpdatable(map, configName ->
                    MultiTenantConfigRestrictions.UPDATABLE_CLUSTER_LINK_CONFIGS.contains(configName)
                            || MultiTenantConfigRestrictions.UPDATABLE_CLUSTER_LINK_INTERNAL_CONFIGS.contains(configName));
            validateConfigsAreInRange(map);
            validateSecurityConfigs(map);
            validateBootstrap(map);
            validateMaxMessageSize(map);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Rejects a cluster link max message size that exceeds the policy ceiling.
     *
     * <p>The value is trimmed before parsing, so surrounding whitespace does not make
     * an otherwise valid integer fail. A value that is not an integer is reported as an
     * {@link InvalidConfigurationException} rather than escaping as a raw
     * {@link NumberFormatException}.
     *
     * @param map cluster link configs; nothing is checked when the property is absent
     * @throws InvalidConfigurationException if the configured value is not a valid integer
     * @throws PolicyViolationException if the configured value is above the allowed maximum
     */
    public void validateMaxMessageSize(Map<String, String> map) {
        final String propertyName = ClusterLinkConfig.MaxMessageSizeProp();
        final String configured = map.get(propertyName);
        if (configured == null) {
            return;
        }
        final int requested;
        try {
            requested = Integer.parseInt(configured.trim());
        } catch (NumberFormatException e) {
            throw new InvalidConfigurationException(String.format("Invalid value %s for cluster link config %s: not a valid integer", configured, propertyName), e);
        }
        if (requested > this.maxMessageSize) {
            throw new PolicyViolationException(String.format("Cluster link cannot be configured with %s=%s because the maximum allowed message size is %d", propertyName, configured, Integer.valueOf(this.maxMessageSize)));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Validates the security-related cluster link configs: security protocol, SASL
     * mechanism, JAAS login modules, login callback handler class and OAuth token
     * endpoint URL. Links that use inbound connections are not checked at all.
     *
     * @param map cluster link configs to validate
     * @throws PolicyViolationException if the SASL mechanism or the login callback handler
     *         class is not in the corresponding allow-list (the delegated checks may throw
     *         further exceptions)
     */
    public void validateSecurityConfigs(Map<String, String> map) {
        if (usesInboundConnections(map)) {
            return;
        }
        validateSecurityProtocol(map);

        final String mechanism = map.get(SaslConfigs.SASL_MECHANISM);
        if (mechanism != null) {
            // The configured value is upper-cased before the allow-list lookup.
            final boolean mechanismAllowed = this.allowedSaslMechanisms.contains(mechanism.toUpperCase(Locale.ROOT));
            if (!mechanismAllowed) {
                throw new PolicyViolationException("sasl.mechanism=" + mechanism + " must be one of: " + this.allowedSaslMechanisms);
            }
        }

        validateSaslJaasConfig(map.get(SaslConfigs.SASL_JAAS_CONFIG));

        final String callbackHandler = map.get(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS);
        if (callbackHandler != null) {
            // Unlike the mechanism, the handler class name is matched exactly as given.
            final boolean handlerAllowed = this.allowedSaslLoginCallbackHandlers.contains(callbackHandler);
            if (!handlerAllowed) {
                throw new PolicyViolationException("sasl.login.callback.handler.class=" + callbackHandler + ", can only be set to " + OAuthBearerLoginCallbackHandler.class);
            }
        }

        validateOAuthTokenEndpoint(map.get(SaslConfigs.SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL));
    }

    /**
     * Rejects a security protocol other than SASL_SSL when
     * {@code ClusterLinkUtils.isOutboundBootstrapCCloudHost} reports the bootstrap
     * servers as Confluent Cloud hosts for the link's connection mode.
     *
     * <p>Nothing is checked unless the security protocol, the bootstrap servers and the
     * connection mode are all present.
     *
     * @param map cluster link configs to validate
     * @throws PolicyViolationException if the protocol is not allowed for such a link
     */
    private void validateSecurityProtocol(Map<String, String> map) {
        final String protocol = map.get("security.protocol");
        final ConnectionMode connectionMode = ClusterLinkConfig.clusterLinkConnectionMode(map);
        final String bootstrapServers = map.get("bootstrap.servers");
        final boolean incomplete = bootstrapServers == null || protocol == null || connectionMode == null;
        if (incomplete) {
            return;
        }
        // Kept as a raw List, as in the original, because the helper's parameter type is not visible here.
        final List servers = (List) ConfigDef.parseType("bootstrap.servers", bootstrapServers, ConfigDef.Type.LIST);
        if (ALLOWED_CCLOUD_TO_CCLOUD_SECURITY_PROTOCOLS.contains(protocol.toUpperCase(Locale.ROOT))) {
            // An allowed protocol never violates the policy, so the host check is skipped.
            return;
        }
        if (ClusterLinkUtils.isOutboundBootstrapCCloudHost(values(), connectionMode, servers)) {
            throw new PolicyViolationException(String.format("Invalid security protocol %s for a Confluent Cloud to Confluent Cloud link, it must be SASL_SSL.", protocol));
        }
    }

    /**
     * Checks that every login module named in a {@code sasl.jaas.config} value is in
     * the allow-list. A {@code null} or blank value is accepted without checks.
     *
     * <p>A disallowed login module is reported as a {@link PolicyViolationException}
     * that names the module. Previously that exception was thrown inside the same
     * {@code try} block that guards parsing, so the broad {@code catch} replaced it with
     * the generic "could not be loaded" error and hid the actual reason.
     *
     * @param str the {@code sasl.jaas.config} value, possibly {@code null}
     * @throws PolicyViolationException if a login module is not in the allow-list
     * @throws InvalidConfigurationException if the JAAS configuration cannot be loaded
     */
    private void validateSaslJaasConfig(String str) {
        if (str == null || str.trim().isEmpty()) {
            return;
        }
        try {
            JaasContext.loadClientContext(Collections.singletonMap(SaslConfigs.SASL_JAAS_CONFIG, new Password(str))).configurationEntries().forEach(appConfigurationEntry -> {
                String loginModuleName = appConfigurationEntry.getLoginModuleName();
                if (loginModuleName != null && !this.allowedSaslLoginModules.contains(loginModuleName)) {
                    throw new PolicyViolationException("sasl.jaas.config contains unsupported login moodule '" + loginModuleName + "', must be one of: " + this.allowedSaslLoginModules);
                }
            });
        } catch (PolicyViolationException e) {
            // Let the specific policy error through instead of masking it below.
            throw e;
        } catch (Exception e) {
            // NOTE(review): the cause is not chained, as in the original; presumably this
            // avoids echoing fragments of a credential-bearing JAAS string. Confirm.
            throw new InvalidConfigurationException("sasl.jaas.config could not be loaded.");
        }
    }

    /**
     * Checks that an OAuth token endpoint URL, when provided, is well formed and uses
     * the {@code http} or {@code https} protocol. A {@code null} or empty value is
     * accepted without checks.
     *
     * @param str the {@code sasl.oauthbearer.token.endpoint.url} value, possibly {@code null}
     * @throws InvalidConfigurationException if the value is not a well-formed URL
     * @throws PolicyViolationException if the URL uses any other protocol
     */
    private void validateOAuthTokenEndpoint(String str) {
        if (str == null || str.isEmpty()) {
            return;
        }
        final String protocol;
        try {
            // The protocol is lower-cased so the comparison below is case-insensitive.
            protocol = new URL(str).getProtocol().toLowerCase(Locale.ROOT);
        } catch (MalformedURLException e) {
            throw new InvalidConfigurationException("sasl.oauthbearer.token.endpoint.url=" + str + " contains malformed URL : " + e);
        }
        final boolean supported = protocol.equals("http") || protocol.equals("https");
        if (!supported) {
            throw new PolicyViolationException("sasl.oauthbearer.token.endpoint.url=" + str + " contains unsupported protocol '" + protocol + "', only http and https are supported.");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Validates the {@code bootstrap.servers} of a cluster link.
     *
     * <p>Nothing is checked when no bootstrap servers are configured or when the link
     * uses inbound connections. Otherwise each non-empty entry is first tested with
     * {@code ClusterLinkUtils.isCCloudIntranetHost}. The list is then parsed into socket
     * addresses using the configured {@code client.dns.lookup} (defaulting to
     * {@code USE_ALL_DNS_IPS}), and every address flagged by
     * {@code ClusterLinkUtils.isInternalNetworkOrPort} is collected and reported together.
     *
     * @param map cluster link configs to validate
     * @throws PolicyViolationException if any bootstrap entry or resolved address is rejected
     */
    public static void validateBootstrap(Map<String, String> map) {
        final String bootstrapServers = map.get("bootstrap.servers");
        final String dnsLookupName = map.get("client.dns.lookup");
        if (bootstrapServers == null || usesInboundConnections(map)) {
            return;
        }
        // The DNS lookup mode is resolved before any host checks, as in the original ordering.
        final ClientDnsLookup dnsLookup;
        if (dnsLookupName != null) {
            dnsLookup = ClientDnsLookup.forConfig(dnsLookupName);
        } else {
            dnsLookup = ClientDnsLookup.USE_ALL_DNS_IPS;
        }
        @SuppressWarnings("unchecked")
        final List<String> endpoints = (List<String>) ConfigDef.parseType("bootstrap.servers", bootstrapServers, ConfigDef.Type.LIST);
        for (String endpoint : endpoints) {
            if (endpoint == null || endpoint.isEmpty()) {
                continue;
            }
            if (ClusterLinkUtils.isCCloudIntranetHost(map, Utils.getHost(endpoint))) {
                throw new PolicyViolationException("Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: " + endpoint);
            }
        }
        final List<InetSocketAddress> rejected = new ArrayList<>();
        ClientUtils.parseAndValidateAddresses(endpoints, dnsLookup).stream()
                .filter(address -> ClusterLinkUtils.isInternalNetworkOrPort(address))
                .forEachOrdered(rejected::add);
        if (!rejected.isEmpty()) {
            throw new PolicyViolationException("Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: " + rejected);
        }
    }

    /**
     * Tells whether the given cluster link configs select the inbound connection mode.
     *
     * @param map cluster link configs
     * @return {@code true} if the resolved connection mode is the {@code Inbound} singleton
     */
    private static boolean usesInboundConnections(Map<String, String> map) {
        final ConnectionMode connectionMode = ClusterLinkConfig.clusterLinkConnectionMode(map);
        // Identity comparison against the Scala object's singleton instance.
        return ConnectionMode$Inbound$.MODULE$ == connectionMode;
    }
}
