package top.turboweb.http.scheduler.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ForkJoinPool;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import top.turboweb.commons.exception.TurboReactiveException;
import top.turboweb.commons.exception.TurboSerializableException;
import top.turboweb.commons.utils.base.BeanUtils;
import top.turboweb.http.connect.ConnectSession;
import top.turboweb.http.context.FullHttpContext;
import top.turboweb.http.handler.ExceptionHandlerMatcher;
import top.turboweb.http.handler.ExceptionHandlerSchedulerHelper;
import top.turboweb.http.middleware.Middleware;
import top.turboweb.http.request.HttpInfoRequest;
import top.turboweb.http.request.HttpInfoRequestPackageHelper;
import top.turboweb.http.response.HttpInfoResponse;
import top.turboweb.http.session.SessionManagerProxy;

/* loaded from: input_file:top/turboweb/http/scheduler/impl/ReactiveHttpScheduler.class */
public class ReactiveHttpScheduler extends AbstractHttpScheduler {
    private final ForkJoinPool SERVICE_POOL;
    private final ObjectMapper objectMapper;
    private final Charset charset;

    public ReactiveHttpScheduler(SessionManagerProxy sessionManagerProxy, Middleware middleware, ExceptionHandlerMatcher exceptionHandlerMatcher, int i) {
        super(sessionManagerProxy, middleware, exceptionHandlerMatcher, ReactiveHttpScheduler.class);
        this.objectMapper = BeanUtils.getObjectMapper();
        this.charset = StandardCharsets.UTF_8;
        this.SERVICE_POOL = new ForkJoinPool(i);
    }

    @Override // top.turboweb.http.scheduler.HttpScheduler
    public void execute(FullHttpRequest fullHttpRequest, ConnectSession connectSession) {
        long nanoTime = System.nanoTime();
        HttpInfoResponse httpInfoResponse = new HttpInfoResponse(fullHttpRequest.protocolVersion(), HttpResponseStatus.OK);
        doExecute(fullHttpRequest, httpInfoResponse, connectSession).subscribeOn(Schedulers.fromExecutor(this.SERVICE_POOL)).doFinally(signalType -> {
            fullHttpRequest.release();
        }).subscribe(httpResponse -> {
            writeResponse(connectSession, fullHttpRequest, httpResponse, nanoTime);
        }, th -> {
            ExceptionHandlerSchedulerHelper.doHandleForReactiveScheduler(this.exceptionHandlerMatcher, httpInfoResponse, th).subscribeOn(Schedulers.fromExecutor(this.SERVICE_POOL)).subscribe(httpInfoResponse2 -> {
                writeResponse(connectSession, fullHttpRequest, httpInfoResponse2, nanoTime);
            });
        });
    }

    private Mono<HttpResponse> doExecute(FullHttpRequest fullHttpRequest, HttpInfoResponse httpInfoResponse, ConnectSession connectSession) {
        return Mono.just(fullHttpRequest).flatMap(fullHttpRequest2 -> {
            try {
                HttpInfoRequest packageRequest = HttpInfoRequestPackageHelper.packageRequest(fullHttpRequest2);
                Object invoke = this.sentinelMiddleware.invoke(new FullHttpContext(packageRequest, httpInfoResponse, connectSession));
                return invoke instanceof Mono ? ((Mono) invoke).map(obj -> {
                    return handleResponse(httpInfoResponse, obj);
                }).doFinally(signalType -> {
                    releaseFileUploads(packageRequest);
                }) : Mono.just(new TurboReactiveException("TurboWeb仅支持Mono类型的反应式对象")).doFinally(signalType2 -> {
                    releaseFileUploads(packageRequest);
                }).flatMap((v0) -> {
                    return Mono.error(v0);
                });
            } catch (Exception e) {
                try {
                    releaseFileUploads(null);
                } catch (Exception e2) {
                }
                return Mono.error(e);
            }
        });
    }

    private HttpResponse handleResponse(HttpInfoResponse httpInfoResponse, Object obj) {
        if (obj instanceof HttpResponse) {
            HttpResponse httpResponse = (HttpResponse) obj;
            if (httpInfoResponse != httpResponse) {
                httpInfoResponse.release();
            }
            return httpResponse;
        }
        if (obj instanceof String) {
            httpInfoResponse.setContent((String) obj);
            httpInfoResponse.setContentType("text/plain;charset=" + this.charset.name());
            return httpInfoResponse;
        }
        try {
            httpInfoResponse.setContent(this.objectMapper.writeValueAsString(obj));
            httpInfoResponse.setContentType("application/json;charset=" + this.charset.name());
            return httpInfoResponse;
        } catch (JsonProcessingException e) {
            throw new TurboSerializableException("序列化失败");
        }
    }
}
