package io.vertx.core.eventbus.impl.clustered;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.HandlerHolder;
import io.vertx.core.eventbus.impl.HandlerRegistration;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.impl.utils.ConcurrentCyclicSequence;
import io.vertx.core.internal.CloseFuture;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.NetClientBuilder;
import io.vertx.core.net.impl.NetServerInternal;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.NodeInfo;
import io.vertx.core.spi.cluster.RegistrationInfo;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.spi.metrics.VertxMetrics;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/* loaded from: input_file:io/vertx/core/eventbus/impl/clustered/ClusteredEventBus.class */
public final class ClusteredEventBus extends EventBusImpl {
    private static final Logger log = LoggerFactory.getLogger(ClusteredEventBus.class);
    private final EventBusOptions options;
    private final ClusterManager clusterManager;
    private final NodeSelector nodeSelector;
    private final AtomicLong handlerSequence;
    private final NetClient client;
    private final ConcurrentMap<String, OutboundConnection> outboundConnections;
    private final ContextInternal context;
    private NodeInfo nodeInfo;
    private String nodeId;
    private NetServerInternal server;

    /**
     * Builds a clustered event bus on top of the given Vert.x instance.
     *
     * <p>The outbound {@link NetClient} is configured from the JSON form of the event bus
     * options with an empty hostname verification algorithm. A dedicated context with its
     * own {@link CloseFuture} is created using the calling thread's context class loader.
     *
     * @param vertxInternal  the owning Vert.x instance
     * @param vertxOptions   options from which the event bus options are read
     * @param clusterManager the cluster manager used for node info and registrations
     * @param nodeSelector   the selector used to pick target nodes for send/publish
     */
    public ClusteredEventBus(VertxInternal vertxInternal, VertxOptions vertxOptions, ClusterManager clusterManager, NodeSelector nodeSelector) {
        super(vertxInternal);
        EventBusOptions eventBusOptions = vertxOptions.getEventBusOptions();
        this.handlerSequence = new AtomicLong(0L);
        this.outboundConnections = new ConcurrentHashMap<>();
        // The client is created before the context, as in the original initialisation order.
        NetClientOptions clientOptions = new NetClientOptions(eventBusOptions.toJson()).setHostnameVerificationAlgorithm("");
        this.client = createNetClient(vertxInternal, clientOptions);
        this.options = eventBusOptions;
        this.clusterManager = clusterManager;
        this.nodeSelector = nodeSelector;
        this.context = vertxInternal.contextBuilder()
            .withClassLoader(Thread.currentThread().getContextClassLoader())
            .withCloseFuture(new CloseFuture())
            .build();
    }

    /**
     * Returns the codec manager used by this event bus.
     *
     * <p>Decompiler note: this accessor was package-private before decompilation.
     *
     * @return the codec manager field of this bus
     */
    public CodecManager codecManager() {
        return codecManager;
    }

    /**
     * Returns the event bus metrics held by this bus.
     *
     * <p>Decompiler note: this accessor was package-private before decompilation.
     *
     * @return the metrics field of this bus
     */
    public EventBusMetrics<?> metrics() {
        return metrics;
    }

    /**
     * Returns the Vert.x instance this bus belongs to.
     *
     * <p>Decompiler note: this accessor was package-private before decompilation.
     *
     * @return the owning Vert.x instance
     */
    public VertxInternal vertx() {
        return vertx;
    }

    /**
     * Returns the event bus options this bus was created with.
     *
     * <p>Decompiler note: this accessor was package-private before decompilation.
     *
     * @return the event bus options
     */
    public EventBusOptions options() {
        return options;
    }

    /**
     * Picks a default host address by scanning the local network interfaces.
     *
     * <p>The first address that is not the wildcard address, not a multicast address and
     * not an IPv6 address is returned. Loopback addresses are not filtered out.
     *
     * @return the textual host address of the first matching address, or {@code null} when
     *         no address matches or the interfaces cannot be enumerated
     */
    public static String defaultAddress() {
        Enumeration<NetworkInterface> interfaces;
        try {
            interfaces = NetworkInterface.getNetworkInterfaces();
        } catch (SocketException e) {
            // Enumeration failed: callers treat null as "no default address available".
            return null;
        }
        while (interfaces.hasMoreElements()) {
            NetworkInterface networkInterface = interfaces.nextElement();
            for (Enumeration<InetAddress> addresses = networkInterface.getInetAddresses(); addresses.hasMoreElements(); ) {
                InetAddress candidate = addresses.nextElement();
                boolean rejected = candidate.isAnyLocalAddress()
                    || candidate.isMulticastAddress()
                    || candidate instanceof Inet6Address;
                if (!rejected) {
                    return candidate.getHostAddress();
                }
            }
        }
        return null;
    }

    /**
     * Builds the {@link NetClient} used for outbound cluster connections.
     *
     * <p>When the Vert.x instance exposes metrics, client metrics created from the same
     * options are attached to the builder before building.
     *
     * @param vertxInternal    the Vert.x instance the client is built for
     * @param netClientOptions the client options
     * @return the built client
     */
    private NetClient createNetClient(VertxInternal vertxInternal, NetClientOptions netClientOptions) {
        NetClientBuilder builder = new NetClientBuilder(vertxInternal, netClientOptions);
        VertxMetrics vertxMetrics = vertxInternal.metrics();
        if (vertxMetrics == null) {
            return builder.build();
        }
        builder.metrics(vertxMetrics.createNetClientMetrics(netClientOptions));
        return builder.build();
    }

    /**
     * Derives the net server options from the JSON form of the event bus options.
     *
     * @return a new {@link NetServerOptions} instance
     */
    private NetServerOptions getServerOptions() {
        NetServerOptions serverOptions = new NetServerOptions(options.toJson());
        return serverOptions;
    }

    /**
     * Starts the clustered event bus.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>create the net server and install a connect handler that wraps each accepted
     *       socket in an {@code InboundConnection} whose messages are delivered locally;</li>
     *   <li>listen on the cluster host and port using this bus's context;</li>
     *   <li>once listening, build this node's {@link NodeInfo} from the public host/port and
     *       the configured node metadata, read the node id from the cluster manager and
     *       publish the node info to it;</li>
     *   <li>on success, mark the bus as started and notify the node selector.</li>
     * </ol>
     *
     * <p>Decompiler artifacts ({@code handler2} and a cast to an undeclared type variable)
     * have been replaced with the real API calls.
     *
     * @param promise completed with the outcome of the whole start sequence
     */
    @Override
    public void start(Promise<Void> promise) {
        server = vertx.createNetServer(getServerOptions());
        server.connectHandler(socket -> {
            InboundConnection inbound = new InboundConnection(this, socket);
            inbound.handler(message -> deliverMessageLocally(message));
            socket.handler((Handler<Buffer>) inbound);
        });
        int port = getClusterPort();
        String host = getClusterHost();
        server.listen(context, SocketAddress.inetSocketAddress(port, host))
            .flatMap(netServer -> {
                // The actual port is read only after listen succeeded.
                nodeInfo = new NodeInfo(getClusterPublicHost(host), getClusterPublicPort(server.actualPort()), options.getClusterNodeMetadata());
                nodeId = clusterManager.getNodeId();
                Promise<Void> nodeInfoSet = Promise.promise();
                clusterManager.setNodeInfo(nodeInfo, nodeInfoSet);
                return nodeInfoSet.future();
            })
            .andThen(ar -> {
                if (ar.succeeded()) {
                    started = true;
                    nodeSelector.eventBusStarted();
                }
            })
            .onComplete(promise);
    }

    /**
     * Closes the event bus, then the outbound client, then (if one was created) the server.
     *
     * <p>Each step is chained with {@code eventually}; the outcome of the chain is
     * forwarded to {@code promise}.
     *
     * @param promise completed with the outcome of the close chain
     */
    @Override
    public void close(Promise<Void> promise) {
        Promise<Void> busClosed = Promise.promise();
        super.close(busClosed);
        Future<Void> chain = busClosed.future().eventually(client::close);
        if (server != null) {
            chain = chain.eventually(() -> server.close());
        }
        chain.onComplete(promise);
    }

    /**
     * Creates a {@code ClusteredMessage} originating from this node.
     *
     * @param z        forwarded as the sixth constructor argument of {@code ClusteredMessage}
     *                 (presumably the send/publish flag — confirm against that class)
     * @param z2       forwarded as the third argument of the codec lookup
     *                 (presumably a local-only flag — confirm against {@code CodecManager})
     * @param str      the message address; must not be {@code null}
     * @param multiMap the message headers
     * @param obj      the message body
     * @param str2     the codec name handed to the codec lookup
     * @return the new message
     * @throws NullPointerException if {@code str} is {@code null}
     */
    @Override
    public MessageImpl createMessage(boolean z, boolean z2, String str, MultiMap multiMap, Object obj, String str2) {
        Objects.requireNonNull(str, "no null address accepted");
        return new ClusteredMessage(
            nodeId,
            str,
            multiMap,
            obj,
            codecManager.lookupCodec(obj, str2, z2),
            z,
            this);
    }

    /**
     * Propagates a local handler registration to the cluster manager.
     *
     * @param handlerHolder the holder of the handler that was registered locally
     * @param promise       completion passed to the cluster manager; must not be {@code null}
     * @throws NullPointerException if {@code promise} is {@code null}
     */
    @Override
    protected <T> void onLocalRegistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
        String address = handlerHolder.getHandler().address();
        RegistrationInfo registration = new RegistrationInfo(nodeId, handlerHolder.getSeq(), handlerHolder.isLocalOnly());
        Promise<Void> completion = Objects.requireNonNull(promise);
        clusterManager.addRegistration(address, registration, completion);
    }

    /**
     * Creates a clustered handler holder carrying the next value of the handler sequence.
     *
     * @param handlerRegistration the registration to hold
     * @param z                   flag forwarded unchanged to the holder constructor
     * @param contextInternal     the context forwarded to the holder constructor
     * @return the new holder
     */
    @Override
    protected <T> HandlerHolder<T> createHandlerHolder(HandlerRegistration<T> handlerRegistration, boolean z, ContextInternal contextInternal) {
        long sequence = handlerSequence.getAndIncrement();
        return new ClusteredHandlerHolder(handlerRegistration, z, contextInternal, sequence);
    }

    /**
     * Propagates a local handler unregistration to the cluster manager.
     *
     * <p>The removal is tracked with an intermediate promise whose outcome is then
     * forwarded to {@code promise}.
     *
     * @param handlerHolder the holder of the handler that was unregistered locally
     * @param promise       receives the outcome of the removal
     */
    @Override
    protected <T> void onLocalUnregistration(HandlerHolder<T> handlerHolder, Promise<Void> promise) {
        RegistrationInfo registration = new RegistrationInfo(nodeId, handlerHolder.getSeq(), handlerHolder.isLocalOnly());
        Promise<Void> removal = Promise.promise();
        String address = handlerHolder.getHandler().address();
        clusterManager.removeRegistration(address, registration, removal);
        removal.future().onComplete(promise);
    }

    /**
     * Routes an outgoing message.
     *
     * <ul>
     *   <li>A message with a "replied to" node is sent as a clustered reply to that node.</li>
     *   <li>Otherwise, local-only delivery options deliver the message locally.</li>
     *   <li>Otherwise a send asks the node selector for a single node and a publish asks it
     *       for all nodes; selector failures fail {@code promise}.</li>
     * </ul>
     *
     * @param contextInternal the sending context (not used by this implementation)
     * @param messageImpl     the message to route; must be a {@code ClusteredMessage}
     * @param deliveryOptions the delivery options
     * @param promise         completed with the outcome of the write
     */
    @Override
    protected <T> void sendOrPub(ContextInternal contextInternal, MessageImpl<?, T> messageImpl, DeliveryOptions deliveryOptions, Promise<Void> promise) {
        ClusteredMessage clustered = (ClusteredMessage) messageImpl;
        String repliedTo = clustered.getRepliedTo();
        if (repliedTo != null) {
            clusteredSendReply(messageImpl, promise, repliedTo);
            return;
        }
        if (deliveryOptions.isLocalOnly()) {
            sendLocally(messageImpl, promise);
            return;
        }
        if (messageImpl.isSend()) {
            nodeSelector.selectForSend(messageImpl.address(), (target, failure) -> {
                if (failure != null) {
                    sendOrPublishFailed(promise, failure);
                } else {
                    sendToNode(target, messageImpl, promise);
                }
            });
            return;
        }
        nodeSelector.selectForPublish(messageImpl.address(), (targets, failure) -> {
            if (failure != null) {
                sendOrPublishFailed(promise, failure);
            } else {
                sendToNodes(targets, messageImpl, promise);
            }
        });
    }

    /**
     * Reports a node selection failure for a send or publish.
     *
     * <p>The failure is logged only when debug logging is enabled, and at debug level so
     * that the level matches the guard (the original logged at error level behind a debug
     * guard). The promise is failed with {@code tryFail}, so an already completed promise
     * is left untouched.
     *
     * @param promise the write promise to fail
     * @param th      the cause of the failure
     */
    private void sendOrPublishFailed(Promise<Void> promise, Throwable th) {
        if (log.isDebugEnabled()) {
            log.debug("Failed to send message", th);
        }
        promise.tryFail(th);
    }

    /**
     * Generates a reply address made of the {@code __vertx.reply.} prefix and a random UUID.
     *
     * @return the generated reply address
     */
    @Override
    protected String generateReplyAddress() {
        UUID unique = UUID.randomUUID();
        return "__vertx.reply." + unique;
    }

    /**
     * Tells whether a message is local, i.e. whether it did not come from the wire.
     *
     * @param messageImpl the message to inspect; must be a {@code ClusteredMessage}
     * @return {@code true} when the message is not flagged as coming from the wire
     */
    @Override
    protected boolean isMessageLocal(MessageImpl messageImpl) {
        ClusteredMessage clustered = (ClusteredMessage) messageImpl;
        boolean fromWire = clustered.isFromWire();
        return !fromWire;
    }

    /**
     * Selects the handler that should receive the next message.
     *
     * <p>When {@code z} is {@code true} the sequence's own {@code next()} is used. Otherwise
     * the sequence is scanned via {@code iterator(false)} and the first holder that is not
     * local-only is chosen.
     *
     * @param concurrentCyclicSequence the registered handler holders
     * @param z                        selects the plain {@code next()} path when {@code true}
     *                                 (presumably "message is local" — confirm with caller)
     * @return the selected holder, or {@code null} when the scan finds no eligible holder
     */
    @Override
    protected HandlerHolder nextHandler(ConcurrentCyclicSequence<HandlerHolder> concurrentCyclicSequence, boolean z) {
        if (z) {
            return concurrentCyclicSequence.next();
        }
        for (Iterator<HandlerHolder> it = concurrentCyclicSequence.iterator(false); it.hasNext(); ) {
            HandlerHolder candidate = it.next();
            if (!candidate.isLocalOnly()) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Returns the port configured in the event bus options.
     *
     * @return the configured port
     */
    private int getClusterPort() {
        int configuredPort = options.getPort();
        return configuredPort;
    }

    /**
     * Resolves the host to bind the cluster server to.
     *
     * <p>Order of precedence: the host from the event bus options, then the cluster
     * manager's cluster host, then {@link #defaultAddress()}.
     *
     * @return the resolved host, possibly {@code null} if no source provides one
     */
    private String getClusterHost() {
        String resolved = options.getHost();
        if (resolved == null) {
            resolved = clusterManager.clusterHost();
        }
        if (resolved == null) {
            resolved = defaultAddress();
        }
        return resolved;
    }

    /**
     * Resolves the port advertised to other nodes.
     *
     * @param i the fallback port, used when no positive public port is configured
     * @return the configured public port when it is greater than zero, otherwise {@code i}
     */
    private int getClusterPublicPort(int i) {
        int configured = options.getClusterPublicPort();
        if (configured > 0) {
            return configured;
        }
        return i;
    }

    /**
     * Resolves the host advertised to other nodes.
     *
     * <p>Order of precedence: the public host from the options, the host from the options,
     * the cluster manager's public host, then the supplied fallback.
     *
     * @param str the fallback host
     * @return the first non-null candidate in the order above, or {@code str}
     */
    private String getClusterPublicHost(String str) {
        String resolved = options.getClusterPublicHost();
        if (resolved == null) {
            resolved = options.getHost();
        }
        if (resolved == null) {
            resolved = clusterManager.clusterPublicHost();
        }
        return resolved == null ? str : resolved;
    }

    /**
     * Sends a message to one node: remotely when the node id is non-null and differs from
     * this node's id, locally otherwise.
     *
     * @param str         the target node id, may be {@code null}
     * @param messageImpl the message to send
     * @param promise     completed with the outcome of the write
     */
    private <T> void sendToNode(String str, MessageImpl<?, T> messageImpl, Promise<Void> promise) {
        boolean remote = str != null && !str.equals(nodeId);
        if (remote) {
            sendRemote(str, messageImpl, promise);
        } else {
            sendLocally(messageImpl, promise);
        }
    }

    /**
     * Sends a message to every node in {@code iterable}; when it is {@code null} or empty
     * the message is delivered locally instead.
     *
     * @param iterable    the target node ids, may be {@code null}
     * @param messageImpl the message to send
     * @param promise     the write promise handed to every individual send
     */
    private <T> void sendToNodes(Iterable<String> iterable, MessageImpl<?, T> messageImpl, Promise<Void> promise) {
        boolean dispatched = false;
        if (iterable != null) {
            for (String target : iterable) {
                dispatched = true;
                // NOTE(review): the same promise is passed for every target, so it may be
                // completed more than once — confirm the downstream writers tolerate that.
                sendToNode(target, messageImpl, promise);
            }
        }
        if (!dispatched) {
            sendLocally(messageImpl, promise);
        }
    }

    /**
     * Sends a reply to the node it is addressed to: remotely when that node is not this
     * node, locally otherwise.
     *
     * @param messageImpl the reply message
     * @param promise     completed with the outcome of the write
     * @param str         the id of the node being replied to; must not be {@code null}
     */
    private <T> void clusteredSendReply(MessageImpl<?, T> messageImpl, Promise<Void> promise, String str) {
        if (!str.equals(nodeId)) {
            sendRemote(str, messageImpl, promise);
            return;
        }
        sendLocally(messageImpl, promise);
    }

    /**
     * Writes a message on the outbound connection for the given node, obtaining the
     * connection through {@code getOutboundConnection}.
     *
     * @param str         the remote node id
     * @param messageImpl the message to write
     * @param promise     completed with the outcome of the write
     */
    private void sendRemote(String str, MessageImpl<?, ?> messageImpl, Promise<Void> promise) {
        OutboundConnection connection = getOutboundConnection(str);
        connection.writeMessage(messageImpl, promise);
    }

    /**
     * Returns the outbound connection for a remote node, creating and connecting it on
     * first use.
     *
     * <p>{@code putIfAbsent} decides which caller wins when several race to create the
     * connection; only the winner triggers {@code connect}.
     *
     * @param str the remote node id
     * @return the connection registered for that node
     */
    private OutboundConnection getOutboundConnection(String str) {
        OutboundConnection existing = outboundConnections.get(str);
        if (existing != null) {
            return existing;
        }
        OutboundConnection created = new OutboundConnection(this, str);
        OutboundConnection winner = outboundConnections.putIfAbsent(str, created);
        if (winner != null) {
            // Another caller registered a connection first; discard ours without connecting.
            return winner;
        }
        connect(created);
        return created;
    }

    /**
     * Establishes the socket behind an outbound connection.
     *
     * <p>The remote node's {@link NodeInfo} is fetched from the cluster manager and the
     * client connects to its host and port. On success the connection is installed as the
     * socket's data handler, a close handler is registered, and the connection is told it
     * is connected. On failure a warning is logged and the connection is closed with the
     * cause.
     *
     * <p>Decompiler artifacts ({@code onComplete2}, {@code handler2},
     * {@code mo369closeHandler} and a cast to an undeclared type variable) have been
     * replaced with the real API calls.
     *
     * @param outboundConnection the connection to establish
     */
    private void connect(OutboundConnection outboundConnection) {
        Promise<NodeInfo> lookup = Promise.promise();
        clusterManager.getNodeInfo(outboundConnection.remoteNodeId(), lookup);
        lookup.future()
            .flatMap(info -> client.connect(info.port(), info.host()))
            .onComplete(ar -> {
                if (ar.succeeded()) {
                    NetSocket socket = ar.result();
                    socket.handler((Handler<Buffer>) outboundConnection);
                    socket.closeHandler(v -> {
                        // Two-argument remove: only this exact connection is unmapped.
                        if (outboundConnections.remove(outboundConnection.remoteNodeId(), outboundConnection) && log.isDebugEnabled()) {
                            log.debug("Cluster connection closed for server " + outboundConnection.remoteNodeId());
                        }
                        outboundConnection.handleClose(NetSocketInternal.CLOSED_EXCEPTION);
                    });
                    outboundConnection.connected(socket);
                } else {
                    log.warn("Connecting to server " + outboundConnection.remoteNodeId() + " failed", ar.cause());
                    outboundConnection.handleClose(ar.cause());
                }
            });
    }
}
