package io.modelcontextprotocol.server.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.ServerMcpTransport;
import io.modelcontextprotocol.util.Assert;
import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.servlet.function.RouterFunction;
import org.springframework.web.servlet.function.RouterFunctions;
import org.springframework.web.servlet.function.ServerRequest;
import org.springframework.web.servlet.function.ServerResponse;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/modelcontextprotocol/server/transport/WebMvcSseServerTransport.class */
public class WebMvcSseServerTransport implements ServerMcpTransport {
    private static final Logger logger = LoggerFactory.getLogger(WebMvcSseServerTransport.class);
    public static final String MESSAGE_EVENT_TYPE = "message";
    public static final String ENDPOINT_EVENT_TYPE = "endpoint";
    public static final String DEFAULT_SSE_ENDPOINT = "/sse";
    private final ObjectMapper objectMapper;
    private final String messageEndpoint;
    private final String sseEndpoint;
    private final RouterFunction<ServerResponse> routerFunction;
    private final ConcurrentHashMap<String, ClientSession> sessions;
    private volatile boolean isClosing;
    private Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> connectHandler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/modelcontextprotocol/server/transport/WebMvcSseServerTransport$ClientSession.class */
    public static class ClientSession {
        private final String id;
        private final ServerResponse.SseBuilder sseBuilder;

        ClientSession(String str, ServerResponse.SseBuilder sseBuilder) {
            this.id = str;
            this.sseBuilder = sseBuilder;
            WebMvcSseServerTransport.logger.debug("Session {} initialized with SSE emitter", str);
        }

        void close() {
            WebMvcSseServerTransport.logger.debug("Closing session: {}", this.id);
            try {
                this.sseBuilder.complete();
                WebMvcSseServerTransport.logger.debug("Successfully completed SSE emitter for session {}", this.id);
            } catch (Exception e) {
                WebMvcSseServerTransport.logger.warn("Failed to complete SSE emitter for session {}: {}", this.id, e.getMessage());
            }
        }
    }

    public WebMvcSseServerTransport(ObjectMapper objectMapper, String str, String str2) {
        this.sessions = new ConcurrentHashMap<>();
        this.isClosing = false;
        Assert.notNull(objectMapper, "ObjectMapper must not be null");
        Assert.notNull(str, "Message endpoint must not be null");
        Assert.notNull(str2, "SSE endpoint must not be null");
        this.objectMapper = objectMapper;
        this.messageEndpoint = str;
        this.sseEndpoint = str2;
        this.routerFunction = RouterFunctions.route().GET(this.sseEndpoint, this::handleSseConnection).POST(this.messageEndpoint, this::handleMessage).build();
    }

    public WebMvcSseServerTransport(ObjectMapper objectMapper, String str) {
        this(objectMapper, str, DEFAULT_SSE_ENDPOINT);
    }

    public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> function) {
        this.connectHandler = function;
        return Mono.empty();
    }

    public Mono<Void> sendMessage(McpSchema.JSONRPCMessage jSONRPCMessage) {
        return Mono.fromRunnable(() -> {
            if (this.sessions.isEmpty()) {
                logger.debug("No active sessions to broadcast message to");
                return;
            }
            try {
                String writeValueAsString = this.objectMapper.writeValueAsString(jSONRPCMessage);
                logger.debug("Attempting to broadcast message to {} active sessions", Integer.valueOf(this.sessions.size()));
                this.sessions.values().forEach(clientSession -> {
                    try {
                        clientSession.sseBuilder.id(clientSession.id).event(MESSAGE_EVENT_TYPE).data(writeValueAsString);
                    } catch (Exception e) {
                        logger.error("Failed to send message to session {}: {}", clientSession.id, e.getMessage());
                        clientSession.sseBuilder.error(e);
                    }
                });
            } catch (IOException e) {
                logger.error("Failed to serialize message: {}", e.getMessage());
            }
        });
    }

    private ServerResponse handleSseConnection(ServerRequest serverRequest) {
        if (this.isClosing) {
            return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).body("Server is shutting down");
        }
        String uuid = UUID.randomUUID().toString();
        logger.debug("Creating new SSE connection for session: {}", uuid);
        try {
            return ServerResponse.sse(sseBuilder -> {
                ClientSession clientSession = new ClientSession(uuid, sseBuilder);
                this.sessions.put(uuid, clientSession);
                try {
                    clientSession.sseBuilder.id(clientSession.id).event(ENDPOINT_EVENT_TYPE).data(this.messageEndpoint);
                } catch (Exception e) {
                    logger.error("Failed to poll event from session queue: {}", e.getMessage());
                    sseBuilder.error(e);
                }
            }, Duration.ZERO);
        } catch (Exception e) {
            logger.error("Failed to send initial endpoint event to session {}: {}", uuid, e.getMessage());
            this.sessions.remove(uuid);
            return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    private ServerResponse handleMessage(ServerRequest serverRequest) {
        if (this.isClosing) {
            return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).body("Server is shutting down");
        }
        try {
            return ServerResponse.ok().build();
        } catch (IOException | IllegalArgumentException e) {
            logger.error("Failed to deserialize message: {}", e.getMessage());
            return ServerResponse.badRequest().body(new McpError("Invalid message format"));
        } catch (Exception e2) {
            logger.error("Error handling message: {}", e2.getMessage());
            return ServerResponse.status(HttpStatus.INTERNAL_SERVER_ERROR).body(new McpError(e2.getMessage()));
        }
    }

    public <T> T unmarshalFrom(Object obj, TypeReference<T> typeReference) {
        return (T) this.objectMapper.convertValue(obj, typeReference);
    }

    public Mono<Void> closeGracefully() {
        return Mono.fromRunnable(() -> {
            this.isClosing = true;
            logger.debug("Initiating graceful shutdown with {} active sessions", Integer.valueOf(this.sessions.size()));
            this.sessions.values().forEach(clientSession -> {
                String str = clientSession.id;
                clientSession.close();
                this.sessions.remove(str);
            });
            logger.debug("Graceful shutdown completed");
        });
    }

    public RouterFunction<ServerResponse> getRouterFunction() {
        return this.routerFunction;
    }
}
