package org.openremote.manager.event;

import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.spi.WebSocketHttpExchange;
import java.io.IOException;
import java.lang.System;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.undertow.UndertowComponent;
import org.apache.camel.component.undertow.UndertowConstants;
import org.apache.camel.component.undertow.UndertowHostKey;
import org.keycloak.KeycloakPrincipal;
import org.openremote.container.message.MessageBrokerService;
import org.openremote.container.security.AuthContext;
import org.openremote.container.security.basic.BasicAuthContext;
import org.openremote.container.security.keycloak.AccessTokenAuthContext;
import org.openremote.container.timer.TimerService;
import org.openremote.manager.asset.AssetProcessingService;
import org.openremote.manager.gateway.GatewayService;
import org.openremote.manager.security.ManagerIdentityService;
import org.openremote.model.Container;
import org.openremote.model.ContainerService;
import org.openremote.model.asset.AssetFilter;
import org.openremote.model.attribute.AttributeEvent;
import org.openremote.model.event.Event;
import org.openremote.model.event.RespondableEvent;
import org.openremote.model.event.TriggeredEventSubscription;
import org.openremote.model.event.shared.CancelEventSubscription;
import org.openremote.model.event.shared.EventFilter;
import org.openremote.model.event.shared.EventSubscription;
import org.openremote.model.event.shared.SharedEvent;
import org.openremote.model.event.shared.UnauthorizedEventSubscription;
import org.openremote.model.syslog.SyslogEvent;
import org.openremote.model.util.Pair;

/* loaded from: input_file:org/openremote/manager/event/ClientEventService.class */
public class ClientEventService extends RouteBuilder implements ContainerService {
    /** Value returned by {@link #getPriority()}; numerically equal to {@code Integer.MAX_VALUE - 1100}. */
    public static final int PRIORITY = 2147482547;

    /** Camel endpoint URI of the websocket; consumed by the inbound route and used as the target when sending to a session. */
    public static final String WEBSOCKET_URI = "undertow://ws://0.0.0.0/websocket/events?fireWebSocketChannelEvents=true&sendTimeout=15000";

    protected static final System.Logger LOG = System.getLogger(ClientEventService.class.getName());

    /** Camel endpoint from which events are fanned out to the registered subscriptions. */
    protected static final String PUBLISH_QUEUE = "direct://ClientPublishQueue";

    /** Authorizers consulted by {@link #authorizeEventSubscription}; a single match is sufficient. */
    protected final Collection<EventSubscriptionAuthorizer> eventSubscriptionAuthorizers = new CopyOnWriteArraySet<>();

    /** Authorizers consulted by {@link #authorizeEventWrite}; a single match is sufficient. */
    protected final Collection<EventAuthorizer> eventAuthorizers = new CopyOnWriteArraySet<>();

    /** Every active subscription paired with the consumer that receives its matching events. */
    protected final Set<Pair<EventSubscription<? extends Event>, Consumer<? extends Event>>> eventSubscriptions = new CopyOnWriteArraySet<>();

    /** Open websocket channels keyed by session key (see {@link #getSessionKey(Exchange)}). */
    protected final Map<String, WebSocketChannel> sessionChannels = new ConcurrentHashMap<>();

    /**
     * Consumers created for websocket subscriptions: session key -> (event type + subscription id -> consumer).
     * This is a plain {@link HashMap}; the code in this class only touches it while synchronized on the map itself.
     */
    protected final Map<String, Map<String, Consumer<? extends Event>>> websocketSessionSubscriptionConsumers = new HashMap<>();

    // Collaborators resolved from the container in init().
    protected TimerService timerService;
    protected ExecutorService executorService;
    protected MessageBrokerService messageBrokerService;
    protected ManagerIdentityService identityService;
    protected GatewayService gatewayService;

    /**
     * Set by start()/stop(); publishEvent() is a no-op while this is false.
     * NOTE(review): accessed without synchronization; confirm lifecycle and publishing threads make that safe.
     */
    protected boolean started;

    /** Optional hook that is handed each websocket exchange (connection events and inbound messages); may be null. */
    protected Consumer<Exchange> gatewayInterceptor;

    /*
     * Decompiler rendering of the compiler-generated class ClientEventService$2
     * (loaded from org/openremote/manager/event/ClientEventService$2.class).
     *
     * Holds a table indexed by UndertowConstants.EventType ordinal whose entries are
     * switch case labels: ONOPEN -> 1, ONCLOSE -> 2, ONERROR -> 3. Slots for any other
     * constant stay 0.
     */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$camel$component$undertow$UndertowConstants$EventType;

        static {
            final int[] caseLabels = new int[UndertowConstants.EventType.values().length];
            // Each constant is resolved inside its own try so that a constant missing at link
            // time (NoSuchFieldError) only leaves its own slot at 0.
            try {
                caseLabels[UndertowConstants.EventType.ONOPEN.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                caseLabels[UndertowConstants.EventType.ONCLOSE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
            }
            try {
                caseLabels[UndertowConstants.EventType.ONERROR.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
            }
            $SwitchMap$org$apache$camel$component$undertow$UndertowConstants$EventType = caseLabels;
        }
    }

    /**
     * Reads the websocket session key from an exchange.
     *
     * @param exchange the exchange whose IN message is inspected
     * @return the value of the {@code websocket.connectionKey} header converted to a String
     */
    public static String getSessionKey(Exchange exchange) {
        final String connectionKey = exchange.getIn().getHeader("websocket.connectionKey", String.class);
        return connectionKey;
    }

    /**
     * Resolves the client id of the authenticated caller attached to an exchange.
     *
     * @param exchange the exchange whose {@code AUTH_CONTEXT} header is inspected
     * @return the client id reported by the auth context, or {@code null} when the header is absent
     */
    public static String getClientId(Exchange exchange) {
        final AuthContext auth = exchange.getIn().getHeader("AUTH_CONTEXT", AuthContext.class);
        return auth == null ? null : auth.getClientId();
    }

    /**
     * @return the fixed priority of this service, {@link #PRIORITY}
     */
    public int getPriority() {
        return ClientEventService.PRIORITY;
    }

    /**
     * Resolves collaborators from the container and wires this service into the Camel context.
     *
     * <p>Registers an {@code undertow} component whose hosts are created by this package's
     * {@code UndertowHost}, adds the event type converters and finally adds this builder's routes.
     *
     * @param container the container providing the services and the executor
     * @throws Exception if the Camel context rejects the component, converters or routes
     */
    public void init(final Container container) throws Exception {
        timerService = container.getService(TimerService.class);
        messageBrokerService = container.getService(MessageBrokerService.class);
        identityService = container.getService(ManagerIdentityService.class);
        gatewayService = container.getService(GatewayService.class);
        executorService = container.getExecutor();

        // The decompiled source passed the enclosing instance as an extra first constructor
        // argument (a synthetic artefact of the anonymous class); the component itself only
        // takes the Camel context.
        messageBrokerService.getContext().addComponent("undertow", new UndertowComponent(messageBrokerService.getContext()) {
            @Override
            protected org.apache.camel.component.undertow.UndertowHost createUndertowHost(UndertowHostKey undertowHostKey) {
                // Unqualified UndertowHost is the class from this package, not Camel's interface.
                return new UndertowHost(container, undertowHostKey, getHostOptions());
            }
        });

        messageBrokerService.getContext().getTypeConverterRegistry().addTypeConverters(new EventTypeConverters());
        messageBrokerService.getContext().addRoutes(this);
    }

    /**
     * Defines the two Camel routes of this service.
     *
     * <ol>
     *   <li>{@code ClientInbound-Websocket}: consumes {@link #WEBSOCKET_URI}. Exchanges carrying a
     *       {@code websocket.eventType} header are connection lifecycle events and are handled by
     *       {@link #handleConnectionEvent(Exchange)}, after which routing stops. Everything else is
     *       a client message: it is normalised by {@link #prepareInboundMessage(Exchange)}, acted
     *       upon by {@link #handleInboundMessage(Exchange)} and, unless that step stopped the
     *       route, forwarded to {@link #PUBLISH_QUEUE}.</li>
     *   <li>{@code ClientPublishToSubscribers}: consumes {@link #PUBLISH_QUEUE} and hands every body
     *       that is an {@link Event} to {@link #sendToSubscribers(Event)}.</li>
     * </ol>
     *
     * @throws Exception if the route definitions cannot be built
     */
    @Override
    public void configure() throws Exception {
        from(WEBSOCKET_URI)
            .routeId("ClientInbound-Websocket")
            .routeConfigurationId(AssetProcessingService.ATTRIBUTE_EVENT_ROUTE_CONFIG_ID)
            .threads().executorService(executorService)
            .choice()
                .when(header("websocket.eventType"))
                    .process(this::handleConnectionEvent)
                    .stop()
                .endChoice()
            .end()
            .process(this::prepareInboundMessage)
            .process(this::handleInboundMessage)
            .to(PUBLISH_QUEUE)
            .end();

        from(PUBLISH_QUEUE)
            .routeId("ClientPublishToSubscribers")
            .routeConfigurationId(AssetProcessingService.ATTRIBUTE_EVENT_ROUTE_CONFIG_ID)
            .threads().executorService(executorService)
            .filter(body().isInstanceOf(Event.class))
            .process(exchange -> sendToSubscribers(exchange.getIn().getBody(Event.class)));
    }

    /** Dispatches a websocket open/close/error event and then notifies the gateway interceptor. */
    private void handleConnectionEvent(Exchange exchange) {
        UndertowConstants.EventType eventType = exchange.getIn().getHeader("websocket.eventTypeEnum", UndertowConstants.EventType.class);
        WebSocketChannel channel = exchange.getIn().getHeader("websocket.channel", WebSocketChannel.class);

        switch (eventType) {
            case ONOPEN:
                onSessionOpen(exchange, channel);
                break;
            case ONCLOSE:
                onSessionClose(exchange, channel);
                break;
            case ONERROR:
                onSessionError(exchange, channel);
                break;
            default:
                break;
        }

        notifyGatewayInterceptor(exchange);
    }

    /** Derives the auth context for a new connection, stores it on the channel and registers the channel. */
    private void onSessionOpen(Exchange exchange, WebSocketChannel channel) {
        WebSocketHttpExchange httpExchange = exchange.getIn().getHeader("websocket.exchange", WebSocketHttpExchange.class);
        String realm = httpExchange.getRequestHeader("Realm");
        java.security.Principal principal = httpExchange.getUserPrincipal();
        AuthContext authContext = null;

        if (principal instanceof KeycloakPrincipal) {
            KeycloakPrincipal<?> keycloakPrincipal = (KeycloakPrincipal<?>) principal;
            authContext = new AccessTokenAuthContext(
                keycloakPrincipal.getKeycloakSecurityContext().getRealm(),
                keycloakPrincipal.getKeycloakSecurityContext().getToken());
        } else if (principal instanceof BasicAuthContext) {
            authContext = (BasicAuthContext) principal;
        } else if (principal != null) {
            LOG.log(System.Logger.Level.INFO, "Unsupported user principal type: " + principal);
        }

        // NOTE(review): the "service-account-" prefix presumably identifies service users; those
        // connections get an idle timeout of 30000 (units as defined by WebSocketChannel) - confirm.
        if (authContext != null && authContext.getUsername().startsWith("service-account-")) {
            channel.setIdleTimeout(30000L);
        }

        // Kept on the channel so that later messages and close/error events can recover them.
        channel.setAttribute("AUTH_CONTEXT", authContext);
        channel.setAttribute("Realm", realm);

        exchange.getIn().setHeader("AUTH_CONTEXT", authContext);
        exchange.getIn().setHeader("Realm", realm);
        exchange.getIn().setHeader("connection.sessionOpen", true);

        sessionChannels.put(getSessionKey(exchange), channel);
        LOG.log(System.Logger.Level.DEBUG, "Client connection created: " + channel.getSourceAddress());
    }

    /** Handles a regular close: flags the exchange, forgets the channel and drops the session's subscriptions. */
    private void onSessionClose(Exchange exchange, WebSocketChannel channel) {
        String sessionKey = getSessionKey(exchange);
        copySessionAttributesToHeaders(exchange, channel);
        exchange.getIn().setHeader("connection.sessionClose", true);

        sessionChannels.remove(sessionKey);
        LOG.log(System.Logger.Level.DEBUG, "Client connection closed: " + channel.getSourceAddress());
        removeSessionSubscriptions(sessionKey);
    }

    /** Handles a connection error: flags the exchange, closes the channel (best effort) and cleans up like a close. */
    private void onSessionError(Exchange exchange, WebSocketChannel channel) {
        String sessionKey = getSessionKey(exchange);
        copySessionAttributesToHeaders(exchange, channel);
        exchange.getIn().setHeader("connection.sessionCloseError", true);

        LOG.log(System.Logger.Level.DEBUG, "Client connection error: " + channel.getSourceAddress());
        try {
            channel.close();
        } catch (Exception e) {
            // Best effort only: cleanup below must still run, but leave a trace instead of hiding the failure.
            LOG.log(System.Logger.Level.DEBUG, "Failed to close websocket channel after connection error", e);
        }

        sessionChannels.remove(sessionKey);
        removeSessionSubscriptions(sessionKey);
    }

    /** Copies the auth context and realm stored on the channel into the exchange's IN headers. */
    private void copySessionAttributesToHeaders(Exchange exchange, WebSocketChannel channel) {
        AuthContext authContext = (AuthContext) channel.getAttribute("AUTH_CONTEXT");
        String realm = (String) channel.getAttribute("Realm");
        exchange.getIn().setHeader("AUTH_CONTEXT", authContext);
        exchange.getIn().setHeader("Realm", realm);
    }

    /** Unregisters every subscription consumer that was created for the given websocket session. */
    private void removeSessionSubscriptions(String sessionKey) {
        LOG.log(System.Logger.Level.TRACE, "Removing subscriptions for session: " + sessionKey);
        synchronized (websocketSessionSubscriptionConsumers) {
            Map<String, Consumer<? extends Event>> consumers = websocketSessionSubscriptionConsumers.remove(sessionKey);
            if (consumers != null) {
                consumers.values().forEach(this::removeSubscription);
            }
        }
    }

    /** Passes the exchange to the gateway interceptor when one is installed. */
    private void notifyGatewayInterceptor(Exchange exchange) {
        // Read the field once so a concurrent setGatewayInterceptor(null) cannot cause an NPE here.
        Consumer<Exchange> interceptor = gatewayInterceptor;
        if (interceptor != null) {
            interceptor.accept(exchange);
        }
    }

    /** Restores session headers, converts a prefixed text body into its message object and wires up responses. */
    private void prepareInboundMessage(Exchange exchange) {
        WebSocketChannel channel = exchange.getIn().getHeader("websocket.channel", WebSocketChannel.class);
        copySessionAttributesToHeaders(exchange, channel);

        Object rawBody = exchange.getIn().getBody();
        if (rawBody instanceof String) {
            String text = (String) rawBody;
            // getBody(Class) goes through Camel type conversion (see the converters registered in init()).
            if (text.startsWith("SUBSCRIBE:")) {
                exchange.getIn().setBody(exchange.getIn().getBody(EventSubscription.class));
            } else if (text.startsWith("UNSUBSCRIBE:")) {
                exchange.getIn().setBody(exchange.getIn().getBody(CancelEventSubscription.class));
            } else if (text.startsWith("EVENT:")) {
                exchange.getIn().setBody(exchange.getIn().getBody(SharedEvent.class));
            }
        }

        Object message = exchange.getIn().getBody();
        if (message instanceof RespondableEvent) {
            // The session key is looked up when the response is produced, as in the original code.
            ((RespondableEvent) message).setResponseConsumer(
                response -> sendToWebsocketSession(getSessionKey(exchange), response));
        }

        notifyGatewayInterceptor(exchange);
    }

    /** Routes a normalised client message to the subscribe, unsubscribe or event handler by body type. */
    private void handleInboundMessage(Exchange exchange) {
        AuthContext authContext = exchange.getIn().getHeader("AUTH_CONTEXT", AuthContext.class);
        String realm = exchange.getIn().getHeader("Realm", String.class);
        Object body = exchange.getIn().getBody();

        if (body instanceof EventSubscription) {
            handleSubscribe(exchange, realm, authContext, (EventSubscription<?>) body);
        } else if (body instanceof CancelEventSubscription) {
            handleUnsubscribe(exchange, (CancelEventSubscription) body);
        } else if (body instanceof SharedEvent) {
            handleSharedEvent(exchange, realm, authContext, (SharedEvent) body);
        }
        // Any other body continues down the route to PUBLISH_QUEUE untouched.
    }

    /** Authorises and registers a websocket subscription, answering the client either way; always stops the route. */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw access is needed to re-apply the filter to a wildcard-typed subscription
    private void handleSubscribe(Exchange exchange, String realm, AuthContext authContext, EventSubscription<?> subscription) {
        String sessionKey = getSessionKey(exchange);
        LOG.log(System.Logger.Level.TRACE, () -> "Adding subscription for session '" + sessionKey + "': " + subscription);

        if (!authorizeEventSubscription(realm, authContext, subscription)) {
            sendToWebsocketSession(sessionKey, new UnauthorizedEventSubscription(subscription));
            exchange.setRouteStop(true);
            return;
        }

        // Asset filters coming from websocket clients are forced to have valueChanged set to true.
        Object filter = subscription.getFilter();
        if (filter instanceof AssetFilter) {
            ((EventSubscription) subscription).setFilter(((AssetFilter) filter).setValueChanged(true));
        }

        // Echo the subscription back, marked as subscribed, as the acknowledgement.
        subscription.setSubscribed(true);
        sendToWebsocketSession(sessionKey, subscription);

        synchronized (websocketSessionSubscriptionConsumers) {
            Consumer<SharedEvent> consumer = event -> onWebsocketSubscriptionTriggered(sessionKey, subscription, event);
            websocketSessionSubscriptionConsumers
                .computeIfAbsent(sessionKey, key -> new HashMap<>())
                .put(subscription.getEventType() + subscription.getSubscriptionId(), consumer);
            addSubscription((EventSubscription<? extends Event>) subscription, consumer);
        }

        exchange.setRouteStop(true);
    }

    /** Removes the consumer registered for the cancelled subscription of this session; always stops the route. */
    private void handleUnsubscribe(Exchange exchange, CancelEventSubscription cancellation) {
        String sessionKey = getSessionKey(exchange);
        LOG.log(System.Logger.Level.TRACE, () -> "Cancelling subscription for session '" + sessionKey + "': " + cancellation);

        synchronized (websocketSessionSubscriptionConsumers) {
            websocketSessionSubscriptionConsumers.computeIfPresent(sessionKey, (key, consumers) -> {
                Consumer<? extends Event> consumer = consumers.remove(cancellation.getEventType() + cancellation.getSubscriptionId());
                if (consumer != null) {
                    removeSubscription(consumer);
                }
                // Returning null drops the session entry once its last subscription is gone.
                return consumers.isEmpty() ? null : consumers;
            });
        }

        exchange.setRouteStop(true);
    }

    /**
     * Authorises an event sent by a client. Unauthorised events stop the route. Attribute events are
     * stamped, tagged with their source and sent asynchronously to the attribute event processor, and
     * the route is stopped. Any other authorised shared event continues on to PUBLISH_QUEUE.
     */
    private void handleSharedEvent(Exchange exchange, String realm, AuthContext authContext, SharedEvent event) {
        if (!authorizeEventWrite(realm, authContext, event)) {
            exchange.setRouteStop(true);
            return;
        }

        if (event instanceof AttributeEvent) {
            AttributeEvent attributeEvent = (AttributeEvent) event;
            // Clients may omit the timestamp; fall back to the timer service's current time.
            if (attributeEvent.getTimestamp() <= 0) {
                attributeEvent.setTimestamp(timerService.getCurrentTimeMillis());
            }
            attributeEvent.setSource("WebsocketClient");
            messageBrokerService.getFluentProducerTemplate()
                .withBody(attributeEvent)
                .to(AssetProcessingService.ATTRIBUTE_EVENT_PROCESSOR)
                .asyncSend();
            exchange.setRouteStop(true);
        }
    }

    /**
     * Delivers an event to every registered subscription of the same event type.
     *
     * <p>A subscription's filter, when present, is applied first; a {@code null} filter result means the
     * event is not delivered to that subscription. An exception thrown by one consumer is logged and does
     * not prevent delivery to the others.
     *
     * @param t the event to deliver
     */
    protected <T extends Event> void sendToSubscribers(T t) {
        for (Pair<EventSubscription<? extends Event>, Consumer<? extends Event>> registration : this.eventSubscriptions) {
            EventSubscription subscription = (EventSubscription) registration.getKey();
            if (!subscription.getEventType().equals(t.getEventType())) {
                continue;
            }

            Event filtered;
            if (subscription.getFilter() == null) {
                filtered = t;
            } else {
                filtered = subscription.getFilter().apply(t);
            }
            if (filtered == null) {
                continue;
            }

            Consumer subscriber = (Consumer) registration.getValue();
            try {
                subscriber.accept(filtered);
            } catch (Exception e) {
                LOG.log(System.Logger.Level.WARNING, "Event subscriber has thrown an exception: " + String.valueOf(subscriber), e);
            }
        }
    }

    /**
     * Registers a subscription together with the consumer that should receive its matching events.
     *
     * @param eventSubscription the subscription describing which events are wanted
     * @param consumer the receiver; the same instance must later be passed to
     *                 {@link #removeSubscription(Consumer)} to unregister
     */
    public void addSubscription(EventSubscription<? extends Event> eventSubscription, Consumer<? extends Event> consumer) throws IllegalStateException {
        final Pair<EventSubscription<? extends Event>, Consumer<? extends Event>> registration =
            new Pair<>(eventSubscription, consumer);
        eventSubscriptions.add(registration);
    }

    /**
     * Registers an unfiltered subscription for all events of the given class.
     *
     * @param cls the event class to subscribe to
     * @param consumer the receiver of matching events
     */
    public <T extends Event> void addSubscription(Class<T> cls, Consumer<T> consumer) throws IllegalStateException {
        // The typed null selects the (Class, EventFilter) constructor.
        final EventSubscription<T> unfiltered = new EventSubscription<>(cls, (EventFilter<T>) null);
        addSubscription(unfiltered, (Consumer<? extends Event>) consumer);
    }

    /**
     * Registers a subscription for events of the given class that pass the given filter.
     *
     * @param cls the event class to subscribe to
     * @param eventFilter the filter applied before delivery
     * @param consumer the receiver of matching events
     */
    public <T extends Event> void addSubscription(Class<T> cls, EventFilter<T> eventFilter, Consumer<T> consumer) throws IllegalStateException {
        final EventSubscription<T> filtered = new EventSubscription<>(cls, eventFilter);
        addSubscription(filtered, (Consumer<? extends Event>) consumer);
    }

    /**
     * Unregisters every subscription whose consumer is the given instance.
     *
     * @param consumer the consumer to remove; matched by reference identity, not {@code equals}
     */
    public void removeSubscription(Consumer<? extends Event> consumer) {
        eventSubscriptions.removeIf(registration -> registration.value == consumer);
    }

    /**
     * Marks the service as started, which enables {@link #publishEvent(Event)}.
     *
     * @param container unused
     */
    public void start(Container container) {
        started = true;
    }

    /**
     * Marks the service as stopped; {@link #publishEvent(Event)} becomes a no-op.
     *
     * @param container unused
     */
    public void stop(Container container) {
        started = false;
    }

    /**
     * Adds an authorizer that {@link #authorizeEventSubscription} will consult.
     *
     * @param eventSubscriptionAuthorizer the authorizer to register
     */
    public void addSubscriptionAuthorizer(EventSubscriptionAuthorizer eventSubscriptionAuthorizer) {
        eventSubscriptionAuthorizers.add(eventSubscriptionAuthorizer);
    }

    /**
     * Adds an authorizer that {@link #authorizeEventWrite} will consult.
     *
     * @param eventAuthorizer the authorizer to register
     */
    public void addEventAuthorizer(EventAuthorizer eventAuthorizer) {
        eventAuthorizers.add(eventAuthorizer);
    }

    /**
     * Decides whether a caller may create the given subscription.
     *
     * <p>The subscription is authorised as soon as any registered subscription authorizer accepts it.
     * A refusal is logged at DEBUG; the message is built lazily so no string work happens when DEBUG
     * is disabled.
     *
     * @param str the realm the request was made against
     * @param authContext the caller's auth context, or {@code null} for an unauthenticated caller
     * @param eventSubscription the requested subscription
     * @return {@code true} if at least one authorizer accepted the subscription
     */
    public boolean authorizeEventSubscription(String str, AuthContext authContext, EventSubscription<?> eventSubscription) {
        boolean authorised = eventSubscriptionAuthorizers.stream()
            .anyMatch(authorizer -> authorizer.authorise(str, authContext, eventSubscription));

        if (!authorised) {
            LOG.log(System.Logger.Level.DEBUG, () -> {
                String caller = authContext != null
                    ? ", username=" + authContext.getUsername() + ", userRealm=" + authContext.getAuthenticatedRealmName()
                    : ", user=null";
                return "Client not authorised to subscribe: subscription=" + eventSubscription + ", requestRealm=" + str + caller;
            });
        }
        return authorised;
    }

    /**
     * Decides whether a caller may send the given event.
     *
     * <p>The event is authorised as soon as any registered event authorizer accepts it; a refusal is
     * logged at DEBUG.
     *
     * @param str the realm the request was made against
     * @param authContext the caller's auth context, or {@code null} for an unauthenticated caller
     * @param t the event the caller wants to send
     * @return {@code true} if at least one authorizer accepted the event
     */
    public <T extends SharedEvent> boolean authorizeEventWrite(String str, AuthContext authContext, T t) {
        for (EventAuthorizer authorizer : this.eventAuthorizers) {
            if (authorizer.authorise(str, authContext, t)) {
                return true;
            }
        }

        LOG.log(System.Logger.Level.DEBUG, () -> {
            final String prefix = "Client not authorised to send event: type=" + t.getEventType() + ", requestRealm=" + str;
            if (authContext == null) {
                return prefix + ", user=null";
            }
            return prefix + ", user=" + authContext.getUsername() + ", userRealm=" + authContext.getAuthenticatedRealmName();
        });
        return false;
    }

    /**
     * Asynchronously sends an event to {@link #PUBLISH_QUEUE} for delivery to subscribers.
     *
     * <p>Does nothing unless the service has been started. Every event except a {@link SyslogEvent}
     * is also logged at TRACE before being sent.
     *
     * @param t the event to publish
     */
    public <T extends Event> void publishEvent(T t) {
        if (!started) {
            return;
        }

        boolean traceable = !(t instanceof SyslogEvent);
        if (traceable) {
            LOG.log(System.Logger.Level.TRACE, () -> "Publishing to subscribers: " + String.valueOf(t));
        }

        messageBrokerService.getFluentProducerTemplate()
            .withBody(t)
            .to(PUBLISH_QUEUE)
            .asyncSend();
    }

    /**
     * Installs (or, with {@code null}, removes) the hook that is handed websocket exchanges.
     *
     * @param consumer the interceptor to install, or {@code null} for none
     */
    public void setGatewayInterceptor(Consumer<Exchange> consumer) {
        gatewayInterceptor = consumer;
    }

    /**
     * Sends an event that matched a websocket subscription to the subscribing session.
     *
     * <p>The event is wrapped, as a single-element list together with the subscription id, in a
     * {@link TriggeredEventSubscription} and sent asynchronously to {@link #WEBSOCKET_URI}.
     *
     * @param str the session key of the subscribing websocket session
     * @param eventSubscription the subscription that matched
     * @param sharedEvent the matching event
     */
    protected void onWebsocketSubscriptionTriggered(String str, EventSubscription<?> eventSubscription, SharedEvent sharedEvent) {
        final Object triggered = new TriggeredEventSubscription(
            Collections.singletonList(sharedEvent),
            eventSubscription.getSubscriptionId());

        messageBrokerService.getFluentProducerTemplate()
            .withBody(triggered)
            .withHeader("websocket.connectionKey", str)
            .to(WEBSOCKET_URI)
            .asyncSend();
    }

    /**
     * Asynchronously sends a message body to one websocket session.
     *
     * @param str the session key, set as the {@code websocket.connectionKey} header
     * @param obj the message body
     */
    public void sendToWebsocketSession(String str, Object obj) {
        messageBrokerService.getFluentProducerTemplate()
            .withBody(obj)
            .withHeader("websocket.connectionKey", str)
            .to(WEBSOCKET_URI)
            .asyncSend();
    }

    /**
     * Force-closes the websocket channel of a session, if that session is currently known.
     *
     * @param str the session key
     * @throws RuntimeException wrapping the {@link IOException} if closing the channel fails
     */
    public void closeWebsocketSession(String str) {
        final WebSocketChannel channel = sessionChannels.get(str);
        if (channel == null) {
            // Unknown or already closed session: nothing to do.
            return;
        }

        LOG.log(System.Logger.Level.INFO, () -> "Force closing websocket session: " + str);
        try {
            channel.close();
        } catch (IOException e) {
            LOG.log(System.Logger.Level.DEBUG, () -> "Failed to close websocket session: " + str);
            throw new RuntimeException(e);
        }
    }

    /**
     * @return the simple class name followed by {@code {}}
     */
    public String toString() {
        final String simpleName = getClass().getSimpleName();
        return simpleName + "{}";
    }
}
