package io.debezium.platform.api;

import io.debezium.platform.domain.LogStreamingService;
import io.debezium.platform.domain.PipelineService;
import io.debezium.platform.error.NotFoundException;
import io.quarkus.websockets.next.InboundProcessingMode;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnError;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.PathParam;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import io.smallrye.common.annotation.RunOnVirtualThread;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@WebSocket(path = "/api/pipelines/{id}/logs/stream", inboundProcessingMode = InboundProcessingMode.CONCURRENT)
/* loaded from: input_file:io/debezium/platform/api/PipelineLogWebSocket.class */
public class PipelineLogWebSocket {
    private static final Logger log = LoggerFactory.getLogger(PipelineLogWebSocket.class);

    @Inject
    org.jboss.logging.Logger logger;

    @Inject
    PipelineService pipelineService;

    @Inject
    LogStreamingService logStreamer;
    private final Map<String, LogStreamingService.LogStreamingTask> streamingTasks = new ConcurrentHashMap();

    @OnOpen
    @RunOnVirtualThread
    public void onOpen(@PathParam("id") String str, WebSocketConnection webSocketConnection) {
        this.logger.infof("Connection '%s' requesting logs for pipeline '%s',", webSocketConnection.id(), str);
        long parseLong = Long.parseLong(str);
        PipelineService pipelineService = this.pipelineService;
        Long valueOf = Long.valueOf(parseLong);
        Objects.requireNonNull(webSocketConnection);
        pipelineService.streamLogs(valueOf, webSocketConnection::sendTextAndAwait).ifPresentOrElse(logStreamingTask -> {
            this.streamingTasks.put(webSocketConnection.id(), logStreamingTask);
        }, () -> {
            throw new NotFoundException(Long.valueOf(parseLong));
        });
    }

    @OnError
    public void onError(WebSocketConnection webSocketConnection, @PathParam("id") String str, NumberFormatException numberFormatException) {
        this.logger.warnf("Invalid pipeline id: %s", str);
        webSocketConnection.sendTextAndAwait("Invalid pipeline id");
        webSocketConnection.closeAndAwait();
    }

    @OnError
    public void onError(WebSocketConnection webSocketConnection, NotFoundException notFoundException) {
        this.logger.warnf("Pipeline not found: %s", Long.valueOf(notFoundException.getId()));
        webSocketConnection.sendTextAndAwait("Pipeline not found");
        webSocketConnection.closeAndAwait();
    }

    @OnClose
    public void onClose(WebSocketConnection webSocketConnection) {
        this.logger.debugf("Connection: %s closed", webSocketConnection.id());
        LogStreamingService.LogStreamingTask remove = this.streamingTasks.remove(webSocketConnection.id());
        if (remove != null) {
            remove.stop();
        }
    }
}
