package modelengine.fit.server.http.websocket;

import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
import modelengine.fit.http.annotation.PathVariable;
import modelengine.fit.http.server.DoHttpHandlerException;
import modelengine.fit.http.websocket.Session;
import modelengine.fit.http.websocket.annotation.BinaryMessage;
import modelengine.fit.http.websocket.annotation.OnClose;
import modelengine.fit.http.websocket.annotation.OnMessage;
import modelengine.fit.http.websocket.annotation.OnOpen;
import modelengine.fit.http.websocket.annotation.WebSocketEndpoint;
import modelengine.fit.serialization.http.Constants;
import modelengine.fit.serialization.http.websocket.FailMessageContentUtils;
import modelengine.fit.serialization.http.websocket.RequestMessageContentUtils;
import modelengine.fit.serialization.http.websocket.ResponseMessageContentUtils;
import modelengine.fit.serialization.http.websocket.StreamMessageType;
import modelengine.fit.serialization.http.websocket.WebSocketUtils;
import modelengine.fit.serialization.http.websocket.WebSocketWorkerObserver;
import modelengine.fit.serialization.util.MessageSerializerUtils;
import modelengine.fit.serialization.util.PublisherCategory;
import modelengine.fitframework.annotation.Component;
import modelengine.fitframework.broker.ExceptionInfo;
import modelengine.fitframework.broker.FitExceptionCreator;
import modelengine.fitframework.broker.Genericable;
import modelengine.fitframework.broker.LocalGenericableRepository;
import modelengine.fitframework.broker.server.Dispatcher;
import modelengine.fitframework.broker.server.Response;
import modelengine.fitframework.flowable.Emitter;
import modelengine.fitframework.flowable.Publisher;
import modelengine.fitframework.flowable.choir.FlexibleEmitterChoir;
import modelengine.fitframework.flowable.solo.FlexibleEmitterSolo;
import modelengine.fitframework.flowable.util.worker.Worker;
import modelengine.fitframework.inspection.Validation;
import modelengine.fitframework.ioc.BeanContainer;
import modelengine.fitframework.log.Logger;
import modelengine.fitframework.serialization.RequestMetadata;
import modelengine.fitframework.serialization.TagLengthValues;
import modelengine.fitframework.serialization.Version;
import modelengine.fitframework.serialization.tlv.TlvUtils;
import modelengine.fitframework.util.ObjectUtils;
import modelengine.fitframework.util.StringUtils;

/**
 * WebSocket endpoint, bound to {@link Constants#FIT_PATH_PATTERN}, that decodes every incoming binary frame
 * as {@link TagLengthValues} and hands it to a handler selected by the frame's message type.
 *
 * <p>Per-connection state lives in a {@link ConfigurableWebSocketServerContext} stored under the session id.
 * The context is created in {@link #onOpen}, and removed either in {@link #onClose} or as soon as every
 * tracked publisher of the session has been marked as finished. Because frames may still arrive after the
 * context has been removed, every handler tolerates a missing context instead of failing with a
 * {@link NullPointerException}.
 */
@WebSocketEndpoint(path = Constants.FIT_PATH_PATTERN)
@Component
public class FitWebSocketController {
    private static final Logger log = Logger.get(FitWebSocketController.class);

    /** Index under which the stream produced by the return value is tracked (parameters use their position). */
    private static final int RETURN_INDEX = -1;

    /** Response code that this controller treats as a successful invocation. */
    private static final int CODE_OK = 0;

    private final BeanContainer container;
    private final Dispatcher dispatcher;
    private final LocalGenericableRepository repository;
    private final FitExceptionCreator exceptionCreator;

    /** Session id to per-connection context; concurrent because sessions are opened and closed independently. */
    private final Map<String, ConfigurableWebSocketServerContext> contexts = new ConcurrentHashMap<>();

    /** Message type code to handler; filled once in the constructor and only read afterwards. */
    private final Map<Integer, BiConsumer<Session, TagLengthValues>> handlers = new HashMap<>();

    /**
     * Creates the controller and registers one handler per supported {@link StreamMessageType}.
     *
     * @param beanContainer the container used to look up message serializers; must not be null
     * @param dispatcher the dispatcher that executes the requested fitable; must not be null
     * @param localGenericableRepository the repository used to resolve the genericable method; must not be null
     * @param fitExceptionCreator the factory used to rebuild exceptions reported by the peer; must not be null
     */
    FitWebSocketController(BeanContainer beanContainer, Dispatcher dispatcher,
            LocalGenericableRepository localGenericableRepository, FitExceptionCreator fitExceptionCreator) {
        this.container = Validation.notNull(beanContainer, "The container cannot be null.");
        this.dispatcher = Validation.notNull(dispatcher, "The dispatcher cannot be null.");
        this.repository = Validation.notNull(localGenericableRepository, "The repository cannot be null.");
        this.exceptionCreator = Validation.notNull(fitExceptionCreator, "The exception creator cannot be null.");
        register(StreamMessageType.REQUEST, this::doRequestMessageHandler);
        register(StreamMessageType.CONSUME, this::doConsumeMessageHandler);
        register(StreamMessageType.COMPLETE, this::doCompleteMessageHandler);
        register(StreamMessageType.FAIL, this::doFailMessageHandler);
        register(StreamMessageType.REQUEST_ELEMENT, this::doRequestElementHandler);
        register(StreamMessageType.CANCEL, this::doCancelMessageHandler);
        register(StreamMessageType.UNKNOWN, this::doUnknownMessageHandler);
    }

    /**
     * Creates the context of a newly opened session from the path variables of the connection.
     *
     * @param session the opened session
     * @param genericableId the value of the {@code genericableId} path variable
     * @param fitableId the value of the {@code fitableId} path variable
     */
    @OnOpen
    public void onOpen(Session session, @PathVariable("genericableId") String genericableId,
            @PathVariable("fitableId") String fitableId) {
        this.contexts.put(session.getId(), ConfigurableWebSocketServerContext.create(genericableId, fitableId));
    }

    /**
     * Decodes a binary frame and dispatches it to the handler registered for its message type.
     *
     * <p>A type code without a registered handler is routed to the unknown-message handler rather than
     * dereferencing a missing map entry.
     *
     * @param session the session the frame was received on
     * @param bArr the raw frame content
     */
    @OnMessage
    public void onMessage(Session session, @BinaryMessage byte[] bArr) {
        TagLengthValues message = TagLengthValues.deserialize(bArr);
        BiConsumer<Session, TagLengthValues> fallback = this::doUnknownMessageHandler;
        this.handlers.getOrDefault(Integer.valueOf(WebSocketUtils.getType(message)), fallback)
                .accept(session, message);
    }

    /**
     * Logs the close code and reason of the session and discards its context.
     *
     * @param session the closed session
     */
    @OnClose
    public void onClose(Session session) {
        log.warn(StringUtils.format("WebSocket connection closed by client. [code={0}, reason={1}]",
                session.getCloseCode(), session.getCloseReason()));
        this.contexts.remove(session.getId());
    }

    /** Associates the handler with the code of the given message type. */
    private void register(StreamMessageType type, BiConsumer<Session, TagLengthValues> handler) {
        this.handlers.put(Integer.valueOf(type.code()), handler);
    }

    /**
     * Looks up the context of the session.
     *
     * @return the context, or {@code null} (after logging a warning) when the session has no context,
     *         for example because the connection has already been finished
     */
    private ConfigurableWebSocketServerContext findContext(Session session) {
        ConfigurableWebSocketServerContext context = this.contexts.get(session.getId());
        if (context == null) {
            log.warn(StringUtils.format("No WebSocket context found for session, message ignored. [sessionId={0}]",
                    session.getId()));
        }
        return context;
    }

    /** Returns whether an emitter is registered for the index, logging a warning when there is none. */
    private boolean hasEmitter(ConfigurableWebSocketServerContext context, int index) {
        if (context.emitters().containsKey(index)) {
            return true;
        }
        log.warn(StringUtils.format("No emitter registered for publisher index, message ignored. [index={0}]",
                index));
        return false;
    }

    /**
     * Returns whether the context holds a worker, logging a warning when it does not.
     *
     * <p>The worker is only assigned when the invoked method returned a publisher, so control frames for
     * the response stream can arrive while no worker exists.
     */
    private boolean hasWorker(ConfigurableWebSocketServerContext context) {
        if (context.worker() != null) {
            return true;
        }
        log.warn("No response stream worker exists for the session, message ignored.");
        return false;
    }

    /** Deserializes one element sent by the peer and emits it to the publisher argument at the frame's index. */
    private void doConsumeMessageHandler(Session session, TagLengthValues tagLengthValues) {
        ConfigurableWebSocketServerContext context = findContext(session);
        if (context == null) {
            return;
        }
        int index = WebSocketUtils.getIndex(tagLengthValues);
        if (!hasEmitter(context, index)) {
            return;
        }
        byte[] content = WebSocketUtils.getContent(tagLengthValues);
        context.emitters().get(index).emit(context.messageSerializer()
                .deserializeResponse(context.publisherArgumentElementTypes().get(index), content));
    }

    /** Completes the publisher argument at the frame's index and closes the connection if nothing is pending. */
    private void doCompleteMessageHandler(Session session, TagLengthValues tagLengthValues) {
        ConfigurableWebSocketServerContext context = findContext(session);
        if (context == null) {
            return;
        }
        int index = WebSocketUtils.getIndex(tagLengthValues);
        if (!hasEmitter(context, index)) {
            return;
        }
        context.emitters().get(index).complete();
        tryCloseConnection(session, index);
    }

    /** Fails the publisher argument at the frame's index with the exception described by the frame content. */
    private void doFailMessageHandler(Session session, TagLengthValues tagLengthValues) {
        ConfigurableWebSocketServerContext context = findContext(session);
        if (context == null) {
            return;
        }
        int index = WebSocketUtils.getIndex(tagLengthValues);
        if (!hasEmitter(context, index)) {
            return;
        }
        TagLengthValues failure = TagLengthValues.deserialize(WebSocketUtils.getContent(tagLengthValues));
        context.emitters().get(index).fail(this.exceptionCreator.buildException(
                ExceptionInfo.create(context.genericableId(), context.fitableId(),
                        FailMessageContentUtils.getCode(failure), FailMessageContentUtils.getMessage(failure),
                        TlvUtils.getExceptionProperties(failure))));
        tryCloseConnection(session, index);
    }

    /** Forwards the element count carried by the frame (a UTF-8 decimal number) to the response worker. */
    private void doRequestElementHandler(Session session, TagLengthValues tagLengthValues) {
        ConfigurableWebSocketServerContext context = findContext(session);
        if (context == null || !hasWorker(context)) {
            return;
        }
        String count = new String(WebSocketUtils.getContent(tagLengthValues), StandardCharsets.UTF_8);
        context.worker().request(Long.parseLong(count));
    }

    /** Cancels the response worker and marks the return stream as finished. */
    private void doCancelMessageHandler(Session session, TagLengthValues tagLengthValues) {
        ConfigurableWebSocketServerContext context = findContext(session);
        if (context == null || !hasWorker(context)) {
            return;
        }
        context.worker().cancel();
        tryCloseConnection(session, RETURN_INDEX);
    }

    /** Logs a warning for a frame whose type cannot be handled. */
    private void doUnknownMessageHandler(Session session, TagLengthValues tagLengthValues) {
        log.warn("Cannot handle message with unknown type.");
    }

    /**
     * Handles an invocation request: stores the request settings in the context, resolves the serializer,
     * executes the target method and sends the response. The session is closed when the response code is
     * not {@link #CODE_OK}.
     */
    private void doRequestMessageHandler(Session session, TagLengthValues tagLengthValues) {
        ConfigurableWebSocketServerContext context = findContext(session);
        if (context == null) {
            return;
        }
        TagLengthValues request = TagLengthValues.deserialize(WebSocketUtils.getContent(tagLengthValues));
        byte[] entity = RequestMessageContentUtils.getEntity(request);
        context.format(RequestMessageContentUtils.getDataFormat(request));
        context.genericableVersion(RequestMessageContentUtils.getGenericableVersion(request));
        context.extensions(RequestMessageContentUtils.getExtensions(request));
        context.messageSerializer(MessageSerializerUtils.getMessageSerializer(this.container, context.format())
                .orElseThrow(() -> new IllegalStateException(StringUtils.format(
                        "MessageSerializer required but not found. [format={0}]", context.format()))));
        Object[] arguments = getDeserializedParameters(context, session, entity);
        Response response = executeMethod(context, arguments);
        // The converted value is computed (and a publisher return value is subscribed) before the response
        // frame is sent, exactly as in the nested call this replaces.
        Object returnValue = getConvertedReturnValue(session, response, context);
        sendResponseMessage(session, response, context, returnValue);
        if (response.metadata().code() != CODE_OK) {
            session.close();
        }
    }

    /**
     * Deserializes the request entity into the arguments of the target method and replaces every
     * publisher-typed argument with an emitter-backed publisher.
     */
    private Object[] getDeserializedParameters(ConfigurableWebSocketServerContext context, Session session,
            byte[] entity) {
        Type[] parameterTypes = Stream.of(getMethod(context).getParameters())
                .map(parameter -> parameter.getParameterizedType())
                .toArray(Type[]::new);
        Object[] arguments = context.messageSerializer().deserializeRequest(parameterTypes, entity);
        initPublisherTypeArguments(context, session, arguments, parameterTypes);
        return arguments;
    }

    /**
     * For each publisher-typed parameter, registers its element type, emitter and unfinished tag in the
     * context, and stores a publisher in {@code arguments} whose request and cancel callbacks send the
     * corresponding frames to the peer.
     */
    // Raw construction is kept on purpose: the generic signatures of the emitter publishers are not visible
    // from this file, and the explicit (Consumer) casts pick the intended constructor overload.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void initPublisherTypeArguments(ConfigurableWebSocketServerContext context, Session session,
            Object[] arguments, Type[] parameterTypes) {
        for (int i = 0; i < parameterTypes.length; i++) {
            PublisherCategory category = PublisherCategory.fromType(parameterTypes[i]);
            if (category == PublisherCategory.NON_PUBLISHER) {
                continue;
            }
            Emitter<Object> emitter = Emitter.create();
            context.publisherArgumentElementTypes().put(i, getPublisherDataType(parameterTypes[i]));
            context.emitters().put(i, emitter);
            context.publisherFinishedTags().put(i, false);
            int index = i;
            Consumer<Long> requestHandler = count -> sendRequestElementMessage(session, index, count);
            Runnable cancelHandler = () -> {
                sendCancelMessage(session, index);
                tryCloseConnection(session, index);
            };
            if (category == PublisherCategory.CHOIR) {
                arguments[i] = new FlexibleEmitterChoir(() -> emitter, (Consumer) null, (Consumer) null,
                        requestHandler, cancelHandler);
            } else {
                arguments[i] = new FlexibleEmitterSolo(() -> emitter, (Consumer) null, (Consumer) null,
                        requestHandler, cancelHandler);
            }
        }
    }

    /** Dispatches the invocation described by the context with the given arguments. */
    private Response executeMethod(ConfigurableWebSocketServerContext context, Object[] arguments) {
        RequestMetadata metadata = RequestMetadata.custom()
                .dataFormat(context.format())
                .genericableId(context.genericableId())
                .genericableVersion(context.genericableVersion())
                .fitableId(context.fitableId())
                .fitableVersion(Version.builder("1.0.0").build())
                .tagValues(context.extensions())
                .build();
        return this.dispatcher.dispatch(metadata, arguments);
    }

    /**
     * Converts the response data into the value to serialize into the response frame.
     *
     * @return the response data itself when the invocation failed or the method does not return a
     *         publisher; otherwise {@code null}, after a worker has been started on the returned publisher
     */
    private Object getConvertedReturnValue(Session session, Response response,
            ConfigurableWebSocketServerContext context) {
        Type returnType = getMethod(context).getGenericReturnType();
        Object data = response.data();
        if (response.metadata().code() != CODE_OK
                || PublisherCategory.fromType(returnType) == PublisherCategory.NON_PUBLISHER) {
            return data;
        }
        context.publisherFinishedTags().put(RETURN_INDEX, false);
        // Null-safe so that missing data is reported by the validation below, not by a NullPointerException
        // raised while building the error message.
        String error = StringUtils.format("The return value type is not Publisher. [type={0}]",
                data == null ? "null" : data.getClass());
        context.worker(Worker.create(
                new WebSocketWorkerObserver(session, context.messageSerializer(), getPublisherDataType(returnType),
                        RETURN_INDEX, this::tryCloseConnection),
                (Publisher) ObjectUtils.cast(Validation.isInstanceOf(data, Publisher.class, error)), -1L));
        context.worker().run();
        return null;
    }

    /** Sends a REQUEST_ELEMENT frame asking the peer for {@code count} more elements of the given publisher. */
    private void sendRequestElementMessage(Session session, int index, long count) {
        TagLengthValues message = TagLengthValues.create();
        WebSocketUtils.setIndex(message, index);
        WebSocketUtils.setType(message, StreamMessageType.REQUEST_ELEMENT.code());
        WebSocketUtils.setContent(message, Long.toString(count).getBytes(StandardCharsets.UTF_8));
        session.send(message.serialize());
    }

    /** Sends a CANCEL frame for the publisher at the given index. */
    private void sendCancelMessage(Session session, int index) {
        TagLengthValues message = TagLengthValues.create();
        WebSocketUtils.setIndex(message, index);
        WebSocketUtils.setType(message, StreamMessageType.CANCEL.code());
        session.send(message.serialize());
    }

    /** Serializes the response metadata and return value and sends them wrapped in a RESPONSE frame. */
    private void sendResponseMessage(Session session, Response response,
            ConfigurableWebSocketServerContext context, Object returnValue) {
        Type returnType = getMethod(context).getGenericReturnType();
        TagLengthValues content = TagLengthValues.create();
        ResponseMessageContentUtils.setDataFormat(content, response.metadata().dataFormat());
        ResponseMessageContentUtils.setCode(content, response.metadata().code());
        ResponseMessageContentUtils.setMessage(content, response.metadata().message());
        ResponseMessageContentUtils.setExtensions(content, response.metadata().tagValues());
        ResponseMessageContentUtils.setEntity(content,
                context.messageSerializer().serializeResponse(returnType, returnValue));
        TagLengthValues message = TagLengthValues.create();
        WebSocketUtils.setType(message, StreamMessageType.RESPONSE.code());
        WebSocketUtils.setContent(message, content.serialize());
        session.send(message.serialize());
    }

    /**
     * Resolves the method of the genericable identified by the context.
     *
     * @throws DoHttpHandlerException if the repository holds no matching genericable
     */
    private Method getMethod(ConfigurableWebSocketServerContext context) {
        Genericable genericable = this.repository
                .get(context.genericableId(), context.genericableVersion().toString())
                .orElseThrow(() -> new DoHttpHandlerException(StringUtils.format(
                        "No genericable. [genericableId={0}, genericableVersion={1}]",
                        context.genericableId(), context.genericableVersion())));
        Method method = genericable.method().method();
        Validation.notNull(method, "The genericable method cannot be null. [genericableId={0}]", genericable.id());
        return method;
    }

    /**
     * Marks the publisher at {@code index} as finished and, once no tracked publisher is unfinished,
     * closes the session and removes its context. Does nothing when the context is already gone, so the
     * method can safely be invoked again after the connection has been finished.
     */
    private void tryCloseConnection(Session session, int index) {
        ConfigurableWebSocketServerContext context = this.contexts.get(session.getId());
        if (context == null) {
            return;
        }
        context.publisherFinishedTags().put(index, true);
        if (context.publisherFinishedTags().containsValue(Boolean.FALSE)) {
            return;
        }
        // The compare-and-set guarantees that only one caller closes the session.
        if (context.finished().compareAndSet(false, true)) {
            try {
                session.close();
            } finally {
                // Drop the context even if closing fails, so it does not stay in the map.
                this.contexts.remove(session.getId());
            }
        }
    }

    /**
     * Returns the first type argument of a parameterized publisher type.
     *
     * @throws IllegalArgumentException if the type is not a {@link ParameterizedType}
     */
    private static Type getPublisherDataType(Type type) {
        if (type instanceof ParameterizedType) {
            return ((ParameterizedType) type).getActualTypeArguments()[0];
        }
        throw new IllegalArgumentException("Cannot get data type which type is not parameterized type.");
    }
}
