package io.confluent.kafka.traffic;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.confluent.kafka.multitenant.utils.AuthUtils;
import io.confluent.kafka.schemaregistry.utils.QualifiedSubject;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.server.BrokerSession;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.PublicCredential;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.traffic.TrafficNetworkIdRoutes;
import org.apache.kafka.server.traffic.TrafficNetworkIdRoutesStore;
import org.apache.kafka.server.traffic.TrafficNetworkIdRoutesUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/traffic/TopicBasedTrafficNetworkIdRoutesUpdater.class */
public class TopicBasedTrafficNetworkIdRoutesUpdater implements TrafficNetworkIdRoutesUpdater {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TopicBasedTrafficNetworkIdRoutesUpdater.class);
    // SASL mechanisms for which a network-ID credential delete is issued when connections are force-closed.
    private static final List<String> SUPPORTED_SASL_MECHANISMS = Arrays.asList("PLAIN", OAuthBearerLoginModule.OAUTHBEARER_MECHANISM);
    // Metric group under which the topic load-time metric is registered.
    public static final String METRICS_GROUP = "traffic-metrics";
    // Records (in nanoseconds) how long the initial start of the routes log took; registered with a Max stat.
    private final Sensor topicLoadTimeSensor;
    // Listener names whose endpoints are handed the log-start future by start(); empty until configured.
    private List<String> routesListenerNames;
    // Lifecycle state of this updater; starts as NOT_STARTED.
    final AtomicReference<State> state;
    // Clock used to time the initial topic load and passed to the KafkaBasedLog.
    private final Time time;
    // Broker session UUID; used as the key into TrafficNetworkIdRoutesStore. Null until configured.
    private String sessionUuid;
    // Network ID of this cluster; record keys must start with it to be accepted (see verifyKey).
    private String clusterNetworkId;
    // Name of the topic the routes are consumed from.
    private String networkIdRoutesTopic;
    // Client config the consumer configuration is derived from (see configureConsumer).
    private final Map<String, ?> interBrokerClientConfig;
    // Parses the JSON record values into TrafficNetworkIdAllowedRoutes.
    private final ObjectMapper objectMapper;
    protected final Metrics metrics;
    // Sequence ID of the last record handed to updateRoutes; holds null until the first such record.
    private final AtomicReference<Long> lastSequenceId;
    // Log that consumes the routes topic and feeds records to ConsumeCallback.
    private KafkaBasedLog<String, String> networkIdRoutesLog;
    // Used to check whether the routes topic exists before starting the log.
    private final MetadataCache metadataCache;
    // Scheduler for the periodic start task; only created when the topic is missing at start() time.
    private ScheduledExecutorService executorService;
    // Initial delay and period (milliseconds) of the periodic start task.
    private long periodicStartTaskMs;
    // Handle of the periodic start task, used to cancel it once the log has started or on close.
    Future<?> storeStartTaskFuture;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/traffic/TopicBasedTrafficNetworkIdRoutesUpdater$ConsumeCallback.class */
    /**
     * Callback handed to the routes {@link KafkaBasedLog}: forwards every successfully consumed
     * record to {@link TopicBasedTrafficNetworkIdRoutesUpdater#consume(ConsumerRecord)} and logs
     * any error reported instead of a record.
     */
    public class ConsumeCallback implements Callback<ConsumerRecord<String, String>> {
        private ConsumeCallback() {
        }

        /**
         * @param th             error reported by the log, or {@code null} on success
         * @param consumerRecord the consumed record; only used when {@code th} is {@code null}
         */
        @Override
        public void onCompletion(Throwable th, ConsumerRecord<String, String> consumerRecord) {
            if (th == null) {
                TopicBasedTrafficNetworkIdRoutesUpdater.this.consume(consumerRecord);
                return;
            }
            TopicBasedTrafficNetworkIdRoutesUpdater.LOG.error("Unexpected error in ConsumeCallback", th);
        }
    }

    /* loaded from: input_file:io/confluent/kafka/traffic/TopicBasedTrafficNetworkIdRoutesUpdater$State.class */
    /** Lifecycle states of the updater, held in the {@code state} field. */
    public enum State {
        /** Initial state set by the constructor; configuring and starting are only accepted from here. */
        NOT_STARTED,
        /** Set during configuration when the enable flag is absent or false; start() then returns an empty map. */
        NOT_ENABLED,
        /** Set by start(); startLog() refuses to run in any other state. */
        STARTING,
        /** Set by startLog() once the routes log has been started. */
        RUNNING,
        /** Set by stopLog(), unless the state at that point was NOT_ENABLED or FAILED_TO_START. */
        CLOSED,
        /** Set when start() or startLog() catches an exception. */
        FAILED_TO_START
    }

    /**
     * Creates an updater that uses {@link Time#SYSTEM} as its clock.
     *
     * @param metadataCache cache used to check whether the routes topic exists
     * @param map           client configuration the consumer configuration is derived from
     * @param metrics       registry in which the topic load-time sensor is registered
     */
    public TopicBasedTrafficNetworkIdRoutesUpdater(MetadataCache metadataCache, Map<String, ?> map, Metrics metrics) {
        this(metadataCache, map, metrics, Time.SYSTEM);
    }

    public TopicBasedTrafficNetworkIdRoutesUpdater(MetadataCache metadataCache, Map<String, ?> map, Metrics metrics, Time time) {
        this.routesListenerNames = Collections.emptyList();
        this.state = new AtomicReference<>(State.NOT_STARTED);
        this.lastSequenceId = new AtomicReference<>();
        this.metadataCache = metadataCache;
        this.time = time;
        this.interBrokerClientConfig = map;
        this.objectMapper = new ObjectMapper();
        this.metrics = metrics;
        this.topicLoadTimeSensor = metrics.sensor("NetworkIdRoutesTopicLoadTime");
        this.topicLoadTimeSensor.add(metrics.metricName("network-id-routes-topic-load-time", METRICS_GROUP, "The loading time for the network ID routes topic."), new Max());
    }

    /**
     * Reads the enable flag from the broker config and, when the feature is off, moves the
     * updater to {@link State#NOT_ENABLED}.
     *
     * @param brokerConfig broker configuration values
     * @return {@code true} if loading network ID routes is enabled
     * @throws IllegalStateException if the updater is not in {@link State#NOT_STARTED}
     */
    private boolean checkAndSetEnabledState(Map<String, ?> brokerConfig) {
        if (this.state.get() != State.NOT_STARTED) {
            throw new IllegalStateException("checkAndSetEnabledState called in a non-starting state. Ignoring");
        }
        Boolean enableFlag = (Boolean) brokerConfig.get("confluent.traffic.cdc.network.id.routes.enable");
        boolean enabled = enableFlag != null && enableFlag;
        if (!enabled) {
            LOG.info("Loading network ID routes from the sync pipelines is disabled in config {}", "confluent.traffic.cdc.network.id.routes.enable");
            this.state.set(State.NOT_ENABLED);
        }
        return enabled;
    }

    /**
     * Test-only configuration hook that injects a ready-made log and all derived settings
     * directly, bypassing config parsing.
     *
     * @param kafkaBasedLog log to consume the routes topic with
     * @param str           broker session UUID
     * @param str2          network ID of this cluster
     * @param str3          name of the routes topic
     * @param list          listener names gated on the routes being loaded
     * @param j             initial delay and period of the periodic start task, in milliseconds
     */
    void configureInternal(KafkaBasedLog<String, String> kafkaBasedLog, String str, String str2, String str3, List<String> list, long j) {
        LOG.warn("configure(KafkaBasedLog<>, ...) called, which should only happen in tests (ignore if this is one)");

        // Identity of this broker session and cluster.
        this.sessionUuid = str;
        this.clusterNetworkId = str2;

        // Topic consumption.
        this.networkIdRoutesTopic = str3;
        this.networkIdRoutesLog = kafkaBasedLog;

        // Start-up behaviour.
        this.routesListenerNames = list;
        this.periodicStartTaskMs = j;

        LOG.info("Configured an instance for broker session {}", str);
    }

    /**
     * Looks up the network ID of this cluster in the broker config.
     *
     * @param brokerConfig broker configuration values
     * @return the configured network ID
     * @throws ConfigException if the network ID is not set
     */
    private String getBrokerNetworkId(Map<String, ?> brokerConfig) {
        String networkId = (String) brokerConfig.get("confluent.traffic.network.id");
        if (networkId != null) {
            return networkId;
        }
        throw new ConfigException("confluent.traffic.network.id is not set");
    }

    /**
     * Configures this updater from the broker config. The session UUID and listener names are
     * always read; the remaining settings and the consumer are only set up when the feature is
     * enabled.
     *
     * @param map broker configuration values
     * @throws IllegalStateException if the updater is not in {@link State#NOT_STARTED}
     * @throws ConfigException       if a setting required by the enabled feature is missing or invalid
     */
    public void configure(Map<String, ?> map) {
        this.sessionUuid = AuthUtils.getBrokerSessionUuid(map);
        this.routesListenerNames = ConfluentConfigs.listenerNames("confluent.traffic.cdc.network.id.routes.listener.names", map, (ListenerName) null);
        LOG.info("Configured an instance for broker session {}", this.sessionUuid);

        if (!checkAndSetEnabledState(map)) {
            return;
        }

        // Order matters: configureConsumer reads networkIdRoutesTopic.
        this.clusterNetworkId = getBrokerNetworkId(map);
        this.networkIdRoutesTopic = getNetworkIdRoutesTopic(map);
        this.networkIdRoutesLog = configureConsumer(map, this.interBrokerClientConfig);
        this.periodicStartTaskMs = getPeriodicStartTaskMs(map);
    }

    /**
     * Builds the {@link KafkaBasedLog} that consumes the routes topic.
     *
     * <p>The consumer configuration starts from the client config, restricted to known consumer
     * config names (minus {@code metric.reporters}), and is then overridden with the settings
     * this updater requires.
     *
     * @param brokerConfig broker configuration values (broker id and load timeout are read from it)
     * @param clientConfig client configuration to derive the consumer configuration from
     * @return a log that has been configured but not started
     * @throws IllegalStateException if the updater is not in {@link State#NOT_STARTED}
     * @throws ConfigException       if the topic load timeout is missing or not positive
     */
    private KafkaBasedLog<String, String> configureConsumer(Map<String, ?> brokerConfig, Map<String, ?> clientConfig) {
        State current = this.state.get();
        if (current != State.NOT_STARTED) {
            throw new IllegalStateException("configureConsumer called in a state it can't start in: " + current);
        }

        String clientId = String.format("%s-%s-%s", this.networkIdRoutesTopic, ConfluentConfigs.ClientType.CONSUMER, brokerConfig.get(KafkaConfig.BrokerIdProp()));

        Long loadTimeoutMs = (Long) brokerConfig.get("confluent.cdc.api.keys.topic.load.timeout.ms");
        if (loadTimeoutMs == null || loadTimeoutMs <= 0) {
            throw new ConfigException("Value for config confluent.cdc.api.keys.topic.load.timeout.ms must be positive integer when using networkId routes");
        }

        // Keep only keys a consumer understands; metric reporters are deliberately excluded.
        HashSet<String> allowedKeys = new HashSet<>(ConsumerConfig.configNames());
        allowedKeys.remove("metric.reporters");

        Map<String, Object> consumerProps = new HashMap<>(clientConfig);
        consumerProps.keySet().retainAll(allowedKeys);
        consumerProps.put("client.id", clientId);
        consumerProps.put("bootstrap.servers", clientConfig.get("bootstrap.servers"));
        consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The API timeout is an int config, so clamp the long timeout to the int range.
        consumerProps.put("default.api.timeout.ms", (int) Math.min(loadTimeoutMs, Integer.MAX_VALUE));

        return new KafkaBasedLog<>(this.networkIdRoutesTopic, null, consumerProps, () -> null, new ConsumeCallback(), this.time, null, loadTimeoutMs);
    }

    /**
     * Reads the interval of the periodic start task from the broker config.
     *
     * @param brokerConfig broker configuration values
     * @return the configured interval in milliseconds
     * @throws ConfigException if the interval is missing or not positive
     */
    private Long getPeriodicStartTaskMs(Map<String, ?> brokerConfig) {
        Long intervalMs = (Long) brokerConfig.get("confluent.traffic.cdc.network.id.routes.periodic.start.task.ms");
        boolean valid = intervalMs != null && intervalMs > 0;
        if (!valid) {
            throw new ConfigException("Value for config confluent.traffic.cdc.network.id.routes.periodic.start.task.ms must be positive integer when using networkId routes");
        }
        return intervalMs;
    }

    /**
     * Reads the name of the routes topic from the broker config.
     *
     * @param brokerConfig broker configuration values
     * @return the configured, non-empty topic name
     * @throws ConfigException if the topic name is missing or empty
     */
    private String getNetworkIdRoutesTopic(Map<String, ?> brokerConfig) {
        String topicName = (String) brokerConfig.get("confluent.traffic.cdc.network.id.routes.topic.name");
        if (topicName != null && !topicName.isEmpty()) {
            return topicName;
        }
        throw new ConfigException("Value for config confluent.traffic.cdc.network.id.routes.topic.name can not be empty when networkId routes are enabled");
    }

    /**
     * Shuts this updater down: removes the routes of this session, stops the log and the
     * periodic start task, and unregisters the load-time sensor. Only logs a warning when no
     * session UUID has been set, i.e. the updater was never configured.
     */
    public void close() {
        String session = this.sessionUuid;
        if (session != null) {
            LOG.info("Closing consumer for session {}", session);
            close(session);
            this.metrics.removeSensor(this.topicLoadTimeSensor.name());
        } else {
            LOG.warn("close() called without configure() being called first");
        }
    }

    /**
     * Releases everything held for the given broker session: its routes are removed from the
     * store, then the log and the periodic start task are stopped.
     *
     * @param str broker session UUID whose routes are removed
     */
    private void close(String str) {
        TrafficNetworkIdRoutesStore.removeRoutes(str);
        LOG.info("Removed instance for broker session {}", str);
        stopLog();
        stopPeriodicStartTask();
    }

    /**
     * Cancels the periodic start task (if one is scheduled) and shuts its executor down, waiting
     * up to 30 seconds for termination. Does nothing when no executor was ever created.
     *
     * <p>If the wait is interrupted, the interrupt status of the calling thread is restored so
     * that callers further up the stack can still observe the interruption.
     */
    private void stopPeriodicStartTask() {
        if (this.executorService == null) {
            return;
        }
        if (this.storeStartTaskFuture != null) {
            this.storeStartTaskFuture.cancel(false);
            this.storeStartTaskFuture = null;
        }
        this.executorService.shutdownNow();
        try {
            this.executorService.awaitTermination(TimeUnit.SECONDS.toMillis(30L), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.debug("Shutting down executor was interrupted", e);
            // Catching InterruptedException clears the flag; set it again rather than swallowing it.
            Thread.currentThread().interrupt();
        }
        this.executorService = null;
    }

    /**
     * Stops the routes log (if one exists) and moves the updater to {@link State#CLOSED}.
     *
     * <p>The state is left untouched when it was {@link State#NOT_ENABLED} or
     * {@link State#FAILED_TO_START} on entry. Errors raised while stopping the log are logged
     * and do not prevent the state transition.
     */
    private void stopLog() {
        State stateOnEntry = this.state.get();

        if (this.networkIdRoutesLog != null) {
            try {
                this.networkIdRoutesLog.stop();
                LOG.info("Successfully closed the consumer");
            } catch (Exception e) {
                LOG.error("Error when shutting down the consumer", e);
            }
        }

        if (stateOnEntry == State.NOT_ENABLED || stateOnEntry == State.FAILED_TO_START) {
            return;
        }

        State replaced = this.state.getAndSet(State.CLOSED);
        boolean wasActive = replaced == State.RUNNING || replaced == State.STARTING;
        if (!wasActive) {
            LOG.debug("Asked to close from a non-running state {}", replaced);
        }
    }

    /**
     * Handles one record from the routes topic. Records without a key or without a parsable
     * sequence ID are logged and dropped; records that fail the sequence-ID or key check are
     * ignored; everything else is applied via {@code updateRoutes}.
     *
     * @param consumerRecord record consumed from the routes topic
     */
    void consume(ConsumerRecord<String, String> consumerRecord) {
        String routeKey = consumerRecord.key();
        if (routeKey == null) {
            LOG.error("Missing key in network ID route message! (partition = {}, offset = {}, timestamp = {})", consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp());
            return;
        }

        Long sequenceId = AuthUtils.tryParseEventsSequenceId(consumerRecord);
        if (sequenceId == null) {
            LOG.error("Missing sequence ID in network ID routes message! (partition = {}, offset = {}, timestamp = {})", consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp());
            return;
        }

        // The sequence check runs first; the key is only checked for records that pass it.
        if (!verifySequenceId(sequenceId) || !verifyKey(routeKey)) {
            return;
        }
        updateRoutes(consumerRecord, sequenceId, routeKey);
    }

    /**
     * Applies the routes carried by a record. A {@code null} value clears the routes; any other
     * value is parsed as JSON into {@code TrafficNetworkIdAllowedRoutes} and replaces them.
     *
     * <p>Exceptions raised while applying the record are logged and not rethrown. Whatever the
     * outcome, the record's sequence ID is stored as the last one seen.
     *
     * @param consumerRecord record being applied
     * @param sequenceId     sequence ID parsed from the record
     * @param routeKey       key of the record
     */
    private void updateRoutes(ConsumerRecord<String, String> consumerRecord, long sequenceId, String routeKey) {
        String json = consumerRecord.value();
        try {
            if (json != null) {
                TrafficNetworkIdAllowedRoutes allowed = this.objectMapper.readValue(json, TrafficNetworkIdAllowedRoutes.class);
                updateRoutesWithForceDisconnect(new TrafficNetworkIdRoutes(allowed.allowedNetworkIds(), allowed.allowedDNSDomainSuffixes()));
                maybeLogRouteUpdate(routeKey, sequenceId, json, true);
            } else {
                updateRoutesWithForceDisconnect(TrafficNetworkIdRoutes.EMPTY);
                maybeLogRouteUpdate(routeKey, sequenceId, json, false);
            }
        } catch (Exception e) {
            String message = String.format("Can't parse value in network ID routes message! (partition = %d, offset = %d, timestamp = %d, key = %s, sequence ID = %d)", consumerRecord.partition(), consumerRecord.offset(), consumerRecord.timestamp(), routeKey, sequenceId);
            LOG.error(message, e);
        } finally {
            // Runs on success, on a logged failure and on a propagating error alike.
            this.lastSequenceId.set(sequenceId);
            LOG.trace("Finished reading record with sequence id: {}", sequenceId);
        }
    }

    /**
     * Logs the outcome of a route update together with the routes now stored for this session.
     *
     * @param routeKey   key of the record that was applied
     * @param sequenceId sequence ID of the record
     * @param json       raw record value ({@code null} for a delete)
     * @param updated    {@code true} for an update, {@code false} for a delete
     */
    private void maybeLogRouteUpdate(String routeKey, long sequenceId, String json, boolean updated) {
        String template;
        if (updated) {
            template = "Updated routes for key {} from topic (sequence id: {}, jsonMsg:{}), new routes:{}";
        } else {
            template = "Deleted routes, read null value for key {} from topic (sequence id: {}, jsonMsg:{}), new routes:{}";
        }
        maybeLogMessage(template, routeKey, sequenceId, json, TrafficNetworkIdRoutesStore.getRoutes(this.sessionUuid));
    }

    /**
     * Logs a message at INFO level while the updater is {@link State#RUNNING} and at DEBUG level
     * in every other state.
     *
     * @param template SLF4J message template
     * @param args     arguments for the template
     */
    private void maybeLogMessage(String template, Object... args) {
        boolean running = this.state.get() == State.RUNNING;
        if (!running) {
            LOG.debug(template, args);
            return;
        }
        LOG.info(template, args);
    }

    /**
     * Returns the sequence ID of the last record handed to {@code updateRoutes}.
     *
     * @return the last sequence ID, or {@code null} if no record has been processed yet
     */
    Long getLastSeenSequenceId() {
        return this.lastSequenceId.get();
    }

    /**
     * Starts loading network ID routes and returns, per endpoint, a future that completes once
     * that endpoint may be used.
     *
     * <p>If the routes topic already exists, the log is started asynchronously and endpoints
     * whose listener name is one of the configured routes listeners get the future of that start.
     * If the topic does not exist yet, a periodic task is scheduled that starts the log once the
     * topic appears, and every endpoint gets an already-completed future. Endpoints without a
     * listener name, or with a listener that is not configured, always get a completed future.
     *
     * @param collection endpoints to produce futures for
     * @return one future per endpoint; an empty map when the updater is {@link State#NOT_ENABLED}
     * @throws IllegalStateException if the updater is not in {@link State#NOT_STARTED}, or if
     *                               setting up the start fails (the state then becomes
     *                               {@link State#FAILED_TO_START})
     */
    public Map<Endpoint, CompletableFuture<Void>> start(Collection<Endpoint> collection) {
        LOG.info("Starting store from state {}", this.state);
        if (this.state.get() == State.NOT_ENABLED) {
            LOG.debug("Trying to start from a non enabled state. Ignoring");
            return Collections.emptyMap();
        }
        if (!this.state.compareAndSet(State.NOT_STARTED, State.STARTING)) {
            throw new IllegalStateException("Trying to start a log from a state it can't be started in");
        }
        try {
            final CompletableFuture<Void> startFuture;
            if (this.metadataCache.contains(this.networkIdRoutesTopic)) {
                startFuture = CompletableFuture.runAsync(this::startLog);
            } else {
                // Topic not there yet: do not block any listener, keep retrying in the background.
                startFuture = CompletableFuture.completedFuture(null);
                startPeriodicStartTask();
            }
            return collection.stream().collect(Collectors.toMap(Function.identity(), endpoint -> {
                boolean gatedOnRoutes = endpoint.listenerName()
                    .map(this.routesListenerNames::contains)
                    .orElse(false);
                return gatedOnRoutes ? startFuture : CompletableFuture.<Void>completedFuture(null);
            }));
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            LOG.error("Setting state as FAILED_TO_START due to:", e);
            throw new IllegalStateException("Unable to create a future for startLog()", e);
        }
    }

    /**
     * Schedules a task on a dedicated single-threaded daemon executor that repeatedly tries to
     * start the log, and cancels itself after the first successful start. The configured
     * interval is used both as initial delay and as period.
     */
    private void startPeriodicStartTask() {
        LOG.info("Starting periodic task for checking topic {} existence and consumer start", this.networkIdRoutesTopic);
        this.executorService = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("traffic-route-store-%d", true));

        Runnable startAttempt = () -> {
            LOG.trace("Running periodic task for checking topic {} existence and starting consumer", this.networkIdRoutesTopic);
            if (!startLog()) {
                return;
            }
            LOG.info("Successfully started the consumer on {} and loaded routes", this.networkIdRoutesTopic);
            this.storeStartTaskFuture.cancel(false);
        };

        long intervalMs = this.periodicStartTaskMs;
        this.storeStartTaskFuture = this.executorService.scheduleAtFixedRate(startAttempt, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Starts the routes log if the topic exists, records how long the start took and moves the
     * updater to {@link State#RUNNING}.
     *
     * @return {@code true} if the log was started, {@code false} if the topic does not exist yet
     * @throws IllegalStateException if the updater is not in {@link State#STARTING}, or if the
     *                               start fails (the state then becomes
     *                               {@link State#FAILED_TO_START})
     */
    private boolean startLog() {
        if (this.state.get() != State.STARTING) {
            throw new IllegalStateException("Trying to start log from a non starting state");
        }
        boolean topicExists = this.metadataCache.contains(this.networkIdRoutesTopic);
        if (!topicExists) {
            return false;
        }
        try {
            long startedAtNs = this.time.nanoseconds();
            this.networkIdRoutesLog.start();
            this.state.set(State.RUNNING);
            long elapsedNs = this.time.nanoseconds() - startedAtNs;
            this.topicLoadTimeSensor.record(elapsedNs);
            LOG.info("Consumed initial set of network ID routes from topic took {} nanoseconds, routes:{}", elapsedNs, TrafficNetworkIdRoutesStore.getRoutes(this.sessionUuid));
            return true;
        } catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            LOG.error("Setting state as FAILED_TO_START, unable to start consuming network ID routes from topic due to:", e);
            throw new IllegalStateException("Unable to start consuming network ID routes from topic", e);
        }
    }

    /**
     * Checks that a sequence ID is newer than the last one seen.
     *
     * @param j sequence ID of the incoming record
     * @return {@code true} if no sequence ID has been seen yet or {@code j} is strictly greater
     *         than the last one; {@code false} otherwise
     */
    boolean verifySequenceId(long j) {
        Long lastSeen = getLastSeenSequenceId();
        boolean stale = lastSeen != null && j <= lastSeen;
        if (stale) {
            LOG.info("Received network ID routes for with an earlier sequence id (last seen = {}, recent = {}), ignoring", lastSeen, j);
        }
        return !stale;
    }

    /**
     * Checks that a record key consists of exactly two delimiter-separated parts, the first of
     * which equals the network ID of this cluster.
     *
     * @param str key of the incoming record
     * @return {@code true} if the key belongs to this cluster
     */
    boolean verifyKey(String str) {
        String[] parts = str.split(QualifiedSubject.CONTEXT_DELIMITER);
        boolean belongsToCluster = parts.length == 2 && Objects.equals(parts[0], this.clusterNetworkId);
        if (!belongsToCluster) {
            LOG.warn("Received record with key = {} not matching this cluster's networkId = {}, ignoring", str, this.clusterNetworkId);
        }
        return belongsToCluster;
    }

    /**
     * Stores the new routes for this session and closes connections from every network ID that
     * was in the previous routes but is not in the new ones.
     *
     * @param newRoutes routes that replace the ones currently stored
     */
    private void updateRoutesWithForceDisconnect(TrafficNetworkIdRoutes newRoutes) {
        TrafficNetworkIdRoutes previousRoutes = TrafficNetworkIdRoutesStore.getRoutes(this.sessionUuid);
        TrafficNetworkIdRoutesStore.addRoutes(this.sessionUuid, newRoutes);
        if (previousRoutes == null) {
            return;
        }
        for (String revokedNetworkId : Sets.difference(previousRoutes.networkIdRoutes(), newRoutes.networkIdRoutes())) {
            closeConnections(revokedNetworkId);
        }
    }

    /**
     * Asks the broker session to drop the network-ID credential of the given network ID for
     * every supported SASL mechanism. Only logs a warning when no broker session is available.
     *
     * @param networkId network ID that is no longer allowed
     */
    private void closeConnections(String networkId) {
        BrokerSession brokerSession = null;
        if (this.sessionUuid != null) {
            brokerSession = BrokerSession.session(this.sessionUuid);
        }
        if (brokerSession == null) {
            LOG.warn("Ignoring close of connections from disallowedNetworkId {} because broker session {} is not available.", networkId, this.sessionUuid);
            return;
        }
        maybeLogMessage("Forcing close of connections from disallowedNetworkId {} for broker session {}", networkId, this.sessionUuid);
        for (String mechanism : SUPPORTED_SASL_MECHANISMS) {
            brokerSession.handleCredentialDelete(PublicCredential.saslNetworkIdCredential(networkId, mechanism));
        }
    }
}
