package io.confluent.kafka.server.plugins.auth;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.multitenant.UserMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/server/plugins/auth/DefaultUserMetaDataStore.class */
public class DefaultUserMetaDataStore implements UserMetadataStore {
    private static final String METRICS_GROUP = "user-metadata-metrics";
    public static final String USER_METADATA_EVENT_RATE = "user-metadata-event-rate";
    public static final String USER_METADATA_EVENT_FAILURE_RATE = "user-metadata-event-failure-rate";
    public static final String USER_METADATA_COUNT = "user-metadata-count";
    public static final String USER_METADATA_TOPIC_LOAD_TIME = "user-metadata-topic-load-time";
    private static final String USER_METADATA_EVENT_SENSOR = "user-metadata-event";
    private static final String USER_METADATA_EVENT_FAILURE_SENSOR = "user-metadata-event-failure";
    private static final String USER_METADATA_COUNT_SENSOR = "user-metadata-count";
    private static final String USER_METADATA_TOPIC_LOAD_TIME_SENSOR = "user-metadata-topic-load-time";
    protected static final Map<String, DefaultUserMetaDataStore> INSTANCES = new HashMap();
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) DefaultUserMetaDataStore.class);
    private final Time time;
    protected String sessionUuid;
    protected String topicName;
    protected List<String> multitenantListenerNames;
    private KafkaBasedLog<String, String> userMetadataLog;
    private final Map<String, ?> interBrokerClientConfigs;
    private final ObjectMapper objectMapper;
    private final Metrics metrics;
    private final Sensor userMetaDataEventSensor;
    private final Sensor userMetaDataEventFailureSensor;
    private final Sensor userMetaDataCountSensor;
    private final Sensor topicLoadTimeSensor;
    private final Map<String, Long> lastSequenceId;
    private final Map<String, UserOrgData> userResourceIdToUserId;
    private final Map<String, String> userIdToUserResourceId;
    final AtomicReference<State> state;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/server/plugins/auth/DefaultUserMetaDataStore$ConsumeCallback.class */
    public class ConsumeCallback implements Callback<ConsumerRecord<String, String>> {
        private ConsumeCallback() {
        }

        @Override // org.apache.kafka.connect.util.Callback
        public void onCompletion(Throwable th, ConsumerRecord<String, String> consumerRecord) {
            if (th == null) {
                DefaultUserMetaDataStore.this.read(consumerRecord);
            } else {
                DefaultUserMetaDataStore.this.userMetaDataEventFailureSensor.record();
                DefaultUserMetaDataStore.LOG.error("Unexpected error in consumer callback for UserMetadataStore: ", th);
            }
        }
    }

    /* loaded from: input_file:io/confluent/kafka/server/plugins/auth/DefaultUserMetaDataStore$State.class */
    public enum State {
        NOT_STARTED((byte) 0),
        STARTING((byte) 1),
        RUNNING((byte) 2),
        SHUTTING_DOWN((byte) 3),
        FAILED_TO_START((byte) 4),
        CLOSED((byte) 5);

        private final byte value;

        State(byte b) {
            this.value = b;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/server/plugins/auth/DefaultUserMetaDataStore$UserOrgData.class */
    public static class UserOrgData {
        private String userId;
        private Set<String> orgResourceIds = new HashSet();

        UserOrgData(String str) {
            this.userId = str;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean addOrg(String str) {
            return this.orgResourceIds.add(str);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean removeOrg(String str) {
            return this.orgResourceIds.remove(str);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean orgsIsEmpty() {
            return this.orgResourceIds.isEmpty();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String userId() {
            return this.userId;
        }
    }

    public DefaultUserMetaDataStore(Map<String, ?> map, Metrics metrics) {
        this(map, metrics, Time.SYSTEM);
    }

    public DefaultUserMetaDataStore(Map<String, ?> map, Metrics metrics, Time time) {
        this.multitenantListenerNames = Collections.emptyList();
        this.state = new AtomicReference<>(State.NOT_STARTED);
        this.time = time;
        this.interBrokerClientConfigs = map;
        this.metrics = metrics;
        this.objectMapper = new ObjectMapper();
        this.lastSequenceId = new HashMap();
        this.userIdToUserResourceId = new ConcurrentHashMap();
        this.userResourceIdToUserId = new ConcurrentHashMap();
        this.userMetaDataEventSensor = metrics.sensor(USER_METADATA_EVENT_SENSOR);
        this.userMetaDataEventSensor.add(metrics.metricName(USER_METADATA_EVENT_RATE, METRICS_GROUP, "The event rate for user metadata topic"), new Rate());
        this.userMetaDataEventFailureSensor = metrics.sensor(USER_METADATA_EVENT_FAILURE_SENSOR);
        this.userMetaDataEventFailureSensor.add(metrics.metricName(USER_METADATA_EVENT_FAILURE_RATE, METRICS_GROUP, "The failure event rate for user metadata topic"), new Rate());
        this.userMetaDataCountSensor = metrics.sensor("user-metadata-count");
        this.userMetaDataCountSensor.add(metrics.metricName("user-metadata-count", METRICS_GROUP, "The number unique keys in user metadata topic."), new CumulativeSum());
        this.topicLoadTimeSensor = metrics.sensor("user-metadata-topic-load-time");
        this.topicLoadTimeSensor.add(metrics.metricName("user-metadata-topic-load-time", METRICS_GROUP, "The loading time for the user metadata topic."), new Max());
    }

    public Map<Endpoint, CompletableFuture<Void>> start(AuthorizerServerInfo authorizerServerInfo) {
        if (!this.state.compareAndSet(State.NOT_STARTED, State.STARTING)) {
            throw new IllegalStateException("Trying to start a log from a state it can't be started in");
        }
        try {
            LOG.info("Starting " + getClass().getSimpleName());
            CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> {
                startLog();
            });
            HashMap hashMap = new HashMap(authorizerServerInfo.endpoints().size());
            Collection collection = (Collection) authorizerServerInfo.earlyStartListeners().stream().map(str -> {
                return (String) Optional.of(str).map(ListenerName::normalised).map((v0) -> {
                    return v0.value();
                }).orElse("");
            }).collect(Collectors.toList());
            authorizerServerInfo.endpoints().forEach(endpoint -> {
                Optional map = endpoint.listenerName().map(ListenerName::normalised).map((v0) -> {
                    return v0.value();
                });
                List<String> list = this.multitenantListenerNames;
                list.getClass();
                if (!((Boolean) map.map((v1) -> {
                    return r1.contains(v1);
                }).orElse(false)).booleanValue()) {
                    collection.getClass();
                    if (!((Boolean) map.map((v1) -> {
                        return r1.contains(v1);
                    }).orElse(false)).booleanValue()) {
                        hashMap.put(endpoint, CompletableFuture.completedFuture(null));
                        LOG.info("Immediately resolve {} future for endpoint: {}", DefaultUserMetaDataStore.class.getSimpleName(), endpoint);
                        return;
                    }
                }
                hashMap.put(endpoint, runAsync);
                LOG.info("Wait for {} future to complete before enabling endpoint: {}", DefaultUserMetaDataStore.class.getSimpleName(), endpoint);
            });
            return hashMap;
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to create a future for startLog()", e);
        }
    }

    private void startLog() {
        if (!this.state.get().equals(State.STARTING)) {
            throw new IllegalStateException("Trying to start log from a non starting state");
        }
        try {
            long nanoseconds = this.time.nanoseconds();
            this.userMetadataLog.start();
            this.state.set(State.RUNNING);
            long nanoseconds2 = this.time.nanoseconds() - nanoseconds;
            this.topicLoadTimeSensor.record(nanoseconds2);
            LOG.info("Consumed initial set of user metadata from topic took {} nanoseconds", Long.valueOf(nanoseconds2));
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to start consuming user metadata from topic", e);
        }
    }

    public void close() {
        if (this.sessionUuid == null) {
            LOG.warn("close() called without configure() being called first");
            return;
        }
        this.metrics.removeSensor(this.userMetaDataEventSensor.name());
        this.metrics.removeSensor(this.userMetaDataEventFailureSensor.name());
        this.metrics.removeSensor(this.userMetaDataCountSensor.name());
        this.metrics.removeSensor(this.topicLoadTimeSensor.name());
        LOG.info("Closing consumer for session {}", this.sessionUuid);
        close(this.sessionUuid);
    }

    private void close(String str) {
        synchronized (INSTANCES) {
            if (INSTANCES.get(str) != this) {
                LOG.error(DefaultUserMetaDataStore.class.getSimpleName() + " closing instance that doesn't match the instance in the static map with the same broker session {}. Will not close this instance or remove it from the map", str);
                return;
            }
            INSTANCES.remove(str);
            LOG.info("Removed instance for broker session {}", str);
            if (this.userMetadataLog != null) {
                try {
                    this.userMetadataLog.stop();
                    LOG.info("Successfully closed the consumer");
                } catch (Exception e) {
                    LOG.error("Error when shutting down the consumer", (Throwable) e);
                }
            }
            this.state.set(State.CLOSED);
        }
    }

    public void configure(Map<String, ?> map) {
        LOG.info("Configuring " + getClass().getSimpleName());
        this.sessionUuid = AuthUtils.getBrokerSessionUuid(map);
        this.multitenantListenerNames = ConfluentConfigs.multitenantListenerNames(map, (ListenerName) null);
        if (addInstance(this.sessionUuid)) {
            this.userMetadataLog = configureConsumer(map);
        } else {
            LOG.info("Skipping configuring {} instance is already configured for broker session {}", getClass().getSimpleName(), this.sessionUuid);
        }
    }

    private boolean addInstance(String str) {
        synchronized (INSTANCES) {
            DefaultUserMetaDataStore defaultUserMetaDataStore = INSTANCES.get(str);
            if (defaultUserMetaDataStore == null) {
                INSTANCES.put(str, this);
                return true;
            }
            if (this != defaultUserMetaDataStore) {
                throw new IllegalStateException(getClass().getSimpleName() + " instance already exists for broker session " + str);
            }
            return false;
        }
    }

    public static DefaultUserMetaDataStore getInstance(String str) {
        DefaultUserMetaDataStore defaultUserMetaDataStore;
        synchronized (INSTANCES) {
            defaultUserMetaDataStore = INSTANCES.get(str);
        }
        return defaultUserMetaDataStore;
    }

    private KafkaBasedLog<String, String> configureConsumer(Map<String, ?> map) {
        State state = this.state.get();
        if (!state.equals(State.NOT_STARTED)) {
            throw new IllegalStateException(getClass().getSimpleName() + " configureConsumer called in a state it can't start in: " + state);
        }
        String str = (String) map.get("confluent.cdc.user.metadata.topic");
        Long l = (Long) map.get("confluent.cdc.api.keys.topic.load.timeout.ms");
        if (l == null || l.longValue() <= 0) {
            throw new ConfigException("Value for config confluent.cdc.api.keys.topic.load.timeout.ms must be positive integer when using user metadata topic");
        }
        String format = String.format("%s-%s-%s", str, ConfluentConfigs.ClientType.CONSUMER, map.get("broker.id"));
        HashSet hashSet = new HashSet(ConsumerConfig.configNames());
        hashSet.remove("metric.reporters");
        HashMap hashMap = new HashMap(this.interBrokerClientConfigs);
        hashMap.keySet().retainAll(hashSet);
        hashMap.put("client.id", format);
        hashMap.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
        hashMap.put("bootstrap.servers", this.interBrokerClientConfigs.get("bootstrap.servers"));
        hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        hashMap.put("default.api.timeout.ms", Integer.valueOf((int) Math.min(l.longValue(), 2147483647L)));
        hashMap.put("enable.metrics.push", false);
        return new KafkaBasedLog<>(str, null, hashMap, () -> {
            return null;
        }, new ConsumeCallback(), this.time, null, l.longValue());
    }

    protected void read(ConsumerRecord<String, String> consumerRecord) {
        this.userMetaDataEventSensor.record();
        if (consumerRecord.key() == null) {
            this.userMetaDataEventFailureSensor.record();
            LOG.error("Missing key in user metadata message! (partition = {}, offset = {}, timestamp = {})", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()));
            return;
        }
        Long tryParseEventsSequenceId = AuthUtils.tryParseEventsSequenceId(consumerRecord);
        if (tryParseEventsSequenceId != null) {
            updateUserMetadata(consumerRecord, tryParseEventsSequenceId);
        } else {
            this.userMetaDataEventFailureSensor.record();
            LOG.error("Missing sequence ID in userId metadata message! (partition = {}, offset = {}, timestamp = {})", Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset()), Long.valueOf(consumerRecord.timestamp()));
        }
    }

    private synchronized void updateUserMetadata(ConsumerRecord<String, String> consumerRecord, Long l) {
        String key = consumerRecord.key();
        String value = consumerRecord.value();
        Long l2 = this.lastSequenceId.get(key);
        if (l2 != null && l2.longValue() >= l.longValue()) {
            LOG.warn("Ignoring older message for key {} with sequence id: {} (last seen id is {})", key, l, l2);
            return;
        }
        try {
            try {
                UserMetaDataKey userMetaDataKey = (UserMetaDataKey) this.objectMapper.readValue(key, UserMetaDataKey.class);
                if (value != null) {
                    updateUserResourceIdMap(userMetaDataKey, (UserMetaDataValue) this.objectMapper.readValue(value, UserMetaDataValue.class));
                    maybeLogUserMetadataUpdate(key, l, true);
                } else {
                    String userResourceId = userMetaDataKey.userResourceId();
                    String orgResourceId = userMetaDataKey.orgResourceId();
                    Optional<String> userResourceIdToUserId = userResourceIdToUserId(userResourceId);
                    if (userResourceIdToUserId.isPresent()) {
                        removeFromUserResourceIdMap(userResourceId, orgResourceId, userResourceIdToUserId.get());
                    }
                    maybeLogUserMetadataUpdate(key, l, true);
                }
                this.lastSequenceId.put(key, l);
            } catch (Exception e) {
                this.userMetaDataEventFailureSensor.record();
                LOG.error("Error handling message for user metadata key: {}, value: {}, sequence id: {}, partition: {}, timestamp: {}", key, value, l, Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.timestamp()), e);
                this.lastSequenceId.put(key, l);
            }
        } catch (Throwable th) {
            this.lastSequenceId.put(key, l);
            throw th;
        }
    }

    private void updateUserResourceIdMap(UserMetaDataKey userMetaDataKey, UserMetaDataValue userMetaDataValue) {
        if (!this.userResourceIdToUserId.containsKey(userMetaDataKey.userResourceId())) {
            this.userResourceIdToUserId.put(userMetaDataKey.userResourceId(), new UserOrgData(userMetaDataValue.userId()));
        }
        if (this.userResourceIdToUserId.get(userMetaDataKey.userResourceId()).addOrg(userMetaDataKey.orgResourceId())) {
            this.userMetaDataCountSensor.record(1.0d);
        }
        this.userIdToUserResourceId.put(userMetaDataValue.userId(), userMetaDataKey.userResourceId());
    }

    private void removeFromUserResourceIdMap(String str, String str2, String str3) {
        if (this.userResourceIdToUserId.get(str).removeOrg(str2)) {
            this.userMetaDataCountSensor.record(-1.0d);
        }
        if (this.userResourceIdToUserId.get(str).orgsIsEmpty()) {
            this.userIdToUserResourceId.remove(str3);
            this.userResourceIdToUserId.remove(str);
        }
    }

    public Optional<String> userIdToUserResourceId(String str) {
        return Optional.ofNullable(this.userIdToUserResourceId.get(str));
    }

    public Optional<String> userResourceIdToUserId(String str) {
        return this.userResourceIdToUserId.containsKey(str) ? Optional.of(this.userResourceIdToUserId.get(str).userId()) : Optional.empty();
    }

    private void maybeLogUserMetadataUpdate(String str, Long l, boolean z) {
        String str2 = z ? "Updating userId metadata for {} from topic (sequence id: {})" : "Read null value for key {}, deleting from user metadata store (sequence id: {})";
        if (State.RUNNING.equals(this.state.get())) {
            LOG.info(str2, str, l);
        } else {
            LOG.debug(str2, str, l);
        }
    }

    Map<String, Long> lastSequenceId() {
        return this.lastSequenceId;
    }

    void configure(KafkaBasedLog<String, String> kafkaBasedLog, String str, List<String> list) {
        this.multitenantListenerNames = list;
        this.userMetadataLog = kafkaBasedLog;
        this.sessionUuid = str;
        addInstance(str);
    }

    public int numOfMapping() {
        return this.userIdToUserResourceId.size();
    }

    public Metrics metrics() {
        return this.metrics;
    }
}
