package com.arcadedb.server.ha;

import com.arcadedb.ContextConfiguration;
import com.arcadedb.GlobalConfiguration;
import com.arcadedb.database.Binary;
import com.arcadedb.exception.TimeoutException;
import com.arcadedb.log.LogManager;
import com.arcadedb.network.binary.ChannelBinaryServer;
import com.arcadedb.network.binary.ConnectionException;
import com.arcadedb.server.ha.HAServer;
import com.arcadedb.server.ha.message.CommandForwardRequest;
import com.arcadedb.server.ha.message.HACommand;
import com.arcadedb.server.ha.message.ReplicaConnectHotResyncResponse;
import com.arcadedb.server.ha.message.TxForwardRequest;
import com.arcadedb.utility.Callable;
import com.arcadedb.utility.FileUtils;
import com.arcadedb.utility.Pair;
import com.conversantmedia.util.concurrent.PushPullBlockingQueue;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

/* loaded from: input_file:com/arcadedb/server/ha/Leader2ReplicaNetworkExecutor.class */
public class Leader2ReplicaNetworkExecutor extends Thread {
    private final HAServer server;
    private final String remoteServerName;
    private final String remoteServerAddress;
    private final String remoteServerHTTPAddress;
    private final BlockingQueue<Binary> senderQueue;
    private Thread senderThread;
    private final BlockingQueue<Pair<ReplicationMessage, HACommand>> forwarderQueue;
    private Thread forwarderThread;
    private long joinedOn;
    private ChannelBinaryServer channel;
    private long totalMessages;
    private long totalBytes;
    private long latencyMin;
    private long latencyMax;
    private long latencyTotalTime;
    private long leftOn = 0;
    private STATUS status = STATUS.JOINING;
    private final Object lock = new Object();
    private final Object channelOutputLock = new Object();
    private final Object channelInputLock = new Object();
    private volatile boolean shutdownCommunication = false;

    /* loaded from: input_file:com/arcadedb/server/ha/Leader2ReplicaNetworkExecutor$STATUS.class */
    public enum STATUS {
        JOINING,
        OFFLINE,
        ONLINE
    }

    public Leader2ReplicaNetworkExecutor(HAServer hAServer, ChannelBinaryServer channelBinaryServer, String str, String str2, String str3) throws IOException {
        this.server = hAServer;
        this.remoteServerName = str;
        this.remoteServerAddress = str2;
        this.remoteServerHTTPAddress = str3;
        this.channel = channelBinaryServer;
        ContextConfiguration configuration = hAServer.getServer().getConfiguration();
        int valueAsInteger = configuration.getValueAsInteger(GlobalConfiguration.HA_REPLICATION_QUEUE_SIZE);
        String valueAsString = configuration.getValueAsString(GlobalConfiguration.ASYNC_OPERATIONS_QUEUE_IMPL);
        if ("fast".equalsIgnoreCase(valueAsString)) {
            this.senderQueue = new PushPullBlockingQueue(valueAsInteger);
            this.forwarderQueue = new PushPullBlockingQueue(valueAsInteger);
        } else if ("standard".equalsIgnoreCase(valueAsString)) {
            this.senderQueue = new ArrayBlockingQueue(valueAsInteger);
            this.forwarderQueue = new ArrayBlockingQueue(valueAsInteger);
        } else {
            LogManager.instance().log(this, Level.WARNING, "Error on async operation queue implementation setting: %s is not supported", (Throwable) null, valueAsString);
            this.senderQueue = new ArrayBlockingQueue(valueAsInteger);
            this.forwarderQueue = new ArrayBlockingQueue(valueAsInteger);
        }
        setName(this.server.getServer().getServerName() + " leader2replica->?");
        synchronized (this.channelOutputLock) {
            try {
                if (!hAServer.isLeader()) {
                    Replica2LeaderNetworkExecutor leader = this.server.getLeader();
                    this.channel.writeBoolean(false);
                    this.channel.writeByte((byte) 0);
                    this.channel.writeString("Current server '" + hAServer.getServerName() + "' is not the Leader");
                    this.channel.writeString(leader != null ? leader.getRemoteServerName() : "");
                    this.channel.writeString(leader != null ? leader.getRemoteAddress() : "");
                    throw new ConnectionException(channelBinaryServer.socket.getInetAddress().toString(), "Current server '" + hAServer.getServerName() + "' is not the Leader");
                }
                HAServer.ELECTION_STATUS electionStatus = hAServer.getElectionStatus();
                if (electionStatus != HAServer.ELECTION_STATUS.DONE && electionStatus != HAServer.ELECTION_STATUS.LEADER_WAITING_FOR_QUORUM) {
                    this.channel.writeBoolean(false);
                    this.channel.writeByte((byte) 1);
                    this.channel.writeString("Election for the Leader is pending");
                    throw new ConnectionException(channelBinaryServer.socket.getInetAddress().toString(), "Election for Leader is pending");
                }
                setName(this.server.getServer().getServerName() + " leader2replica->" + str + "(" + str2 + ")");
                this.channel.writeBoolean(true);
                this.channel.writeString(this.server.getServerName());
                this.channel.writeLong(this.server.lastElectionVote != null ? ((Long) this.server.lastElectionVote.getFirst()).longValue() : 1L);
                this.channel.writeString(this.server.getServer().getHttpServer().getListeningAddress());
                this.channel.writeString(this.server.getServerAddressList());
                LogManager.instance().log(this, Level.INFO, "Remote Replica server '%s' (%s) successfully connected", str, str2);
                this.channel.flush();
            } catch (Throwable th) {
                this.channel.flush();
                throw th;
            }
        }
    }

    public void mergeFrom(Leader2ReplicaNetworkExecutor leader2ReplicaNetworkExecutor) {
        synchronized (leader2ReplicaNetworkExecutor.lock) {
            this.senderQueue.addAll(leader2ReplicaNetworkExecutor.senderQueue);
            leader2ReplicaNetworkExecutor.close();
        }
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        LogManager.instance().setContext(this.server.getServerName());
        this.senderThread = new Thread(new Runnable() { // from class: com.arcadedb.server.ha.Leader2ReplicaNetworkExecutor.1
            @Override // java.lang.Runnable
            public void run() {
                LogManager.instance().setContext(Leader2ReplicaNetworkExecutor.this.server.getServerName());
                Binary binary = null;
                while (true) {
                    if (!Leader2ReplicaNetworkExecutor.this.shutdownCommunication || !Leader2ReplicaNetworkExecutor.this.senderQueue.isEmpty()) {
                        if (binary == null) {
                            try {
                                binary = Leader2ReplicaNetworkExecutor.this.senderQueue.poll(500L, TimeUnit.MILLISECONDS);
                            } catch (IOException e) {
                                LogManager.instance().log(this, Level.INFO, "Error on sending replication message to remote server '%s' (error=%s)", Leader2ReplicaNetworkExecutor.this.remoteServerName, e);
                                Leader2ReplicaNetworkExecutor.this.shutdownCommunication = true;
                                return;
                            } catch (InterruptedException e2) {
                                Thread.currentThread().interrupt();
                            }
                        }
                        if (binary != null) {
                            if (!Leader2ReplicaNetworkExecutor.this.shutdownCommunication) {
                                switch (Leader2ReplicaNetworkExecutor.this.status.ordinal()) {
                                    case 2:
                                        LogManager.instance().log(this, Level.FINE, "Sending message to replica '%s' (msgSize=%d buffered=%d)...", Leader2ReplicaNetworkExecutor.this.remoteServerName, Integer.valueOf(binary.size()), Integer.valueOf(Leader2ReplicaNetworkExecutor.this.senderQueue.size()));
                                        Leader2ReplicaNetworkExecutor.this.sendMessage(binary);
                                        binary = null;
                                        break;
                                    default:
                                        LogManager.instance().log(this, Level.FINE, "Replica '%s' is not online, waiting and retry (buffered=%d)...", Leader2ReplicaNetworkExecutor.this.remoteServerName, Integer.valueOf(Leader2ReplicaNetworkExecutor.this.senderQueue.size()));
                                        Thread.sleep(500L);
                                        break;
                                }
                            }
                        }
                    }
                }
                LogManager.instance().log(this, Level.FINE, "Replication thread to remote server '%s' is off (buffered=%d)", Leader2ReplicaNetworkExecutor.this.remoteServerName, Integer.valueOf(Leader2ReplicaNetworkExecutor.this.senderQueue.size()));
            }
        });
        this.senderThread.start();
        this.senderThread.setName(this.server.getServer().getServerName() + " leader2replica-sender->" + this.remoteServerName);
        this.forwarderThread = new Thread(new Runnable() { // from class: com.arcadedb.server.ha.Leader2ReplicaNetworkExecutor.2
            @Override // java.lang.Runnable
            public void run() {
                LogManager.instance().setContext(Leader2ReplicaNetworkExecutor.this.server.getServerName());
                Binary binary = new Binary(8192);
                binary.setAllocationChunkSize(1024);
                while (true) {
                    if (Leader2ReplicaNetworkExecutor.this.shutdownCommunication && Leader2ReplicaNetworkExecutor.this.forwarderQueue.isEmpty()) {
                        break;
                    }
                    try {
                        Pair<ReplicationMessage, HACommand> poll = Leader2ReplicaNetworkExecutor.this.forwarderQueue.poll(500L, TimeUnit.MILLISECONDS);
                        if (poll != null) {
                            if (Leader2ReplicaNetworkExecutor.this.shutdownCommunication) {
                                break;
                            } else {
                                Leader2ReplicaNetworkExecutor.this.executeMessage(binary, poll);
                            }
                        }
                    } catch (IOException e) {
                        LogManager.instance().log(this, Level.INFO, "Error on sending replication message to remote server '%s' (error=%s)", Leader2ReplicaNetworkExecutor.this.remoteServerName, e);
                        Leader2ReplicaNetworkExecutor.this.shutdownCommunication = true;
                        return;
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                    }
                }
                LogManager.instance().log(this, Level.FINE, "Replication thread to remote server '%s' is off (buffered=%d)", Leader2ReplicaNetworkExecutor.this.remoteServerName, Integer.valueOf(Leader2ReplicaNetworkExecutor.this.forwarderQueue.size()));
            }
        });
        this.forwarderThread.start();
        this.forwarderThread.setName(this.server.getServer().getServerName() + " leader-forwarder");
        Binary binary = new Binary(8192);
        while (!this.shutdownCommunication) {
            Pair<ReplicationMessage, HACommand> pair = null;
            try {
                pair = this.server.getMessageFactory().deserializeCommand(binary, readRequest());
                if (pair == null) {
                    this.channel.clearInput();
                } else {
                    HACommand hACommand = (HACommand) pair.getSecond();
                    LogManager.instance().log(this, Level.FINE, "Leader received message %d from replica %s: %s", Long.valueOf(((ReplicationMessage) pair.getFirst()).messageNumber), this.remoteServerName, hACommand);
                    if ((hACommand instanceof TxForwardRequest) || (hACommand instanceof CommandForwardRequest)) {
                        this.forwarderQueue.put(pair);
                    } else {
                        executeMessage(binary, pair);
                    }
                }
            } catch (TimeoutException e) {
                LogManager.instance().log(this, Level.FINE, "Request %s in timeout (cause=%s)", pair, e.getCause());
            } catch (IOException e2) {
                LogManager.instance().log(this, Level.FINE, "IO Error from reading requests (cause=%s)", e2.getCause());
                this.server.setReplicaStatus(this.remoteServerName, false);
                close();
            } catch (Exception e3) {
                LogManager.instance().log(this, Level.SEVERE, "Generic error during applying of request from Leader (cause=%s)", e3.toString());
                this.server.setReplicaStatus(this.remoteServerName, false);
                close();
            }
        }
    }

    public int getMessagesInQueue() {
        return this.senderQueue.size();
    }

    private void executeMessage(Binary binary, Pair<ReplicationMessage, HACommand> pair) throws IOException {
        ReplicationMessage replicationMessage = (ReplicationMessage) pair.getFirst();
        HACommand execute = ((HACommand) pair.getSecond()).execute(this.server, this.remoteServerName, replicationMessage.messageNumber);
        if (execute != null) {
            this.server.getMessageFactory().serializeCommand(execute, binary, replicationMessage.messageNumber);
            LogManager.instance().log(this, Level.FINE, "Request %s -> %s to '%s'", pair.getSecond(), execute, this.remoteServerName);
            sendMessage(binary);
            if (execute instanceof ReplicaConnectHotResyncResponse) {
                this.server.resendMessagesToReplica(((ReplicaConnectHotResyncResponse) execute).getMessageNumber(), this.remoteServerName);
                this.server.setReplicaStatus(this.remoteServerName, true);
            }
        }
    }

    private byte[] readRequest() throws IOException {
        byte[] readBytes;
        synchronized (this.channelInputLock) {
            readBytes = this.channel.readBytes();
        }
        return readBytes;
    }

    public void closeChannel() {
        ChannelBinaryServer channelBinaryServer = this.channel;
        if (channelBinaryServer != null) {
            channelBinaryServer.close();
            this.channel = null;
        }
    }

    public void close() {
        executeInLock(obj -> {
            this.shutdownCommunication = true;
            try {
                Thread thread = this.senderThread;
                if (thread != null) {
                    try {
                        thread.join(1000L);
                        this.senderThread = null;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                Thread thread2 = this.forwarderThread;
                if (thread2 != null) {
                    try {
                        thread2.join(1000L);
                        this.forwarderThread = null;
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                    }
                }
                closeChannel();
                return null;
            } catch (Exception e3) {
                return null;
            }
        });
    }

    public boolean enqueueMessage(final long j, final Binary binary) {
        if (this.status == STATUS.OFFLINE) {
            return false;
        }
        return ((Boolean) executeInLock(new Callable<Object, Object>() { // from class: com.arcadedb.server.ha.Leader2ReplicaNetworkExecutor.3
            public Object call(Object obj) {
                if (Leader2ReplicaNetworkExecutor.this.senderQueue.size() > 1) {
                    LogManager.instance().log(this, Level.FINE, "Buffering request %d to server '%s' (status=%s buffered=%d)", Long.valueOf(j), Leader2ReplicaNetworkExecutor.this.remoteServerName, Leader2ReplicaNetworkExecutor.this.status, Integer.valueOf(Leader2ReplicaNetworkExecutor.this.senderQueue.size()));
                }
                if (!Leader2ReplicaNetworkExecutor.this.senderQueue.offer(binary)) {
                    if (Leader2ReplicaNetworkExecutor.this.status == STATUS.OFFLINE) {
                        return false;
                    }
                    LogManager.instance().log(this, Level.WARNING, "Applying back-pressure on replicating messages to server '%s' (latency=%s buffered=%d)...", Leader2ReplicaNetworkExecutor.this.getRemoteServerName(), Leader2ReplicaNetworkExecutor.this.getLatencyStats(), Integer.valueOf(Leader2ReplicaNetworkExecutor.this.senderQueue.size()));
                    try {
                        Thread.sleep(1000L);
                        if (Leader2ReplicaNetworkExecutor.this.status == STATUS.OFFLINE) {
                            return false;
                        }
                        if (!Leader2ReplicaNetworkExecutor.this.senderQueue.offer(binary)) {
                            LogManager.instance().log(this, Level.INFO, "Timeout on writing request to server '%s', setting it offline...", Leader2ReplicaNetworkExecutor.this.getRemoteServerName());
                            Leader2ReplicaNetworkExecutor.this.senderQueue.clear();
                            Leader2ReplicaNetworkExecutor.this.server.setReplicaStatus(Leader2ReplicaNetworkExecutor.this.remoteServerName, false);
                            throw new ReplicationException("Replica '" + Leader2ReplicaNetworkExecutor.this.remoteServerName + "' is not reading replication messages");
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new ReplicationException("Error on replicating to server '" + Leader2ReplicaNetworkExecutor.this.remoteServerName + "'");
                    }
                }
                Leader2ReplicaNetworkExecutor.this.totalBytes += binary.size();
                return true;
            }
        })).booleanValue();
    }

    public void setStatus(final STATUS status) {
        if (this.status == status) {
            return;
        }
        executeInLock(new Callable<Object, Object>() { // from class: com.arcadedb.server.ha.Leader2ReplicaNetworkExecutor.4
            public Object call(Object obj) {
                Leader2ReplicaNetworkExecutor.this.status = status;
                LogManager.instance().log(this, Level.INFO, "Replica server '%s' is %s", Leader2ReplicaNetworkExecutor.this.remoteServerName, status);
                Leader2ReplicaNetworkExecutor.this.leftOn = status == STATUS.OFFLINE ? 0L : System.currentTimeMillis();
                if (status == STATUS.ONLINE) {
                    Leader2ReplicaNetworkExecutor.this.joinedOn = System.currentTimeMillis();
                    Leader2ReplicaNetworkExecutor.this.leftOn = 0L;
                    return null;
                }
                if (status != STATUS.OFFLINE) {
                    return null;
                }
                Leader2ReplicaNetworkExecutor.this.leftOn = System.currentTimeMillis();
                Leader2ReplicaNetworkExecutor.this.close();
                return null;
            }
        });
        if (this.server.getServer().isStarted()) {
            this.server.printClusterConfiguration();
        }
    }

    public String getRemoteServerName() {
        return this.remoteServerName;
    }

    public String getRemoteServerAddress() {
        return this.remoteServerAddress;
    }

    public String getRemoteServerHTTPAddress() {
        return this.remoteServerHTTPAddress;
    }

    public long getJoinedOn() {
        return this.joinedOn;
    }

    public long getLeftOn() {
        return this.leftOn;
    }

    public void updateStats(long j, long j2) {
        this.totalMessages++;
        long j3 = j2 - j;
        this.latencyTotalTime += j3;
        if (this.latencyMin == -1 || j3 < this.latencyMin) {
            this.latencyMin = j3;
        }
        if (j3 > this.latencyMax) {
            this.latencyMax = j3;
        }
    }

    public STATUS getStatus() {
        return this.status;
    }

    public String getLatencyStats() {
        if (this.totalMessages == 0) {
            return "";
        }
        long j = this.latencyTotalTime / this.totalMessages;
        long j2 = this.latencyMin;
        long j3 = this.latencyMax;
        return "avg=" + j + " (min=" + j + " max=" + j2 + ")";
    }

    public String getThroughputStats() {
        return this.totalBytes == 0 ? "" : FileUtils.getSizeAsString(this.totalBytes) + " (" + FileUtils.getSizeAsString((int) ((this.totalBytes / (System.currentTimeMillis() - this.joinedOn)) * 1000.0d)) + "/s)";
    }

    public void sendMessage(Binary binary) throws IOException {
        synchronized (this.channelOutputLock) {
            ChannelBinaryServer channelBinaryServer = this.channel;
            if (channelBinaryServer == null) {
                close();
                throw new IOException("Channel closed");
            }
            channelBinaryServer.writeVarLengthBytes(binary.getContent(), binary.size());
            channelBinaryServer.flush();
        }
    }

    @Override // java.lang.Thread
    public String toString() {
        return this.remoteServerName;
    }

    protected Object executeInLock(Callable<Object, Object> callable) {
        Object call;
        synchronized (this.lock) {
            call = callable.call((Object) null);
        }
        return call;
    }
}
