package io.vertx.ext.stomp.impl;

import io.vertx.core.Completable;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.http.ServerWebSocketHandshake;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.net.NetServer;
import io.vertx.ext.stomp.Command;
import io.vertx.ext.stomp.Frames;
import io.vertx.ext.stomp.ServerFrame;
import io.vertx.ext.stomp.StompOptions;
import io.vertx.ext.stomp.StompServer;
import io.vertx.ext.stomp.StompServerHandler;
import io.vertx.ext.stomp.StompServerOptions;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:io/vertx/ext/stomp/impl/StompServerImpl.class */
public class StompServerImpl implements StompServer {
    private static final Logger LOGGER = LoggerFactory.getLogger(StompServerImpl.class);
    private final Vertx vertx;
    private final StompServerOptions options;
    private final NetServer server;
    private StompServerHandler handler;
    private volatile boolean listening;
    private Handler<ServerFrame> writingFrameHandler;

    /**
     * Creates a STOMP server bound to the given Vert.x instance.
     *
     * @param vertx              the Vert.x instance, must not be {@code null}
     * @param netServer          the TCP server to use; when {@code null} a new one is created
     *                           from {@code stompServerOptions}
     * @param stompServerOptions the server configuration, must not be {@code null}
     * @throws NullPointerException if {@code vertx} or {@code stompServerOptions} is {@code null}
     */
    public StompServerImpl(Vertx vertx, NetServer netServer, StompServerOptions stompServerOptions) {
        this.vertx = Objects.requireNonNull(vertx);
        this.options = Objects.requireNonNull(stompServerOptions);
        // Reuse the caller-provided server when there is one, otherwise build our own.
        this.server = netServer != null ? netServer : vertx.createNetServer(stompServerOptions);
    }

    /**
     * Sets the handler that receives the STOMP frames accepted by this server.
     *
     * @param stompServerHandler the handler, must not be {@code null}
     * @return this server, for fluent chaining
     * @throws NullPointerException if {@code stompServerHandler} is {@code null}
     */
    @Override
    public StompServer handler(StompServerHandler stompServerHandler) {
        synchronized (this) {
            this.handler = Objects.requireNonNull(stompServerHandler);
        }
        return this;
    }

    /**
     * Starts the server using the callback-based {@code listen(Completable)} variant and
     * exposes the outcome as a future.
     *
     * @return a future tied to the promise handed to the callback-based variant
     */
    @Override
    public Future<StompServer> listen() {
        final Promise<StompServer> started = Promise.promise();
        listen(started);
        return started.future();
    }

    /**
     * Starts the server on the port and host read from the configured options.
     *
     * @param completable the callback passed on to {@code listen(int, String, Completable)}
     * @return the value returned by {@code listen(int, String, Completable)}
     */
    public StompServer listen(Completable<StompServer> completable) {
        final int configuredPort = this.options.getPort();
        final String configuredHost = this.options.getHost();
        return listen(configuredPort, configuredHost, completable);
    }

    /**
     * Starts the server on the given port, using {@link StompOptions#DEFAULT_STOMP_HOST} as host.
     *
     * @param i the port to listen on
     * @return the future returned by {@code listen(int, String)}
     */
    @Override
    public Future<StompServer> listen(int i) {
        final String defaultHost = StompOptions.DEFAULT_STOMP_HOST;
        return listen(i, defaultHost);
    }

    /**
     * Starts the server on the given port and host using the callback-based
     * {@code listen(int, String, Completable)} variant and exposes the outcome as a future.
     *
     * @param i   the port to listen on
     * @param str the host to bind to
     * @return a future tied to the promise handed to the callback-based variant
     */
    @Override
    public Future<StompServer> listen(int i, String str) {
        final Promise<StompServer> started = Promise.promise();
        listen(i, str, started);
        return started.future();
    }

    /**
     * Installs the STOMP connection handling on the TCP server and starts listening.
     *
     * <p>For every accepted socket a frame parser is attached. A {@code CONNECT} or {@code STOMP}
     * frame is forwarded to the STOMP handler only the first time it is seen on that socket; a
     * second one is answered with an "Already connected" error frame and the connection is closed.
     * Any other frame is forwarded only after a {@code CONNECTED} frame has been written on the
     * connection; otherwise a "Not connected" error frame is written and the connection is closed.
     *
     * @param i           the port to listen on; {@code -1} means the TCP server is disabled
     * @param str         the host to bind to
     * @param completable optional callback notified (on a Vert.x context) of the bind outcome;
     *                    may be {@code null}, in which case failures are only logged
     * @return this server
     * @throws NullPointerException if no STOMP handler has been set on this server
     */
    public StompServer listen(int i, String str, Completable<StompServer> completable) {
        if (i == -1) {
            // The callback is optional in the rest of this method, so tolerate null here as well
            // rather than failing with a NullPointerException.
            final String reason = "TCP server disabled. The port is set to '-1'.";
            if (completable != null) {
                completable.fail(reason);
            } else {
                LOGGER.error(reason);
            }
            return this;
        }

        // Snapshot the handler under the lock used by handler(...) and stompHandler().
        final StompServerHandler stompServerHandler;
        synchronized (this) {
            stompServerHandler = this.handler;
        }
        Objects.requireNonNull(stompServerHandler, "Cannot open STOMP server - no StompServerConnectionHandler attached to the server.");

        this.server.connectHandler(netSocket -> {
            // Set once a CONNECTED frame has been written to the client.
            AtomicBoolean connected = new AtomicBoolean();
            // Set once the first CONNECT/STOMP frame has been received from the client.
            AtomicBoolean connectFrameSeen = new AtomicBoolean();

            StompServerTCPConnectionImpl connection = new StompServerTCPConnectionImpl(netSocket, this, serverFrame -> {
                if (serverFrame.frame().getCommand() == Command.CONNECTED) {
                    connected.set(true);
                }
                Handler<ServerFrame> writeListener = this.writingFrameHandler;
                if (writeListener != null) {
                    writeListener.handle(serverFrame);
                }
            });

            FrameParser frameParser = new FrameParser(this.options);
            netSocket.exceptionHandler(failure -> {
                LOGGER.error("The STOMP server caught a TCP socket error - closing connection", failure);
                connection.close();
            });
            netSocket.endHandler(ignored -> connection.close());

            frameParser.errorHandler(frameException -> {
                connection.write(Frames.createInvalidFrameErrorFrame(frameException));
                connection.close();
            }).handler(frame -> {
                final Command command = frame.getCommand();
                if (command == Command.CONNECT || command == Command.STOMP) {
                    if (connectFrameSeen.compareAndSet(false, true)) {
                        stompServerHandler.handle(new ServerFrameImpl(frame, connection));
                    } else {
                        connection.write(Frames.createErrorFrame("Already connected", Collections.emptyMap(), ""));
                        connection.close();
                    }
                    return;
                }
                if (connected.get()) {
                    stompServerHandler.handle(new ServerFrameImpl(frame, connection));
                } else {
                    connection.write(Frames.createErrorFrame("Not connected", Collections.emptyMap(), ""));
                    connection.close();
                }
            });
            netSocket.handler(frameParser);
        }).listen(i, str).onComplete(asyncResult -> {
            if (asyncResult.failed()) {
                if (completable != null) {
                    this.vertx.runOnContext(ignored -> completable.fail(asyncResult.cause()));
                } else {
                    LOGGER.error(asyncResult.cause());
                }
                return;
            }
            this.listening = true;
            LOGGER.info("STOMP server listening on " + ((NetServer) asyncResult.result()).actualPort());
            if (completable != null) {
                this.vertx.runOnContext(ignored -> completable.succeed(this));
            }
        });
        return this;
    }

    /**
     * Closes the server using the callback-based {@code close(Completable)} variant and exposes
     * the outcome as a future.
     *
     * @return a future tied to the promise handed to the callback-based variant
     */
    @Override
    public Future<Void> close() {
        final Promise<Void> stopped = Promise.promise();
        close(stopped);
        return stopped.future();
    }

    /**
     * Reports the current value of the {@code listening} flag.
     *
     * @return {@code true} when the flag is set
     */
    @Override
    public boolean isListening() {
        final boolean currentlyListening = this.listening;
        return currentlyListening;
    }

    /**
     * Returns the port reported by the underlying TCP server.
     *
     * @return the value of {@code actualPort()} on the underlying server
     */
    @Override
    public int actualPort() {
        final NetServer tcpServer = this.server;
        return tcpServer.actualPort();
    }

    /**
     * Returns the options this server was created with.
     *
     * @return the server options instance passed to the constructor
     */
    @Override
    public StompServerOptions options() {
        final StompServerOptions configured = this.options;
        return configured;
    }

    /**
     * Returns the Vert.x instance this server was created with.
     *
     * @return the Vert.x instance passed to the constructor
     */
    @Override
    public Vertx vertx() {
        final Vertx owner = this.vertx;
        return owner;
    }

    /**
     * Returns the STOMP handler currently attached to this server.
     *
     * @return the handler, or {@code null} when none has been set
     */
    @Override
    public StompServerHandler stompHandler() {
        synchronized (this) {
            return this.handler;
        }
    }

    /**
     * Closes the underlying TCP server when this server is marked as listening.
     *
     * <p>When the server is not listening, the callback (if any) is completed successfully on a
     * Vert.x context without touching the underlying server. Otherwise the underlying server is
     * closed, the {@code listening} flag is cleared whatever the outcome, and the outcome is
     * forwarded to the callback.
     *
     * @param completable optional callback notified of the close outcome; may be {@code null}
     */
    public void close(Completable<Void> completable) {
        if (!this.listening) {
            if (completable != null) {
                this.vertx.runOnContext(ignored -> completable.succeed());
            }
            return;
        }
        this.server.close().onComplete((result, failure) -> {
            if (failure == null) {
                LOGGER.info("STOMP Server stopped");
            } else {
                // A failed shutdown is an error condition; do not bury it at INFO level.
                LOGGER.error("STOMP Server failed to stop", failure);
            }
            this.listening = false;
            if (completable != null) {
                completable.complete(result, failure);
            }
        });
    }

    /**
     * Builds the handler deciding whether a WebSocket handshake is accepted.
     *
     * <p>The handshake is accepted when its path equals the WebSocket path configured in the
     * options, and rejected (with an error log entry) otherwise.
     *
     * @return the handshake handler, or {@code null} when the WebSocket bridge is not enabled
     *         in the options
     */
    @Override
    public Handler<ServerWebSocketHandshake> webSocketHandshakeHandler() {
        if (!this.options.isWebsocketBridge()) {
            return null;
        }
        return handshake -> {
            if (!handshake.path().equals(this.options.getWebsocketPath())) {
                LOGGER.error("Receiving a web socket connection on an invalid path (" + handshake.path() + "), the path is configured to " + this.options.getWebsocketPath() + ". Rejecting connection");
                handshake.reject();
                return;
            }
            handshake.accept();
        };
    }

    /**
     * Builds the handler that speaks STOMP over an accepted WebSocket.
     *
     * <p>The STOMP handler is captured when this method is called, not when a socket connects.
     * For every WebSocket a frame parser is attached. A {@code CONNECT} or {@code STOMP} frame is
     * forwarded to the STOMP handler only the first time it is seen on that socket; a second one
     * is answered with an "Already connected" error frame and the connection is closed. Any other
     * frame is forwarded only once the connection has been marked as established; otherwise a
     * "Not connected" error frame is written and the connection is closed.
     *
     * @return the WebSocket handler, or {@code null} when the WebSocket bridge is not enabled
     *         in the options
     */
    @Override
    public Handler<ServerWebSocket> webSocketHandler() {
        if (!this.options.isWebsocketBridge()) {
            return null;
        }

        // Snapshot the handler under the lock used by handler(...) and stompHandler().
        // NOTE(review): no null check is done here, so a handler that is still unset at this
        // point is captured as null and only fails once a frame arrives.
        final StompServerHandler stompServerHandler;
        synchronized (this) {
            stompServerHandler = this.handler;
        }

        return serverWebSocket -> {
            // Set once the session is established (see the write listener below).
            AtomicBoolean connected = new AtomicBoolean();
            // Set once the first CONNECT/STOMP frame has been received from the client.
            AtomicBoolean connectFrameSeen = new AtomicBoolean();

            StompServerWebSocketConnectionImpl connection = new StompServerWebSocketConnectionImpl(serverWebSocket, this, serverFrame -> {
                // Kept as in the original: a written CONNECTED or STOMP frame marks the session.
                if (serverFrame.frame().getCommand() == Command.CONNECTED || serverFrame.frame().getCommand() == Command.STOMP) {
                    connected.set(true);
                }
                Handler<ServerFrame> writeListener = this.writingFrameHandler;
                if (writeListener != null) {
                    writeListener.handle(serverFrame);
                }
            });

            FrameParser frameParser = new FrameParser(this.options);
            serverWebSocket.exceptionHandler(failure -> {
                LOGGER.error("The STOMP server caught a WebSocket error - closing connection", failure);
                connection.close();
            });
            serverWebSocket.endHandler(ignored -> connection.close());

            frameParser.errorHandler(frameException -> {
                connection.write(Frames.createInvalidFrameErrorFrame(frameException));
                connection.close();
            }).handler(frame -> {
                final Command command = frame.getCommand();
                // Accept STOMP as well as CONNECT, exactly like the TCP path does; previously a
                // client opening with a STOMP frame was rejected with "Not connected".
                if (command == Command.CONNECT || command == Command.STOMP) {
                    if (connectFrameSeen.compareAndSet(false, true)) {
                        stompServerHandler.handle(new ServerFrameImpl(frame, connection));
                    } else {
                        connection.write(Frames.createErrorFrame("Already connected", Collections.emptyMap(), ""));
                        connection.close();
                    }
                    return;
                }
                if (connected.get()) {
                    stompServerHandler.handle(new ServerFrameImpl(frame, connection));
                } else {
                    connection.write(Frames.createErrorFrame("Not connected", Collections.emptyMap(), ""));
                    connection.close();
                }
            });
            serverWebSocket.handler(frameParser);
        };
    }

    /**
     * Sets the handler invoked with each frame written by a connection of this server.
     *
     * @param handler the handler to notify; {@code null} is stored as-is and disables notification
     * @return this server, for fluent chaining
     */
    @Override
    public synchronized StompServer writingFrameHandler(Handler<ServerFrame> handler) {
        this.writingFrameHandler = handler;
        return this;
    }
}
