package io.vertx.core.eventbus.impl.clustered;

import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.impl.CodecManager;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import io.vertx.core.spi.metrics.EventBusMetrics;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/vertx/core/eventbus/impl/clustered/InboundConnection.class */
/**
 * Consumes the raw bytes received on a cluster {@link NetSocket} and turns them into
 * {@link ClusteredMessage} instances.
 *
 * <p>Incoming data is framed as a 4-byte integer length followed by a body of that many
 * bytes. A {@link RecordParser} alternates between the two fixed sizes: it first delivers
 * the 4-byte header, is then switched to the announced body size, and is switched back to
 * 4 bytes once the body has been decoded.
 */
public final class InboundConnection implements Handler<Buffer> {
    /** Single byte (value 1) written back on the socket when a ping message is decoded. */
    private static final Buffer PONG = Buffer.buffer(new byte[]{1});

    private final ClusteredEventBus clusteredEventBus;
    private final NetSocket socket;
    private final RecordParser parser;

    /** Body length of the frame being awaited, or -1 while the length header is awaited. */
    private int size = -1;

    /** Receiver of decoded messages; assigned through {@link #handler(Handler)}. */
    private Handler<ClusteredMessage<?, ?>> handler;

    /**
     * Creates a connection whose parser starts out expecting a 4-byte length header.
     *
     * @param clusteredEventBus event bus supplying the codec manager and metrics used while decoding
     * @param netSocket socket on which the pong reply is written
     */
    public InboundConnection(ClusteredEventBus clusteredEventBus, NetSocket netSocket) {
        this.clusteredEventBus = clusteredEventBus;
        this.socket = netSocket;
        RecordParser frameParser = RecordParser.newFixed(4);
        frameParser.setOutput(this::decodeMessage);
        this.parser = frameParser;
    }

    /**
     * Feeds a chunk of received bytes to the record parser, which calls back into
     * {@code decodeMessage} for each complete record.
     *
     * @param buffer the received bytes
     */
    @Override // io.vertx.core.Handler
    public void handle(Buffer buffer) {
        parser.handle(buffer);
    }

    /**
     * Sets the handler invoked for each decoded message that neither reports a failure nor
     * uses the ping codec. No null check is performed before it is invoked, so it has to be
     * set before such a message is decoded.
     *
     * @param handler the message handler
     * @return this connection
     */
    public InboundConnection handler(Handler<ClusteredMessage<?, ?>> handler) {
        this.handler = handler;
        return this;
    }

    /**
     * Parser output callback. Depending on the current state, {@code buffer} is either the
     * 4-byte length header or a complete message body.
     *
     * @param buffer the record emitted by the parser
     */
    private void decodeMessage(Buffer buffer) {
        if (size == -1) {
            // Header record: read the body length and ask the parser for exactly that many bytes.
            size = buffer.getInt(0);
            parser.fixedSizeMode(size);
            return;
        }

        // Body record: decode it, then re-arm the parser for the next 4-byte header.
        ClusteredMessage<?, ?> received = new ClusteredMessage<>(clusteredEventBus);
        received.readFromWire(buffer, clusteredEventBus.codecManager());
        parser.fixedSizeMode(4);
        size = -1;

        if (received.hasFailure()) {
            received.internalError();
            return;
        }
        if (received.codec() == CodecManager.PING_MESSAGE_CODEC) {
            // Reply with the pong byte directly on this socket; the handler is not invoked.
            // PONG is a Buffer and is written as such (the previous cast to NetSocket was invalid).
            socket.write(PONG);
            return;
        }

        EventBusMetrics<?> metrics = clusteredEventBus.metrics();
        if (metrics != null) {
            metrics.messageRead(received.address(), buffer.length());
        }
        handler.handle(received);
    }
}
