package top.turboweb.http.response;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import top.turboweb.commons.utils.base.BeanUtils;
import top.turboweb.http.connect.ConnectSession;

/* loaded from: input_file:top/turboweb/http/response/SseResponse.class */
/**
 * HTTP response head for a Server-Sent Events (SSE) stream.
 *
 * <p>On construction the response headers are set to {@code text/event-stream},
 * {@code no-cache}, {@code keep-alive} and chunked transfer encoding. The events
 * themselves are produced by a callback that receives the {@link ConnectSession}
 * given to the constructor; the callback is registered with one of the
 * {@code setSseCallback} overloads and invoked by {@link #startSse()}.
 */
public class SseResponse extends DefaultHttpResponse {
    /** Session handed to the registered callback when {@link #startSse()} runs. */
    private final ConnectSession connectSession;

    /** Callback invoked by {@link #startSse()}; {@code null} until one is registered. */
    private Consumer<ConnectSession> sseCallback;

    /**
     * Creates the response and applies the SSE headers.
     *
     * @param httpVersion        HTTP version passed to the superclass
     * @param httpResponseStatus response status passed to the superclass
     * @param httpHeaders        initial headers passed to the superclass; the SSE headers are
     *                           then set on {@link #headers()}
     * @param connectSession     session later handed to the SSE callback; checked for
     *                           {@code null} only when assertions are enabled
     */
    public SseResponse(HttpVersion httpVersion, HttpResponseStatus httpResponseStatus, HttpHeaders httpHeaders, ConnectSession connectSession) {
        super(httpVersion, httpResponseStatus, httpHeaders);
        assert connectSession != null;
        this.connectSession = connectSession;
        setSseHeaders();
    }

    /** Sets the headers that mark this response as a chunked event stream. */
    private void setSseHeaders() {
        headers().set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream");
        headers().set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
        headers().set(HttpHeaderNames.CONNECTION, "keep-alive");
        // HttpUtil writes the Transfer-Encoding header itself, so it is not set by hand as well.
        HttpUtil.setTransferEncodingChunked(this, true);
    }

    /**
     * Registers the callback that {@link #startSse()} will invoke, replacing any
     * previously registered one.
     *
     * @param consumer callback receiving this response's session; may be {@code null},
     *                 in which case {@link #startSse()} does nothing
     */
    public void setSseCallback(Consumer<ConnectSession> consumer) {
        this.sseCallback = consumer;
    }

    /** Invokes the registered callback with this response's session, if one is registered. */
    public void startSse() {
        if (this.sseCallback != null) {
            this.sseCallback.accept(this.connectSession);
        }
    }

    /**
     * Registers a callback that, when {@link #startSse()} runs, subscribes to {@code flux}
     * and passes every element to {@code ConnectSession.send}.
     *
     * <p>{@code String} elements are sent unchanged; any other element is first serialized
     * with the {@code ObjectMapper} returned by {@code BeanUtils.getObjectMapper()}. A
     * serialization failure is treated like any other error of the stream.
     *
     * @param flux     source of the elements to send; must not be {@code null}
     * @param function maps a stream error to the text sent to the session; when
     *                 {@code null}, {@code "error:" + th.getMessage()} is sent instead
     * @param consumer invoked with the session from a {@code doFinally} hook, i.e. when the
     *                 stream terminates or is cancelled; may be {@code null}
     * @param <T>      element type of {@code flux}
     * @throws NullPointerException if {@code flux} is {@code null}
     */
    public <T> void setSseCallback(Flux<T> flux, Function<Throwable, String> function, Consumer<ConnectSession> consumer) {
        // Fail at registration time rather than later, when the stream is started.
        Objects.requireNonNull(flux, "flux");
        setSseCallback(session -> {
            Flux<String> events = flux
                    .flatMap(SseResponse::toEventText)
                    .doFinally(signalType -> {
                        if (consumer != null) {
                            consumer.accept(session);
                        }
                    });
            events.subscribe(session::send, th -> {
                if (function != null) {
                    session.send(function.apply(th));
                } else {
                    session.send("error:" + th.getMessage());
                }
            });
        });
    }

    /**
     * Registers a callback that streams {@code flux} to the session with the default error
     * text and calls {@code ConnectSession.close()} once the stream has finished.
     *
     * @param flux source of the elements to send; must not be {@code null}
     * @param <T>  element type of {@code flux}
     * @throws NullPointerException if {@code flux} is {@code null}
     */
    public <T> void setSseCallback(Flux<T> flux) {
        setSseCallback(flux, null, session -> session.close());
    }

    /**
     * Converts one stream element to the text that is sent to the session.
     *
     * @param item element emitted by the source stream
     * @return a {@code Mono} holding {@code item} itself when it is a {@code String} or its
     *         serialized form otherwise, or an error {@code Mono} when serialization fails
     */
    private static Mono<String> toEventText(Object item) {
        if (item instanceof String) {
            return Mono.just((String) item);
        }
        try {
            return Mono.just(BeanUtils.getObjectMapper().writeValueAsString(item));
        } catch (JsonProcessingException e) {
            return Mono.error(e);
        }
    }
}
