package io.modelcontextprotocol.server.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpServerSession;
import io.modelcontextprotocol.spec.McpServerTransport;
import io.modelcontextprotocol.spec.McpServerTransportProvider;
import io.modelcontextprotocol.util.Assert;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:io/modelcontextprotocol/server/transport/StdioServerTransportProvider.class */
public class StdioServerTransportProvider implements McpServerTransportProvider {
    private static final Logger logger = LoggerFactory.getLogger(StdioServerTransportProvider.class);
    private final ObjectMapper objectMapper;
    private final InputStream inputStream;
    private final OutputStream outputStream;
    private McpServerSession session;
    private final AtomicBoolean isClosing;
    private final Sinks.One<Void> inboundReady;

    /* loaded from: input_file:io/modelcontextprotocol/server/transport/StdioServerTransportProvider$StdioMcpSessionTransport.class */
    private class StdioMcpSessionTransport implements McpServerTransport {
        private final AtomicBoolean isStarted = new AtomicBoolean(false);
        private final Sinks.One<Void> outboundReady = Sinks.one();
        private final Sinks.Many<McpSchema.JSONRPCMessage> inboundSink = Sinks.many().unicast().onBackpressureBuffer();
        private final Sinks.Many<McpSchema.JSONRPCMessage> outboundSink = Sinks.many().unicast().onBackpressureBuffer();
        private Scheduler inboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "stdio-inbound");
        private Scheduler outboundScheduler = Schedulers.fromExecutorService(Executors.newSingleThreadExecutor(), "stdio-outbound");

        public StdioMcpSessionTransport() {
        }

        @Override // io.modelcontextprotocol.spec.McpTransport
        public Mono<Void> sendMessage(McpSchema.JSONRPCMessage jSONRPCMessage) {
            return Mono.zip(StdioServerTransportProvider.this.inboundReady.asMono(), this.outboundReady.asMono()).then(Mono.defer(() -> {
                return this.outboundSink.tryEmitNext(jSONRPCMessage).isSuccess() ? Mono.empty() : Mono.error(new RuntimeException("Failed to enqueue message"));
            }));
        }

        @Override // io.modelcontextprotocol.spec.McpTransport
        public <T> T unmarshalFrom(Object obj, TypeReference<T> typeReference) {
            return (T) StdioServerTransportProvider.this.objectMapper.convertValue(obj, typeReference);
        }

        @Override // io.modelcontextprotocol.spec.McpTransport
        public Mono<Void> closeGracefully() {
            return Mono.fromRunnable(() -> {
                StdioServerTransportProvider.this.isClosing.set(true);
                StdioServerTransportProvider.logger.debug("Session transport closing gracefully");
                this.inboundSink.tryEmitComplete();
            });
        }

        @Override // io.modelcontextprotocol.spec.McpTransport
        public void close() {
            StdioServerTransportProvider.this.isClosing.set(true);
            StdioServerTransportProvider.logger.debug("Session transport closed");
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void initProcessing() {
            handleIncomingMessages();
            startInboundProcessing();
            startOutboundProcessing();
        }

        private void handleIncomingMessages() {
            this.inboundSink.asFlux().flatMap(jSONRPCMessage -> {
                return StdioServerTransportProvider.this.session.handle(jSONRPCMessage);
            }).doOnTerminate(() -> {
                this.outboundSink.tryEmitComplete();
                this.inboundScheduler.dispose();
            }).subscribe();
        }

        private void startInboundProcessing() {
            if (this.isStarted.compareAndSet(false, true)) {
                this.inboundScheduler.schedule(() -> {
                    StdioServerTransportProvider.this.inboundReady.tryEmitValue((Object) null);
                    try {
                        try {
                            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(StdioServerTransportProvider.this.inputStream));
                            while (!StdioServerTransportProvider.this.isClosing.get()) {
                                try {
                                    String readLine = bufferedReader.readLine();
                                    if (readLine == null || StdioServerTransportProvider.this.isClosing.get()) {
                                        break;
                                    }
                                    StdioServerTransportProvider.logger.debug("Received JSON message: {}", readLine);
                                    try {
                                        if (!this.inboundSink.tryEmitNext(McpSchema.deserializeJsonRpcMessage(StdioServerTransportProvider.this.objectMapper, readLine)).isSuccess()) {
                                            break;
                                        }
                                    } catch (Exception e) {
                                        logIfNotClosing("Error processing inbound message", e);
                                    }
                                } catch (IOException e2) {
                                    logIfNotClosing("Error reading from stdin", e2);
                                }
                            }
                            StdioServerTransportProvider.this.isClosing.set(true);
                            if (StdioServerTransportProvider.this.session != null) {
                                StdioServerTransportProvider.this.session.close();
                            }
                            this.inboundSink.tryEmitComplete();
                        } catch (Exception e3) {
                            logIfNotClosing("Error in inbound processing", e3);
                            StdioServerTransportProvider.this.isClosing.set(true);
                            if (StdioServerTransportProvider.this.session != null) {
                                StdioServerTransportProvider.this.session.close();
                            }
                            this.inboundSink.tryEmitComplete();
                        }
                    } catch (Throwable th) {
                        StdioServerTransportProvider.this.isClosing.set(true);
                        if (StdioServerTransportProvider.this.session != null) {
                            StdioServerTransportProvider.this.session.close();
                        }
                        this.inboundSink.tryEmitComplete();
                        throw th;
                    }
                });
            }
        }

        private void startOutboundProcessing() {
            Function function = flux -> {
                return flux.doOnSubscribe(subscription -> {
                    this.outboundReady.tryEmitValue((Object) null);
                }).publishOn(this.outboundScheduler).handle((jSONRPCMessage, synchronousSink) -> {
                    if (jSONRPCMessage == null || StdioServerTransportProvider.this.isClosing.get()) {
                        if (StdioServerTransportProvider.this.isClosing.get()) {
                            synchronousSink.complete();
                            return;
                        }
                        return;
                    }
                    try {
                        String replace = StdioServerTransportProvider.this.objectMapper.writeValueAsString(jSONRPCMessage).replace("\r\n", "\\n").replace("\n", "\\n").replace("\r", "\\n");
                        synchronized (StdioServerTransportProvider.this.outputStream) {
                            StdioServerTransportProvider.this.outputStream.write(replace.getBytes(StandardCharsets.UTF_8));
                            StdioServerTransportProvider.this.outputStream.write("\n".getBytes(StandardCharsets.UTF_8));
                            StdioServerTransportProvider.this.outputStream.flush();
                        }
                        synchronousSink.next(jSONRPCMessage);
                    } catch (IOException e) {
                        if (StdioServerTransportProvider.this.isClosing.get()) {
                            StdioServerTransportProvider.logger.debug("Stream closed during shutdown", e);
                        } else {
                            StdioServerTransportProvider.logger.error("Error writing message", e);
                            synchronousSink.error(new RuntimeException(e));
                        }
                    }
                }).doOnComplete(() -> {
                    StdioServerTransportProvider.this.isClosing.set(true);
                    this.outboundScheduler.dispose();
                }).doOnError(th -> {
                    if (StdioServerTransportProvider.this.isClosing.get()) {
                        return;
                    }
                    StdioServerTransportProvider.logger.error("Error in outbound processing", th);
                    StdioServerTransportProvider.this.isClosing.set(true);
                    this.outboundScheduler.dispose();
                }).map(obj -> {
                    return (McpSchema.JSONRPCMessage) obj;
                });
            };
            ((Flux) function.apply(this.outboundSink.asFlux())).subscribe();
        }

        private void logIfNotClosing(String str, Exception exc) {
            if (StdioServerTransportProvider.this.isClosing.get()) {
                return;
            }
            StdioServerTransportProvider.logger.error(str, exc);
        }
    }

    public StdioServerTransportProvider() {
        this(new ObjectMapper());
    }

    public StdioServerTransportProvider(ObjectMapper objectMapper) {
        this(objectMapper, System.in, System.out);
    }

    public StdioServerTransportProvider(ObjectMapper objectMapper, InputStream inputStream, OutputStream outputStream) {
        this.isClosing = new AtomicBoolean(false);
        this.inboundReady = Sinks.one();
        Assert.notNull(objectMapper, "The ObjectMapper can not be null");
        Assert.notNull(inputStream, "The InputStream can not be null");
        Assert.notNull(outputStream, "The OutputStream can not be null");
        this.objectMapper = objectMapper;
        this.inputStream = inputStream;
        this.outputStream = outputStream;
    }

    @Override // io.modelcontextprotocol.spec.McpServerTransportProvider
    public void setSessionFactory(McpServerSession.Factory factory) {
        StdioMcpSessionTransport stdioMcpSessionTransport = new StdioMcpSessionTransport();
        this.session = factory.create(stdioMcpSessionTransport);
        stdioMcpSessionTransport.initProcessing();
    }

    @Override // io.modelcontextprotocol.spec.McpServerTransportProvider
    public Mono<Void> notifyClients(String str, Map<String, Object> map) {
        return this.session == null ? Mono.error(new McpError("No session to close")) : this.session.sendNotification(str, map).doOnError(th -> {
            logger.error("Failed to send notification: {}", th.getMessage());
        });
    }

    @Override // io.modelcontextprotocol.spec.McpServerTransportProvider
    public Mono<Void> closeGracefully() {
        return this.session == null ? Mono.empty() : this.session.closeGracefully();
    }
}
