package io.vertx.ext.eventbus.bridge.tcp.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.ext.bridge.BridgeEventType;
import io.vertx.ext.bridge.BridgeOptions;
import io.vertx.ext.bridge.PermittedOptions;
import io.vertx.ext.eventbus.bridge.tcp.BridgeEvent;
import io.vertx.ext.eventbus.bridge.tcp.TcpEventBusBridge;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameHelper;
import io.vertx.ext.eventbus.bridge.tcp.impl.protocol.FrameParser;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.regex.Pattern;

/* loaded from: input_file:io/vertx/ext/eventbus/bridge/tcp/impl/TcpEventBusBridgeImpl.class */
/**
 * {@link TcpEventBusBridge} implementation backed by a {@link NetServer}.
 *
 * <p>Every accepted socket gets its own {@link FrameParser}. Each parsed JSON frame is first
 * offered to the optional bridge event handler and, when allowed, dispatched to the event bus
 * according to its {@code type} field ({@code send}, {@code publish}, {@code register},
 * {@code unregister} or {@code ping}). Addresses are checked against the inbound / outbound
 * permitted options of the {@link BridgeOptions} before any event bus interaction.
 */
public class TcpEventBusBridgeImpl implements TcpEventBusBridge {
    private static final Logger log = LoggerFactory.getLogger(TcpEventBusBridgeImpl.class);

    private final EventBus eb;
    private final NetServer server;
    /** Cache of compiled address regular expressions, keyed by their source string. */
    private final Map<String, Pattern> compiledREs = new HashMap<>();
    private final BridgeOptions options;
    /** Optional hook consulted for every frame; may be {@code null}. */
    private final Handler<BridgeEvent> bridgeEventHandler;

    /**
     * Creates the bridge and its underlying TCP server (the server is not started yet).
     *
     * @param vertx            the Vert.x instance providing the event bus and the net server
     * @param bridgeOptions    permitted addresses; a default {@link BridgeOptions} is used when {@code null}
     * @param netServerOptions server options; a default {@link NetServerOptions} is used when {@code null}
     * @param handler          bridge event hook, or {@code null} to let every frame through
     */
    public TcpEventBusBridgeImpl(Vertx vertx, BridgeOptions bridgeOptions, NetServerOptions netServerOptions, Handler<BridgeEvent> handler) {
        this.eb = vertx.eventBus();
        this.options = bridgeOptions != null ? bridgeOptions : new BridgeOptions();
        this.bridgeEventHandler = handler;
        this.server = vertx.createNetServer(netServerOptions == null ? new NetServerOptions() : netServerOptions);
        this.server.connectHandler(this::handler);
    }

    /** Starts listening using the address configured in the server options. */
    @Override
    public Future<TcpEventBusBridge> listen() {
        return this.server.listen().map(this);
    }

    /** Starts listening on the given port. */
    @Override
    public Future<TcpEventBusBridge> listen(int i) {
        return this.server.listen(i).map(this);
    }

    /** Starts listening on the given port and host. */
    @Override
    public Future<TcpEventBusBridge> listen(int i, String str) {
        return this.server.listen(i, str).map(this);
    }

    /** Callback flavour of {@link #listen()}; returns this bridge for chaining. */
    @Override
    public TcpEventBusBridge listen(Handler<AsyncResult<TcpEventBusBridge>> handler) {
        this.server.listen(adaptListenHandler(handler));
        return this;
    }

    /** Callback flavour of {@link #listen(int, String)}; returns this bridge for chaining. */
    @Override
    public TcpEventBusBridge listen(int i, String str, Handler<AsyncResult<TcpEventBusBridge>> handler) {
        this.server.listen(i, str, adaptListenHandler(handler));
        return this;
    }

    /** Callback flavour of {@link #listen(int)}; returns this bridge for chaining. */
    @Override
    public TcpEventBusBridge listen(int i, Handler<AsyncResult<TcpEventBusBridge>> handler) {
        this.server.listen(i, adaptListenHandler(handler));
        return this;
    }

    /**
     * Converts a bridge-level listen callback into the callback expected by the net server:
     * failures are forwarded with their cause, successes complete with this bridge.
     */
    private Handler<AsyncResult<NetServer>> adaptListenHandler(Handler<AsyncResult<TcpEventBusBridge>> handler) {
        return listenResult -> {
            if (listenResult.failed()) {
                handler.handle(Future.failedFuture(listenResult.cause()));
            } else {
                handler.handle(Future.succeededFuture(this));
            }
        };
    }

    /**
     * Dispatches one parsed frame to the event bus according to its {@code type} field.
     *
     * @param netSocket  socket the frame arrived on; replies and error frames are written to it
     * @param str        target address of the frame (may be {@code null} for {@code ping})
     * @param jsonObject the complete frame
     * @param map        per-socket registry of consumers, keyed by address
     * @param map2       per-socket messages awaiting a reply from the client, keyed by reply address
     */
    private void doSendOrPub(NetSocket netSocket, String str, JsonObject jsonObject, Map<String, MessageConsumer<?>> map, Map<String, Message<?>> map2) {
        Object body = jsonObject.getValue("body");
        JsonObject headers = jsonObject.getJsonObject("headers");
        String type = jsonObject.getString("type", "message");
        DeliveryOptions deliveryOptions = parseMsgHeaders(new DeliveryOptions(), headers);
        switch (type) {
            case "send":
                doSend(netSocket, str, jsonObject, body, deliveryOptions, map2);
                return;
            case "publish":
                if (checkMatches(true, str)) {
                    this.eb.publish(str, body, deliveryOptions);
                } else {
                    FrameHelper.sendErrFrame("access_denied", netSocket);
                }
                return;
            case "register":
                doRegister(netSocket, str, jsonObject, map, map2);
                return;
            case "unregister":
                doUnregister(netSocket, str, map);
                return;
            case "ping":
                FrameHelper.sendFrame("pong", netSocket);
                return;
            default:
                FrameHelper.sendErrFrame("unknown_type", netSocket);
                return;
        }
    }

    /** Handles a {@code send} frame: request/response, plain send, or a reply to a pending message. */
    private void doSend(NetSocket netSocket, String address, JsonObject frame, Object body, DeliveryOptions deliveryOptions, Map<String, Message<?>> replies) {
        if (!checkMatches(true, address, replies)) {
            FrameHelper.sendErrFrame("access_denied", netSocket);
            return;
        }
        String replyAddress = frame.getString("replyAddress");
        if (replyAddress != null) {
            // The client expects an answer: issue a request and relay the reply (or the failure).
            this.eb.request(address, body, deliveryOptions, (AsyncResult<Message<Object>> reply) -> {
                if (reply.failed()) {
                    FrameHelper.sendErrFrame(address, replyAddress, reply.cause(), netSocket);
                    return;
                }
                Message<Object> message = reply.result();
                JsonObject responseHeaders = new JsonObject();
                for (Map.Entry<String, String> entry : message.headers()) {
                    responseHeaders.put(entry.getKey(), entry.getValue());
                }
                if (message.replyAddress() != null) {
                    // Remember the message so the client can reply to it later.
                    replies.put(message.replyAddress(), message);
                }
                FrameHelper.sendFrame("message", replyAddress, message.replyAddress(), responseHeaders, true, message.body(), netSocket);
            });
        } else {
            Message<?> pending = replies.get(address);
            if (pending == null) {
                this.eb.send(address, body, deliveryOptions);
            } else {
                // The address is a reply address handed out earlier: answer the pending message.
                Integer failureCode = frame.getInteger("failureCode");
                if (failureCode == null) {
                    pending.reply(body, deliveryOptions);
                } else {
                    pending.fail(failureCode.intValue(), frame.getString("message"));
                }
            }
        }
        // A reply address is single use.
        replies.remove(address);
    }

    /** Handles a {@code register} frame: subscribes the socket to {@code address}. */
    private void doRegister(NetSocket netSocket, String address, JsonObject frame, Map<String, MessageConsumer<?>> registry, Map<String, Message<?>> replies) {
        if (!checkMatches(false, address)) {
            FrameHelper.sendErrFrame("access_denied", netSocket);
            return;
        }
        MessageConsumer<?> previous = registry.put(address, this.eb.consumer(address, (Message<Object> message) -> {
            if (message.replyAddress() != null) {
                replies.put(message.replyAddress(), message);
            }
            JsonObject messageHeaders = new JsonObject();
            for (Map.Entry<String, String> entry : message.headers()) {
                messageHeaders.put(entry.getKey(), entry.getValue());
            }
            FrameHelper.sendFrame("message", message.address(), message.replyAddress(), messageHeaders, Boolean.valueOf(message.isSend()), message.body(), netSocket);
        }));
        if (previous != null) {
            // A repeated register would otherwise orphan the old consumer: it would keep
            // delivering to the socket and could never be unregistered by the client.
            previous.unregister();
        }
        checkCallHook(() -> new BridgeEventImpl(BridgeEventType.REGISTERED, frame, netSocket), null, null);
    }

    /** Handles an {@code unregister} frame: drops the consumer registered for {@code address}. */
    private void doUnregister(NetSocket netSocket, String address, Map<String, MessageConsumer<?>> registry) {
        if (!checkMatches(false, address)) {
            FrameHelper.sendErrFrame("access_denied", netSocket);
            return;
        }
        MessageConsumer<?> consumer = registry.remove(address);
        if (consumer != null) {
            consumer.unregister();
        } else {
            FrameHelper.sendErrFrame("unknown_address", netSocket);
        }
    }

    /**
     * Connect handler: wires frame parsing, error handling and cleanup for one client socket.
     *
     * @param netSocket the newly accepted socket
     */
    private void handler(NetSocket netSocket) {
        Map<String, MessageConsumer<?>> registry = new ConcurrentHashMap<>();
        Map<String, Message<?>> replies = new ConcurrentHashMap<>();
        netSocket.handler(new FrameParser(parsed -> {
            if (parsed.failed()) {
                log.error(parsed.cause());
                return;
            }
            JsonObject frame = parsed.result();
            String type = frame.getString("type", "message");
            String address = frame.getString("address");
            final BridgeEventType eventType;
            try {
                eventType = parseType(type);
            } catch (IllegalArgumentException e) {
                // Report the problem to the client instead of letting the exception escape
                // the parser callback, which would leave the client without any answer.
                log.error(e.getMessage());
                FrameHelper.sendErrFrame("unknown_type", netSocket);
                return;
            }
            checkCallHook(() -> new BridgeEventImpl(eventType, frame, netSocket), () -> {
                // Only ping frames are allowed to omit the address.
                if (eventType == BridgeEventType.SOCKET_PING || address != null) {
                    doSendOrPub(netSocket, address, frame, registry, replies);
                } else {
                    FrameHelper.sendErrFrame("missing_address", netSocket);
                    log.error("msg does not have address: " + frame);
                }
            }, () -> FrameHelper.sendErrFrame("blocked by bridgeEvent handler", netSocket));
        }));
        netSocket.exceptionHandler(th -> {
            log.error(th.getMessage(), th);
            unregisterAll(registry);
            netSocket.close();
        });
        netSocket.endHandler(v -> unregisterAll(registry));
    }

    /** Unregisters every consumer held in {@code registry} and empties it. */
    private static void unregisterAll(Map<String, MessageConsumer<?>> registry) {
        registry.values().forEach(MessageConsumer::unregister);
        registry.clear();
    }

    /** Closes the underlying server and notifies {@code handler} with the outcome. */
    @Override
    public void close(Handler<AsyncResult<Void>> handler) {
        this.server.close(handler);
    }

    /** Closes the underlying server. */
    @Override
    public Future<Void> close() {
        return this.server.close();
    }

    /**
     * Consults the bridge event handler, if any, before running an action.
     *
     * <p>Without a handler {@code runnable} runs immediately. With a handler, a bridge event is
     * created and handed over; once its future completes, {@code runnable} runs when the result
     * is {@code true} and {@code runnable2} runs when it is {@code false}. A failed future is
     * only logged.
     *
     * @param supplier  creates the event passed to the bridge event handler
     * @param runnable  action to run when the event is accepted; may be {@code null}
     * @param runnable2 action to run when the event is rejected; may be {@code null}
     */
    private void checkCallHook(Supplier<BridgeEventImpl> supplier, Runnable runnable, Runnable runnable2) {
        if (this.bridgeEventHandler == null) {
            if (runnable != null) {
                runnable.run();
            }
            return;
        }
        BridgeEventImpl event = supplier.get();
        this.bridgeEventHandler.handle(event);
        event.future().onComplete(decision -> {
            if (!decision.succeeded()) {
                log.error("Failure in bridge event handler", decision.cause());
                return;
            }
            if (decision.result()) {
                if (runnable != null) {
                    runnable.run();
                }
            } else if (runnable2 != null) {
                runnable2.run();
            } else {
                log.debug("Bridge handler prevented send or pub");
            }
        });
    }

    /** Same as {@link #checkMatches(boolean, String, Map)} without a pending-reply map. */
    private boolean checkMatches(boolean z, String str) {
        return checkMatches(z, str, null);
    }

    /**
     * Tells whether {@code str} is a permitted address.
     *
     * @param z   {@code true} to check the inbound permitted options, {@code false} for outbound
     * @param str the address to check
     * @param map pending replies; an inbound address present in it is always permitted. May be {@code null}
     * @return {@code true} when the address is allowed
     */
    private boolean checkMatches(boolean z, String str, Map<String, Message<?>> map) {
        if (map != null && z && map.containsKey(str)) {
            return true;
        }
        for (PermittedOptions permitted : z ? this.options.getInboundPermitteds() : this.options.getOutboundPermitteds()) {
            String exact = permitted.getAddress();
            if (exact != null) {
                if (exact.equals(str)) {
                    return true;
                }
                continue;
            }
            String regex = permitted.getAddressRegex();
            // An entry with neither an address nor a regex matches every address.
            if (regex == null || regexMatches(regex, str)) {
                return true;
            }
        }
        return false;
    }

    /** Matches {@code str2} fully against the regex {@code str}, compiling and caching the pattern on first use. */
    private boolean regexMatches(String str, String str2) {
        return this.compiledREs.computeIfAbsent(str, Pattern::compile).matcher(str2).matches();
    }

    /**
     * Copies frame headers into delivery options. The keys {@code timeout}, {@code localOnly}
     * and {@code codecName} configure the options themselves; every other key is added as a
     * message header.
     *
     * @param deliveryOptions options to populate (returned for convenience)
     * @param jsonObject      headers of the frame, or {@code null} when the frame has none
     * @return {@code deliveryOptions}
     */
    private DeliveryOptions parseMsgHeaders(DeliveryOptions deliveryOptions, JsonObject jsonObject) {
        if (jsonObject == null) {
            return deliveryOptions;
        }
        for (String name : jsonObject.fieldNames()) {
            switch (name) {
                case "timeout":
                    deliveryOptions.setSendTimeout(jsonObject.getLong(name).longValue());
                    break;
                case "localOnly":
                    deliveryOptions.setLocalOnly(jsonObject.getBoolean(name).booleanValue());
                    break;
                case "codecName":
                    deliveryOptions.setCodecName(jsonObject.getString(name));
                    break;
                default:
                    deliveryOptions.addHeader(name, jsonObject.getString(name));
                    break;
            }
        }
        return deliveryOptions;
    }

    /**
     * Maps the {@code type} field of a frame to the matching bridge event type.
     *
     * @param str frame type
     * @return the corresponding event type
     * @throws IllegalArgumentException when the type is not one of the supported frame types
     */
    private static BridgeEventType parseType(String str) {
        switch (str) {
            case "ping":
                return BridgeEventType.SOCKET_PING;
            case "register":
                return BridgeEventType.REGISTER;
            case "unregister":
                return BridgeEventType.UNREGISTER;
            case "publish":
                return BridgeEventType.PUBLISH;
            case "send":
                return BridgeEventType.SEND;
            default:
                throw new IllegalArgumentException("Invalid frame type " + str);
        }
    }
}
