package io.modelcontextprotocol.client.transport;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.util.Assert;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.noear.solon.net.http.HttpUtilsBuilder;
import org.noear.solon.rx.SimpleSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/modelcontextprotocol/client/transport/WebRxSseClientTransport.class */
public class WebRxSseClientTransport implements McpClientTransport {
    private static final Logger logger = LoggerFactory.getLogger(WebRxSseClientTransport.class);
    private static final String MESSAGE_EVENT_TYPE = "message";
    private static final String ENDPOINT_EVENT_TYPE = "endpoint";
    private static final String DEFAULT_SSE_ENDPOINT = "/sse";
    private final HttpUtilsBuilder webBuilder;
    private final String sseEndpoint;
    protected ObjectMapper objectMapper;
    private volatile boolean isClosing;
    private final CountDownLatch closeLatch;
    private final AtomicReference<String> messageEndpoint;
    private final AtomicReference<CompletableFuture<Void>> connectionFuture;

    /* loaded from: input_file:io/modelcontextprotocol/client/transport/WebRxSseClientTransport$Builder.class */
    public static class Builder {
        private final HttpUtilsBuilder webBuilder;
        private String sseEndpoint = "/sse";
        private ObjectMapper objectMapper = new ObjectMapper();

        public Builder(HttpUtilsBuilder httpUtilsBuilder) {
            Assert.notNull(httpUtilsBuilder, "webBuilder must not be empty");
            this.webBuilder = httpUtilsBuilder;
        }

        public Builder sseEndpoint(String str) {
            Assert.hasText(str, "sseEndpoint must not be null");
            this.sseEndpoint = str;
            return this;
        }

        public Builder objectMapper(ObjectMapper objectMapper) {
            Assert.notNull(objectMapper, "objectMapper must not be null");
            this.objectMapper = objectMapper;
            return this;
        }

        public WebRxSseClientTransport build() {
            return new WebRxSseClientTransport(this.webBuilder, this.sseEndpoint, this.objectMapper);
        }
    }

    public WebRxSseClientTransport(HttpUtilsBuilder httpUtilsBuilder) {
        this(httpUtilsBuilder, new ObjectMapper());
    }

    public WebRxSseClientTransport(HttpUtilsBuilder httpUtilsBuilder, ObjectMapper objectMapper) {
        this(httpUtilsBuilder, "/sse", objectMapper);
    }

    public WebRxSseClientTransport(HttpUtilsBuilder httpUtilsBuilder, String str, ObjectMapper objectMapper) {
        this.isClosing = false;
        this.closeLatch = new CountDownLatch(1);
        this.messageEndpoint = new AtomicReference<>();
        this.connectionFuture = new AtomicReference<>();
        Assert.notNull(objectMapper, "ObjectMapper must not be null");
        Assert.notNull(httpUtilsBuilder, "baseUri must not be empty");
        Assert.hasText(str, "sseEndpoint must not be empty");
        this.webBuilder = httpUtilsBuilder;
        this.sseEndpoint = str;
        this.objectMapper = objectMapper;
    }

    public static Builder builder(HttpUtilsBuilder httpUtilsBuilder) {
        return new Builder(httpUtilsBuilder);
    }

    @Override // io.modelcontextprotocol.spec.McpClientTransport, io.modelcontextprotocol.spec.McpTransport
    public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> function) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.connectionFuture.set(completableFuture);
        this.webBuilder.build(this.sseEndpoint).execAsSseStream("GET").subscribe(new SimpleSubscriber().doOnNext(serverSentEvent -> {
            if (this.isClosing) {
                return;
            }
            try {
                if ("endpoint".equals(serverSentEvent.getEvent())) {
                    this.messageEndpoint.set(serverSentEvent.data());
                    this.closeLatch.countDown();
                    completableFuture.complete(null);
                } else if ("message".equals(serverSentEvent.getEvent())) {
                    ((Mono) function.apply(Mono.just(McpSchema.deserializeJsonRpcMessage(this.objectMapper, serverSentEvent.data())))).subscribe();
                } else {
                    logger.error("Received unrecognized SSE event type: {}", serverSentEvent.getEvent());
                }
            } catch (IOException e) {
                logger.error("Error processing SSE event", e);
                completableFuture.completeExceptionally(e);
            }
        }).doOnError(th -> {
            if (this.isClosing) {
                return;
            }
            logger.error("SSE connection error", th);
            completableFuture.completeExceptionally(th);
        }));
        return Mono.fromFuture(completableFuture);
    }

    @Override // io.modelcontextprotocol.spec.McpTransport
    public Mono<Void> sendMessage(McpSchema.JSONRPCMessage jSONRPCMessage) {
        if (this.isClosing) {
            return Mono.empty();
        }
        try {
            if (!this.closeLatch.await(10L, TimeUnit.SECONDS)) {
                return Mono.error(new McpError("Failed to wait for the message endpoint"));
            }
            String str = this.messageEndpoint.get();
            if (str == null) {
                return Mono.error(new McpError("No message endpoint available"));
            }
            try {
                return Mono.fromFuture(this.webBuilder.build(str).header("Content-Type", "application/json").bodyOfJson(this.objectMapper.writeValueAsString(jSONRPCMessage)).execAsync("POST").thenAccept(httpResponse -> {
                    if (httpResponse.code() == 200 || httpResponse.code() == 201 || httpResponse.code() == 202 || httpResponse.code() == 206) {
                        return;
                    }
                    logger.error("Error sending message: {}", Integer.valueOf(httpResponse.code()));
                }));
            } catch (IOException e) {
                return !this.isClosing ? Mono.error(new RuntimeException("Failed to serialize message", e)) : Mono.empty();
            }
        } catch (InterruptedException e2) {
            return Mono.error(new McpError("Failed to wait for the message endpoint"));
        }
    }

    @Override // io.modelcontextprotocol.spec.McpTransport
    public Mono<Void> closeGracefully() {
        return Mono.fromRunnable(() -> {
            this.isClosing = true;
            CompletableFuture<Void> completableFuture = this.connectionFuture.get();
            if (completableFuture == null || completableFuture.isDone()) {
                return;
            }
            completableFuture.cancel(true);
        });
    }

    @Override // io.modelcontextprotocol.spec.McpTransport
    public <T> T unmarshalFrom(Object obj, TypeReference<T> typeReference) {
        return (T) this.objectMapper.convertValue(obj, typeReference);
    }
}
