package graphql.solon.ws;

import graphql.GraphqlErrorBuilder;
import graphql.solon.support.WebGraphQlHandlerGetter;
import graphql.solon.support.WebGraphQlResponse;
import graphql.solon.util.Assert;
import graphql.solon.ws.support.GraphQlWebSocketMessage;
import graphql.solon.ws.support.WebSocketGraphQlRequest;
import graphql.solon.ws.support.WebSocketSessionInfo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.collections.CollectionUtils;
import org.noear.snack.ONode;
import org.noear.solon.annotation.Inject;
import org.noear.solon.lang.Nullable;
import org.noear.solon.net.annotation.ServerEndpoint;
import org.noear.solon.net.websocket.SubProtocolCapable;
import org.noear.solon.net.websocket.WebSocket;
import org.noear.solon.net.websocket.listener.SimpleWebSocketListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@ServerEndpoint("/graphql")
/* loaded from: input_file:graphql/solon/ws/GraphqlWebsocket.class */
public class GraphqlWebsocket extends SimpleWebSocketListener implements SubProtocolCapable {
    private static final Logger log = LoggerFactory.getLogger(GraphqlWebsocket.class);
    private final Map<String, SessionState> sessionInfoMap = new ConcurrentHashMap();

    @Inject
    private WebGraphQlHandlerGetter getter;

    /* loaded from: input_file:graphql/solon/ws/GraphqlWebsocket$SendMessageSubscriber.class */
    private static class SendMessageSubscriber extends BaseSubscriber<String> {
        private final String subscriptionId;
        private final WebSocket session;
        private final SessionState sessionState;

        SendMessageSubscriber(String str, WebSocket webSocket, SessionState sessionState) {
            this.subscriptionId = str;
            this.session = webSocket;
            this.sessionState = sessionState;
        }

        protected void hookOnSubscribe(Subscription subscription) {
            subscription.request(1L);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void hookOnNext(String str) {
            this.session.send(str);
            request(1L);
        }

        public void hookOnError(Throwable th) {
            tryCloseWithError(this.session, th, GraphqlWebsocket.log);
        }

        public void hookOnComplete() {
            this.sessionState.getSubscriptions().remove(this.subscriptionId);
        }

        public void tryCloseWithError(WebSocket webSocket, Throwable th, Logger logger) {
            if (logger.isErrorEnabled()) {
                logger.error("Closing session due to exception for " + webSocket, th);
            }
            if (webSocket.isValid()) {
                try {
                    webSocket.close();
                } catch (Throwable th2) {
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:graphql/solon/ws/GraphqlWebsocket$SessionState.class */
    public static class SessionState {
        private final WebSocketSessionInfo sessionInfo;
        private final AtomicReference<Map<String, Object>> connectionInitPayloadRef = new AtomicReference<>();
        private final Map<String, Subscription> subscriptions = new ConcurrentHashMap();
        private final Scheduler scheduler;

        SessionState(String str, WebSocketSessionInfo webSocketSessionInfo) {
            this.sessionInfo = webSocketSessionInfo;
            this.scheduler = Schedulers.newSingle("GraphQL-WsSession-" + str);
        }

        public WebSocketSessionInfo getSessionInfo() {
            return this.sessionInfo;
        }

        @Nullable
        Map<String, Object> getConnectionInitPayload() {
            return this.connectionInitPayloadRef.get();
        }

        boolean setConnectionInitPayload(Map<String, Object> map) {
            return this.connectionInitPayloadRef.compareAndSet(null, map);
        }

        Map<String, Subscription> getSubscriptions() {
            return this.subscriptions;
        }

        void dispose() {
            Iterator<Map.Entry<String, Subscription>> it = this.subscriptions.entrySet().iterator();
            while (it.hasNext()) {
                try {
                    it.next().getValue().cancel();
                } catch (Throwable th) {
                }
            }
            this.subscriptions.clear();
            this.scheduler.dispose();
        }

        Scheduler getScheduler() {
            return this.scheduler;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:graphql/solon/ws/GraphqlWebsocket$SubscriptionExistsException.class */
    public class SubscriptionExistsException extends RuntimeException {
        SubscriptionExistsException() {
        }
    }

    /* loaded from: input_file:graphql/solon/ws/GraphqlWebsocket$WebMvcSessionInfo.class */
    private static class WebMvcSessionInfo implements WebSocketSessionInfo {
        private final WebSocket session;

        private WebMvcSessionInfo(WebSocket webSocket) {
            this.session = webSocket;
        }

        @Override // graphql.solon.ws.support.WebSocketSessionInfo
        public String getId() {
            return this.session.id();
        }

        @Override // graphql.solon.ws.support.WebSocketSessionInfo
        public Map<String, Object> getAttributes() {
            return this.session.attrMap();
        }

        @Override // graphql.solon.ws.support.WebSocketSessionInfo
        public URI getUri() {
            return URI.create(this.session.url());
        }

        @Override // graphql.solon.ws.support.WebSocketSessionInfo
        public InetSocketAddress getRemoteAddress() throws IOException {
            return this.session.remoteAddress();
        }
    }

    public String getSubProtocols(Collection<String> collection) {
        return "graphql-transport-ws";
    }

    public void onOpen(WebSocket webSocket) {
        this.sessionInfoMap.put(webSocket.id(), new SessionState(webSocket.id(), new WebMvcSessionInfo(webSocket)));
    }

    public void onClose(WebSocket webSocket) {
        SessionState remove = this.sessionInfoMap.remove(webSocket.id());
        if (Objects.nonNull(remove)) {
            remove.dispose();
            Map<String, Object> connectionInitPayload = remove.getConnectionInitPayload();
            if (connectionInitPayload != null) {
                this.getter.getGraphQlHandler().getWebSocketInterceptor().handleConnectionClosed(remove.getSessionInfo(), connectionInitPayload);
            }
        }
    }

    public void onError(WebSocket webSocket, Throwable th) {
        SessionState remove = this.sessionInfoMap.remove(webSocket.id());
        if (Objects.nonNull(remove)) {
            remove.dispose();
        }
    }

    public void onMessage(WebSocket webSocket, String str) throws IOException {
        GraphQlWebSocketMessage graphQlWebSocketMessage = (GraphQlWebSocketMessage) ONode.loadStr(str).toObject(GraphQlWebSocketMessage.class);
        String id = graphQlWebSocketMessage.getId();
        SessionState sessionInfo = getSessionInfo(webSocket);
        switch (graphQlWebSocketMessage.resolvedType()) {
            case CONNECTION_INIT:
                webSocket.send(ONode.stringify(GraphQlWebSocketMessage.connectionAck(Collections.emptyMap())));
                return;
            case SUBSCRIBE:
                if (Objects.isNull(id)) {
                    webSocket.close();
                }
                this.getter.getGraphQlHandler().handleRequest(new WebSocketGraphQlRequest(URI.create(webSocket.url()), Collections.emptyMap(), (Map) graphQlWebSocketMessage.getPayload(), id, Locale.getDefault(), sessionInfo.sessionInfo)).flatMapMany(webGraphQlResponse -> {
                    return handleResponse(webSocket, id, webGraphQlResponse);
                }).subscribe(new SendMessageSubscriber(id, webSocket, sessionInfo));
                return;
            case COMPLETE:
                if (Objects.nonNull(id)) {
                    Subscription remove = sessionInfo.getSubscriptions().remove(id);
                    if (Objects.nonNull(remove)) {
                        remove.cancel();
                    }
                    this.getter.getGraphQlHandler().getWebSocketInterceptor().handleCancelledSubscription(sessionInfo.getSessionInfo(), id).block(Duration.ofSeconds(10L));
                    return;
                }
                return;
            case PING:
                webSocket.send(ONode.stringify(GraphQlWebSocketMessage.pong(null)));
                return;
            default:
                webSocket.close();
                return;
        }
    }

    private Flux<String> handleResponse(WebSocket webSocket, String str, WebGraphQlResponse webGraphQlResponse) {
        if (log.isDebugEnabled()) {
            log.debug("Execution result ready" + (!CollectionUtils.isEmpty(webGraphQlResponse.getErrors()) ? " with errors: " + webGraphQlResponse.getErrors() : "") + ".");
        }
        return (webGraphQlResponse.getData() instanceof Publisher ? Flux.from((Publisher) webGraphQlResponse.getData()).map((v0) -> {
            return v0.toSpecification();
        }).doOnSubscribe(subscription -> {
            if (getSessionInfo(webSocket).getSubscriptions().putIfAbsent(str, subscription) != null) {
                throw new SubscriptionExistsException();
            }
        }) : Flux.just(webGraphQlResponse.toMap())).map(map -> {
            return ONode.stringify(GraphQlWebSocketMessage.next(str, map));
        }).concatWith(Mono.fromCallable(() -> {
            return ONode.stringify(GraphQlWebSocketMessage.complete(str));
        })).onErrorResume(th -> {
            if (th instanceof SubscriptionExistsException) {
                webSocket.close();
                return Flux.empty();
            }
            return Mono.just(ONode.stringify(GraphQlWebSocketMessage.error(str, GraphqlErrorBuilder.newError().message(th.getMessage(), new Object[0]).build())));
        });
    }

    private SessionState getSessionInfo(WebSocket webSocket) {
        SessionState sessionState = this.sessionInfoMap.get(webSocket.id());
        Assert.notNull(sessionState, "No SessionInfo for " + webSocket);
        return sessionState;
    }
}
