package org.openremote.manager.mqtt;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.keycloak.KeycloakSecurityContext;
import org.openremote.container.security.AuthContext;
import org.openremote.manager.asset.AssetProcessingService;
import org.openremote.model.PersistenceEvent;
import org.openremote.model.asset.AssetEvent;
import org.openremote.model.asset.AssetFilter;
import org.openremote.model.asset.UserAssetLink;
import org.openremote.model.attribute.AttributeEvent;
import org.openremote.model.event.Event;
import org.openremote.model.event.shared.EventSubscription;
import org.openremote.model.syslog.SyslogCategory;
import org.openremote.model.util.ValueUtil;

/* loaded from: input_file:org/openremote/manager/mqtt/DefaultMQTTHandler.class */
public class DefaultMQTTHandler extends MQTTHandler {
    public static final int PRIORITY = -2147482648;
    public static final String ASSET_TOPIC = "asset";
    public static final String ATTRIBUTE_TOPIC = "attribute";
    public static final String ATTRIBUTE_VALUE_TOPIC = "attributevalue";
    public static final String ATTRIBUTE_VALUE_WRITE_TOPIC = "writeattributevalue";
    public static final String ATTRIBUTE_WRITE_TOPIC = "writeattribute";
    private static final Logger LOG = SyslogCategory.getLogger(SyslogCategory.API, DefaultMQTTHandler.class);
    protected final Map<String, Map<String, Consumer<? extends Event>>> sessionSubscriptionConsumers = new HashMap();
    protected final Cache<String, ConcurrentHashSet<String>> authorizationCache = CacheBuilder.newBuilder().maximumSize(100000).expireAfterWrite(300000, TimeUnit.MILLISECONDS).build();

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public int getPriority() {
        return PRIORITY;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onConnect(RemotingConnection remotingConnection) {
        super.onConnect(remotingConnection);
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onDisconnect(RemotingConnection remotingConnection) {
        super.onDisconnect(remotingConnection);
        String sessionKey = getSessionKey(remotingConnection);
        LOG.log(Level.FINER, "Removing subscriptions for connection: " + MQTTBrokerService.connectionToString(remotingConnection));
        synchronized (this.sessionSubscriptionConsumers) {
            this.sessionSubscriptionConsumers.computeIfPresent(sessionKey, (str, map) -> {
                map.forEach((str, consumer) -> {
                    this.clientEventService.removeSubscription(consumer);
                });
                return null;
            });
        }
        this.authorizationCache.invalidate(MQTTBrokerService.getConnectionIDString(remotingConnection));
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onConnectionLost(RemotingConnection remotingConnection) {
        super.onConnectionLost(remotingConnection);
        String sessionKey = getSessionKey(remotingConnection);
        LOG.log(Level.FINER, "Removing subscriptions for connection: " + MQTTBrokerService.connectionToString(remotingConnection));
        synchronized (this.sessionSubscriptionConsumers) {
            this.sessionSubscriptionConsumers.computeIfPresent(sessionKey, (str, map) -> {
                map.forEach((str, consumer) -> {
                    this.clientEventService.removeSubscription(consumer);
                });
                return null;
            });
        }
        this.authorizationCache.invalidate(MQTTBrokerService.getConnectionIDString(remotingConnection));
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public boolean topicMatches(Topic topic) {
        return isAttributeTopic(topic) || isAssetTopic(topic) || isAttributeValueWriteTopic(topic) || isAttributeWriteTopic(topic);
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    protected Logger getLogger() {
        return LOG;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public boolean canSubscribe(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic) {
        if (!this.isKeycloak) {
            LOG.finest("Identity provider is not keycloak");
            return false;
        }
        AuthContext authContextFromSecurityContext = getAuthContextFromSecurityContext(keycloakSecurityContext);
        if (authContextFromSecurityContext == null) {
            Logger logger = LOG;
            String valueOf = String.valueOf(topic);
            MQTTBrokerService mQTTBrokerService = this.mqttBrokerService;
            logger.finest("Anonymous connection not supported: topic=" + valueOf + ", " + MQTTBrokerService.connectionToString(remotingConnection));
            return false;
        }
        boolean isAttributeTopic = isAttributeTopic(topic);
        boolean isAssetTopic = isAssetTopic(topic);
        if (!isAssetTopic && !isAttributeTopic) {
            Logger logger2 = LOG;
            String valueOf2 = String.valueOf(topic);
            MQTTBrokerService mQTTBrokerService2 = this.mqttBrokerService;
            logger2.finest("Topic must have 3 or more tokens and third token must be 'asset, attribute or attributevalue': topic=" + valueOf2 + ", " + MQTTBrokerService.connectionToString(remotingConnection));
            return false;
        }
        if (isAssetTopic) {
            if (topic.getTokens().size() < 4 || topic.getTokens().size() > 5) {
                Logger logger3 = LOG;
                String valueOf3 = String.valueOf(topic);
                MQTTBrokerService mQTTBrokerService3 = this.mqttBrokerService;
                logger3.finest("Asset subscribe token count should be 4 or 5: topic=" + valueOf3 + ", " + MQTTBrokerService.connectionToString(remotingConnection));
                return false;
            }
            if (topic.getTokens().size() == 4) {
                if (!Pattern.matches("^[0-9A-Za-z]{22}$", topicTokenIndexToString(topic, 3)) && !"#".equals(topicTokenIndexToString(topic, 3)) && !"+".equals(topicTokenIndexToString(topic, 3))) {
                    Logger logger4 = LOG;
                    String valueOf4 = String.valueOf(topic);
                    MQTTBrokerService mQTTBrokerService4 = this.mqttBrokerService;
                    logger4.fine("Asset subscribe forth token must be an asset ID or wildcard: topic=" + valueOf4 + ", " + MQTTBrokerService.connectionToString(remotingConnection));
                    return false;
                }
            } else if (topic.getTokens().size() == 5) {
                if (!Pattern.matches("^[0-9A-Za-z]{22}$", topicTokenIndexToString(topic, 3))) {
                    Logger logger5 = LOG;
                    String valueOf5 = String.valueOf(topic);
                    MQTTBrokerService mQTTBrokerService5 = this.mqttBrokerService;
                    logger5.fine("Asset subscribe forth token must be an asset ID: topic=" + valueOf5 + ", " + MQTTBrokerService.connectionToString(remotingConnection));
                    return false;
                }
                if (!"#".equals(topicTokenIndexToString(topic, 4)) && !"+".equals(topicTokenIndexToString(topic, 4))) {
                    Logger logger6 = LOG;
                    String valueOf6 = String.valueOf(topic);
                    MQTTBrokerService mQTTBrokerService6 = this.mqttBrokerService;
                    logger6.fine("Asset subscribe fifth token must be a wildcard: topic=" + valueOf6 + ", " + MQTTBrokerService.connectionToString(remotingConnection));
                    return false;
                }
            }
        } else {
            if (topic.getTokens().size() < 5 || topic.getTokens().size() > 6) {
                Logger logger7 = LOG;
                String valueOf7 = String.valueOf(topic);
                MQTTBrokerService mQTTBrokerService7 = this.mqttBrokerService;
                logger7.fine("Attribute subscribe token count should be 5 or 6: topic=" + valueOf7 + ", " + MQTTBrokerService.connectionToString(remotingConnection));
                return false;
            }
            if (topic.getTokens().size() == 5) {
                if ("#".equals(topicTokenIndexToString(topic, 3))) {
                    Logger logger8 = LOG;
                    String valueOf8 = String.valueOf(topic);
                    MQTTBrokerService mQTTBrokerService8 = this.mqttBrokerService;
                    logger8.fine("Attribute subscribe multi level wildcard must be last token: topic=" + valueOf8 + ", " + MQTTBrokerService.connectionToString(remotingConnection));
                    return false;
                }
                if (!Pattern.matches("^[0-9A-Za-z]{22}$", topicTokenIndexToString(topic, 4)) && !"#".equals(topicTokenIndexToString(topic, 4)) && !"+".equals(topicTokenIndexToString(topic, 4))) {
                    Logger logger9 = LOG;
                    String valueOf9 = String.valueOf(topic);
                    MQTTBrokerService mQTTBrokerService9 = this.mqttBrokerService;
                    logger9.fine("Attribute subscribe fifth token must be an asset ID or a wildcard: topic=" + valueOf9 + ", " + MQTTBrokerService.connectionToString(remotingConnection));
                    return false;
                }
            } else if (topic.getTokens().size() == 6) {
                if (!Pattern.matches("^[0-9A-Za-z]{22}$", topicTokenIndexToString(topic, 4))) {
                    Logger logger10 = LOG;
                    String valueOf10 = String.valueOf(topic);
                    MQTTBrokerService mQTTBrokerService10 = this.mqttBrokerService;
                    logger10.fine("Attribute subscribe fifth token must be an asset ID: topic=" + valueOf10 + ", " + MQTTBrokerService.connectionToString(remotingConnection));
                    return false;
                }
                if (!"#".equals(topicTokenIndexToString(topic, 5)) && !"+".equals(topicTokenIndexToString(topic, 5))) {
                    Logger logger11 = LOG;
                    String valueOf11 = String.valueOf(topic);
                    MQTTBrokerService mQTTBrokerService11 = this.mqttBrokerService;
                    logger11.fine("Attribute subscribe sixth token must be a wildcard: topic=" + valueOf11 + ", " + MQTTBrokerService.connectionToString(remotingConnection));
                    return false;
                }
            }
        }
        AssetFilter<?> buildAssetFilter = buildAssetFilter(topic);
        if (buildAssetFilter != null) {
            return this.clientEventService.authorizeEventSubscription(topicRealm(topic), authContextFromSecurityContext, new EventSubscription<>(isAssetTopic ? AssetEvent.class : AttributeEvent.class, buildAssetFilter));
        }
        Logger logger12 = LOG;
        String valueOf12 = String.valueOf(topic);
        MQTTBrokerService mQTTBrokerService12 = this.mqttBrokerService;
        logger12.info("Failed to process subscription topic: topic=" + valueOf12 + ", " + MQTTBrokerService.connectionToString(remotingConnection));
        return false;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public boolean canPublish(RemotingConnection remotingConnection, KeycloakSecurityContext keycloakSecurityContext, Topic topic) {
        ConcurrentHashSet concurrentHashSet;
        if (!this.isKeycloak) {
            LOG.fine("Identity provider is not keycloak");
            return false;
        }
        AuthContext authContextFromSecurityContext = getAuthContextFromSecurityContext(keycloakSecurityContext);
        if (authContextFromSecurityContext == null) {
            LOG.finer("Anonymous publish not supported: topic=" + String.valueOf(topic) + ", connection=" + MQTTBrokerService.connectionToString(remotingConnection));
            return false;
        }
        if (!isAttributeValueWriteTopic(topic) && !isAttributeWriteTopic(topic)) {
            return false;
        }
        if (topic.getTokens().size() != 5 || !Pattern.matches("^[0-9A-Za-z]{22}$", topicTokenIndexToString(topic, 4))) {
            LOG.finer("Invalid publish topic: topic=" + String.valueOf(topic) + ", connection=" + MQTTBrokerService.connectionToString(remotingConnection));
            return false;
        }
        String connectionIDString = MQTTBrokerService.getConnectionIDString(remotingConnection);
        ConcurrentHashSet concurrentHashSet2 = (ConcurrentHashSet) this.authorizationCache.getIfPresent(connectionIDString);
        if (concurrentHashSet2 != null && concurrentHashSet2.contains(topic.getString())) {
            return true;
        }
        if (!this.clientEventService.authorizeEventWrite(topicRealm(topic), authContextFromSecurityContext, buildAttributeEvent(topic.getTokens(), null, null))) {
            LOG.fine("Publish was not authorised for this user and topic: topic=" + String.valueOf(topic) + ", subject=" + String.valueOf(authContextFromSecurityContext));
            return false;
        }
        synchronized (this.authorizationCache) {
            ConcurrentHashSet concurrentHashSet3 = (ConcurrentHashSet) this.authorizationCache.getIfPresent(connectionIDString);
            if (concurrentHashSet3 != null) {
                concurrentHashSet = concurrentHashSet3;
            } else {
                concurrentHashSet = new ConcurrentHashSet();
                this.authorizationCache.put(connectionIDString, concurrentHashSet);
            }
        }
        concurrentHashSet.add(topic.getString());
        return true;
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onSubscribe(RemotingConnection remotingConnection, Topic topic) {
        boolean isAssetTopic = isAssetTopic(topic);
        String string = topic.getString();
        AssetFilter<?> buildAssetFilter = buildAssetFilter(topic);
        Class<AssetEvent> cls = isAssetTopic ? AssetEvent.class : AttributeEvent.class;
        String sessionKey = getSessionKey(remotingConnection);
        if (buildAssetFilter == null) {
            LOG.info("Invalid event filter generated for topic '" + String.valueOf(topic) + "': " + MQTTBrokerService.connectionToString(remotingConnection));
            return;
        }
        Consumer<? extends Event> subscriptionEventConsumer = getSubscriptionEventConsumer(remotingConnection, topic);
        EventSubscription<? extends Event> eventSubscription = new EventSubscription<>(cls, buildAssetFilter, string);
        synchronized (this.sessionSubscriptionConsumers) {
            this.sessionSubscriptionConsumers.computeIfAbsent(sessionKey, str -> {
                return new HashMap();
            }).put(string, subscriptionEventConsumer);
            this.clientEventService.addSubscription(eventSubscription, subscriptionEventConsumer);
            LOG.finest(() -> {
                return "Client event subscription created for topic '" + String.valueOf(topic) + "': " + MQTTBrokerService.connectionToString(remotingConnection);
            });
        }
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onUnsubscribe(RemotingConnection remotingConnection, Topic topic) {
        String topic2 = topic.toString();
        String sessionKey = getSessionKey(remotingConnection);
        synchronized (this.sessionSubscriptionConsumers) {
            this.sessionSubscriptionConsumers.computeIfPresent(sessionKey, (str, map) -> {
                Consumer<? extends Event> consumer = (Consumer) map.remove(topic2);
                if (consumer != null) {
                    this.clientEventService.removeSubscription(consumer);
                }
                if (map.isEmpty()) {
                    return null;
                }
                return map;
            });
        }
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public Set<String> getPublishListenerTopics() {
        return Set.of("+/+/writeattributevalue/#", "+/+/writeattribute/#");
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onPublish(RemotingConnection remotingConnection, Topic topic, ByteBuf byteBuf) {
        AttributeEvent buildAttributeEvent;
        List<String> tokens = topic.getTokens();
        String byteBuf2 = byteBuf.toString(StandardCharsets.UTF_8);
        if (isAttributeWriteTopic(topic)) {
            buildAttributeEvent = (AttributeEvent) ValueUtil.parse(byteBuf2, ObjectNode.class).map(objectNode -> {
                if (!objectNode.has("value") || !objectNode.has("timestamp")) {
                    return null;
                }
                JsonNode jsonNode = objectNode.get("value");
                long asLong = objectNode.get("timestamp").asLong();
                if (asLong > 0) {
                    return buildAttributeEvent(tokens, jsonNode, Long.valueOf(asLong));
                }
                return null;
            }).orElse(null);
            if (buildAttributeEvent == null) {
                LOG.info(() -> {
                    return "Invalid publish to write attribute topic '" + String.valueOf(topic) + "': " + MQTTBrokerService.connectionToString(remotingConnection);
                });
            }
        } else {
            buildAttributeEvent = buildAttributeEvent(tokens, ValueUtil.parse(byteBuf2).orElse(null), Long.valueOf(this.timerService.getCurrentTimeMillis()));
        }
        if (buildAttributeEvent != null) {
            this.messageBrokerService.getFluentProducerTemplate().withBody(buildAttributeEvent).to(AssetProcessingService.ATTRIBUTE_EVENT_PROCESSOR).asyncSend();
        }
    }

    @Override // org.openremote.manager.mqtt.MQTTHandler
    public void onUserAssetLinksChanged(RemotingConnection remotingConnection, List<PersistenceEvent<UserAssetLink>> list) {
        if (!this.sessionSubscriptionConsumers.containsKey(getSessionKey(remotingConnection)) || list.stream().allMatch(persistenceEvent -> {
            return persistenceEvent.getCause() == PersistenceEvent.Cause.CREATE;
        })) {
            return;
        }
        LOG.info("User asset links have changed for a connected user with active subscriptions so force disconnecting them: " + MQTTBrokerService.connectionToString(remotingConnection));
        this.mqttBrokerService.doForceDisconnect(remotingConnection);
    }

    protected static AttributeEvent buildAttributeEvent(List<String> list, Object obj, Long l) {
        return new AttributeEvent(list.get(4), list.get(3), obj, l).setSource(DefaultMQTTHandler.class.getSimpleName());
    }

    protected static AssetFilter<?> buildAssetFilter(Topic topic) {
        boolean isAssetTopic = isAssetTopic(topic);
        String str = topicRealm(topic);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        ArrayList arrayList4 = new ArrayList();
        String str2 = topicTokenIndexToString(topic, 3);
        if (!isAssetTopic) {
            if (!"+".equals(str2)) {
                arrayList4.add(str2);
            }
            if (topic.getTokens().size() == 5) {
                String str3 = topicTokenIndexToString(topic, 4);
                if (!"#".equals(str3)) {
                    if ("+".equals(str3)) {
                        arrayList2.add(null);
                    } else {
                        arrayList.add(str3);
                    }
                }
            } else {
                if (topic.getTokens().size() != 6) {
                    return null;
                }
                String str4 = topicTokenIndexToString(topic, 5);
                if ("#".equals(str4)) {
                    arrayList3.add(topicTokenIndexToString(topic, 4));
                } else if ("+".equals(str4)) {
                    arrayList2.add(topicTokenIndexToString(topic, 4));
                }
            }
        } else if (topic.getTokens().size() == 4) {
            if (!"#".equals(str2)) {
                if ("+".equals(str2)) {
                    arrayList2.add(null);
                } else {
                    arrayList.add(str2);
                }
            }
        } else {
            if (topic.getTokens().size() != 5) {
                return null;
            }
            String str5 = topicTokenIndexToString(topic, 4);
            if ("#".equals(str5)) {
                arrayList3.add(str2);
            } else if ("+".equals(str5)) {
                arrayList2.add(str2);
            }
        }
        AssetFilter valueChanged = new AssetFilter().setRealm(str).setValueChanged(true);
        if (!arrayList.isEmpty()) {
            valueChanged.setAssetIds((String[]) arrayList.toArray(new String[0]));
        }
        if (!arrayList2.isEmpty()) {
            valueChanged.setParentIds((String[]) arrayList2.toArray(new String[0]));
        }
        if (!arrayList3.isEmpty()) {
            valueChanged.setPath((String[]) arrayList3.toArray(new String[0]));
        }
        if (!arrayList4.isEmpty()) {
            valueChanged.setAttributeNames((String[]) arrayList4.toArray(new String[0]));
        }
        return valueChanged.setValueChanged(true);
    }

    protected <T extends Event> Consumer<T> getSubscriptionEventConsumer(RemotingConnection remotingConnection, Topic topic) {
        Function function;
        boolean equalsIgnoreCase = ATTRIBUTE_VALUE_TOPIC.equalsIgnoreCase(topicTokenIndexToString(topic, 2));
        boolean isAssetTopic = isAssetTopic(topic);
        MqttQoS mqttQoS = MqttQoS.AT_MOST_ONCE;
        if (isAssetTopic) {
            String topic2 = topic.toString();
            String str = topic2.endsWith("#") ? "#" : topic2.endsWith("+") ? "+" : null;
            function = event -> {
                return str != null ? topic2.replace(str, ((AssetEvent) event).getId()) : topic2;
            };
        } else {
            String topic3 = topic.toString();
            boolean equals = "+".equals(topicTokenIndexToString(topic, 3));
            if (equals) {
                topic3 = topic3.replaceFirst("\\+", "\\$");
            }
            String str2 = topic3.endsWith("#") ? "#" : topic3.endsWith("+") ? "+" : null;
            String str3 = topic3;
            function = event2 -> {
                String replace = str2 != null ? str3.replace(str2, ((AttributeEvent) event2).getId()) : str3;
                if (equals) {
                    replace = replace.replace("$", ((AttributeEvent) event2).getName());
                }
                return replace;
            };
        }
        Function function2 = function;
        return event3 -> {
            if (isAssetTopic) {
                if (event3 instanceof AssetEvent) {
                    publishMessage((String) function2.apply(event3), event3, mqttQoS);
                }
            } else if (event3 instanceof AttributeEvent) {
                AttributeEvent attributeEvent = (AttributeEvent) event3;
                if (equalsIgnoreCase) {
                    publishMessage((String) function2.apply(event3), attributeEvent.getValue().orElse(null), mqttQoS);
                } else {
                    publishMessage((String) function2.apply(event3), event3, mqttQoS);
                }
            }
        };
    }

    protected static boolean isAttributeTopic(Topic topic) {
        return ATTRIBUTE_TOPIC.equalsIgnoreCase(topicTokenIndexToString(topic, 2)) || ATTRIBUTE_VALUE_TOPIC.equalsIgnoreCase(topicTokenIndexToString(topic, 2));
    }

    protected static boolean isAttributeValueWriteTopic(Topic topic) {
        return ATTRIBUTE_VALUE_WRITE_TOPIC.equalsIgnoreCase(topicTokenIndexToString(topic, 2));
    }

    protected static boolean isAttributeWriteTopic(Topic topic) {
        return ATTRIBUTE_WRITE_TOPIC.equalsIgnoreCase(topicTokenIndexToString(topic, 2));
    }

    protected static boolean isAssetTopic(Topic topic) {
        return ASSET_TOPIC.equalsIgnoreCase(topicTokenIndexToString(topic, 2));
    }

    protected static String getSessionKey(RemotingConnection remotingConnection) {
        return MQTTBrokerService.getConnectionIDString(remotingConnection);
    }
}
