package com.arcadedb.server.ha;

import com.arcadedb.database.Binary;
import com.arcadedb.database.DatabaseContext;
import com.arcadedb.database.DatabaseFactory;
import com.arcadedb.engine.ComponentFile;
import com.arcadedb.log.LogManager;
import com.arcadedb.network.HostUtil;
import com.arcadedb.network.binary.ChannelBinaryClient;
import com.arcadedb.network.binary.ConnectionException;
import com.arcadedb.network.binary.NetworkProtocolException;
import com.arcadedb.network.binary.ServerIsNotTheLeaderException;
import com.arcadedb.server.ReplicationCallback;
import com.arcadedb.server.ServerDatabase;
import com.arcadedb.server.ServerException;
import com.arcadedb.server.ha.message.DatabaseStructureRequest;
import com.arcadedb.server.ha.message.DatabaseStructureResponse;
import com.arcadedb.server.ha.message.FileContentRequest;
import com.arcadedb.server.ha.message.FileContentResponse;
import com.arcadedb.server.ha.message.HACommand;
import com.arcadedb.server.ha.message.ReplicaConnectFullResyncResponse;
import com.arcadedb.server.ha.message.ReplicaConnectRequest;
import com.arcadedb.server.ha.message.ReplicaReadyRequest;
import com.arcadedb.server.ha.message.TxRequest;
import com.arcadedb.utility.FileUtils;
import com.arcadedb.utility.Pair;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.logging.Level;

/* loaded from: input_file:com/arcadedb/server/ha/Replica2LeaderNetworkExecutor.class */
public class Replica2LeaderNetworkExecutor extends Thread {
    // HA server owning this executor; source of the message factory, replication log and cluster metadata.
    private final HAServer server;
    // Host of the server this executor talks to; reassigned by reconnect() when falling back to another member.
    private String host;
    // Port of the server this executor talks to; reassigned together with host.
    private int port;
    // HTTP address announced by the remote server during the connect() handshake.
    private String leaderServerHTTPAddress;
    // Binary channel to the remote server; set by connect() and reset to null by closeChannel().
    private ChannelBinaryClient channel;
    // Name announced by the remote server during the handshake; "?" until the first successful connect().
    private String leaderServerName = "?";
    // Stop flag polled by run() and reconnect(); volatile because close()/kill() may be invoked from other threads.
    private volatile boolean shutdown = false;
    // Monitor held while writing to the channel (see sendCommandToLeader()).
    private final Object channelOutputLock = new Object();
    // Monitor held while reading from the channel (see receiveResponse() and the handshake in connect()).
    private final Object channelInputLock = new Object();
    // Log number returned by the leader after the last database install; -1 means no install happened.
    // run() copies it into every incoming TxRequest while it is > -1.
    private long installDatabaseLastLogNumber = -1;

    /**
     * Creates the executor and eagerly performs the connection handshake with the given server.
     *
     * @param hAServer HA server that owns this executor
     * @param str      host of the server to connect to
     * @param i        port of the server to connect to
     * @throws ConnectionException if {@link #connect()} cannot complete the handshake
     */
    public Replica2LeaderNetworkExecutor(HAServer hAServer, String str, int i) {
        server = hAServer;
        host = str;
        port = i;
        // The connection is opened right away: a failure surfaces to whoever builds the executor.
        connect();
    }

    /**
     * Message loop of the replica: reads messages sent by the leader, executes them, records
     * sequenced requests in the local replication log and sends back the produced response.
     * <p>
     * The whole iteration is guarded by a single try block so that:
     * <ul>
     *   <li>a read timeout simply starts a new iteration instead of processing missing data;</li>
     *   <li>any failure while executing a command or answering the leader triggers
     *       {@link #reconnect(Exception)} instead of terminating the thread;</li>
     *   <li>messages that are already applied, or that break the expected order, are really
     *       skipped and never executed.</li>
     * </ul>
     * The loop ends when the shutdown flag is raised.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        LogManager.instance().setContext(this.server.getServer().getServerName());
        // One buffer is reused for every message handled by this thread.
        final Binary buffer = new Binary(8192);
        buffer.setAllocationChunkSize(1024);
        long lastReqId = -1;
        while (!this.shutdown) {
            // Kept outside the try block so the error log can report the request being processed.
            long reqId = -1;
            try {
                final byte[] requestBytes = receiveResponse();
                if (this.shutdown) {
                    break;
                }
                final Pair<ReplicationMessage, HACommand> request = this.server.getMessageFactory().deserializeCommand(buffer, requestBytes);
                if (request == null) {
                    LogManager.instance().log(this, Level.SEVERE, "Error on receiving message NULL, reconnecting (threadId=%d)", Long.valueOf(Thread.currentThread().threadId()));
                    reconnect(null);
                    continue;
                }
                final ReplicationMessage message = request.getFirst();
                reqId = message.messageNumber;
                lastReqId = reqId;
                if (reqId > -1) {
                    LogManager.instance().log(this, Level.FINE, "Received request %d from the Leader (threadId=%d)", Long.valueOf(reqId), Long.valueOf(Thread.currentThread().threadId()));
                } else {
                    LogManager.instance().log(this, Level.FINE, "Received response %d from the Leader (threadId=%d)", Long.valueOf(reqId), Long.valueOf(Thread.currentThread().threadId()));
                }
                // Only messages numbered > -1 are checked against (and later appended to) the replication log.
                if (reqId > -1) {
                    final long lastMessageNumber = this.server.getReplicationLogFile().getLastMessageNumber();
                    if (reqId <= lastMessageNumber) {
                        LogManager.instance().log(this, Level.FINE, "Message %d already applied on local server (last=%d). Skip this", Long.valueOf(reqId), Long.valueOf(lastMessageNumber));
                        // Executing it again would apply the same change twice.
                        continue;
                    }
                    if (!this.server.getReplicationLogFile().checkMessageOrder(message)) {
                        // Out-of-order message: drop it and re-join the leader from scratch.
                        closeChannel();
                        connect();
                        startup();
                        continue;
                    }
                }
                if (this.installDatabaseLastLogNumber > -1 && request.getSecond() instanceof TxRequest txRequest) {
                    txRequest.installDatabaseLastLogNumber = this.installDatabaseLastLogNumber;
                }
                final HACommand response = request.getSecond().execute(this.server, this.leaderServerName, reqId);
                if (reqId > -1 && !this.server.getReplicationLogFile().appendMessage(message)) {
                    // The local log refused the message: reset the connection and resynchronize.
                    closeChannel();
                    connect();
                    startup();
                    continue;
                }
                this.server.getServer().lifecycleEvent(ReplicationCallback.TYPE.REPLICA_MSG_RECEIVED, request);
                if (response != null) {
                    sendCommandToLeader(buffer, response, reqId);
                }
            } catch (SocketTimeoutException ignored) {
                // Nothing arrived within the socket timeout: loop again so the shutdown flag is re-checked.
            } catch (Exception e) {
                LogManager.instance().log(this, Level.INFO, "Exception during execution of request %d (shutdown=%s name=%s error=%s)", Long.valueOf(reqId), Boolean.valueOf(this.shutdown), getName(), e.toString());
                reconnect(e);
            }
        }
        LogManager.instance().log(this, Level.INFO, "Replica message thread closed (shutdown=%s name=%s threadId=%d lastReqId=%d)", Boolean.valueOf(this.shutdown), getName(), Long.valueOf(Thread.currentThread().threadId()), Long.valueOf(lastReqId));
    }

    /**
     * Returns the name announced by the remote server during the connection handshake.
     *
     * @return the remote server name, or {@code "?"} when no handshake has completed yet
     */
    public String getRemoteServerName() {
        return leaderServerName;
    }

    /**
     * Returns the address of the server this executor is currently pointed at.
     *
     * @return the address formatted as {@code host:port}
     */
    public String getRemoteAddress() {
        final String address = host + ":" + port;
        return address;
    }

    /**
     * Re-establishes the connection with the leader after a communication problem.
     * <p>
     * Steps:
     * <ol>
     *   <li>if the current thread was interrupted, or a shutdown was requested, do nothing;</li>
     *   <li>close the current channel;</li>
     *   <li>if this executor is no longer the one registered as leader connection on the HA
     *       server, close it for good;</li>
     *   <li>otherwise reconnect to the same server; on failure try every other known server
     *       address (up to 3 rounds, 2 seconds apart);</li>
     *   <li>if no server could be reached, start a new election.</li>
     * </ol>
     *
     * @param exc the error that triggered the reconnection, only used for logging; may be {@code null}
     */
    private void reconnect(Exception exc) {
        if (Thread.currentThread().isInterrupted()) {
            shutdown();
        }
        if (this.shutdown) {
            return;
        }
        closeChannel();
        // Read the leader once: it can change (or be unset) while this method runs.
        final var currentLeader = this.server.getLeader();
        if (currentLeader != this) {
            LogManager.instance().log(this, Level.SEVERE, "Removing connection to the previous Leader ('%s'). New Leader is: %s", getRemoteServerName(), currentLeader != null ? currentLeader.getRemoteServerName() : null);
            close();
            return;
        }
        LogManager.instance().log(this, Level.FINE, "Error on communication between current replica and the Leader ('%s'), reconnecting... (error=%s)", getRemoteServerName(), exc);
        if (this.shutdown) {
            return;
        }
        try {
            connect();
            startup();
        } catch (Exception e) {
            LogManager.instance().log(this, Level.SEVERE, "Error on re-connecting to the Leader ('%s') (error=%s)", getRemoteServerName(), e);
            HashSet<String> candidates = new HashSet<>(Arrays.asList(this.server.getServerAddressList().split(",")));
            for (int attempt = 0; attempt < 3 && !this.shutdown && !candidates.isEmpty(); attempt++) {
                for (final String serverAddress : candidates) {
                    // Each candidate is tried in isolation: a failure on one server must not
                    // prevent the remaining servers from being contacted.
                    try {
                        if (this.server.isCurrentServer(serverAddress)) {
                            // Never connect to ourselves.
                            continue;
                        }
                        final String[] parts = HostUtil.parseHostAddress(serverAddress, HAServer.DEFAULT_PORT);
                        this.host = parts[0];
                        this.port = Integer.parseInt(parts[1]);
                        connect();
                        startup();
                        return;
                    } catch (Exception e2) {
                        LogManager.instance().log(this, Level.SEVERE, "Error on re-connecting to the server '%s' (error=%s)", getRemoteAddress(), e2);
                    }
                }
                try {
                    Thread.sleep(2000L);
                } catch (InterruptedException e3) {
                    // Preserve the interruption and stop: somebody wants this thread to end.
                    Thread.currentThread().interrupt();
                    this.shutdown = true;
                    return;
                }
                // The member list may have changed while waiting: reload it for the next round.
                candidates = new HashSet<>(Arrays.asList(this.server.getServerAddressList().split(",")));
            }
            // No server answered: ask the cluster to elect a new leader.
            this.server.startElection(true);
        }
    }

    /**
     * Serializes a command into the given buffer and writes it to the leader.
     *
     * @param binary   buffer used to serialize the command
     * @param hACommand command to send
     * @param j        message number; values greater than -1 are logged as a response to that
     *                 request, other values as a request
     * @throws IOException          if writing to the channel fails
     * @throws ReplicationException if the channel is closed
     */
    public void sendCommandToLeader(Binary binary, HACommand hACommand, long j) throws IOException {
        final String logFormat = j > -1
            ? "Sending message (response to %d) to the Leader '%s'..."
            : "Sending message (request %d) to the Leader '%s'...";
        LogManager.instance().log(this, Level.FINE, logFormat, Long.valueOf(j), hACommand);

        this.server.getMessageFactory().serializeCommand(hACommand, binary, j);

        synchronized (this.channelOutputLock) {
            // Work on a snapshot: closeChannel() may null the field at any time.
            final ChannelBinaryClient out = this.channel;
            if (out != null) {
                out.writeVarLengthBytes(binary.getContent(), binary.size());
                out.flush();
                return;
            }
            throw new ReplicationException("Error on sending command back to the leader server '" + this.leaderServerName + "' (cause=socket closed)");
        }
    }

    /**
     * Stops the executor: raises the shutdown flag first, then closes the channel.
     */
    public void close() {
        this.shutdown();
        this.closeChannel();
    }

    /**
     * Forcibly terminates the executor and waits for its thread to end.
     * <p>
     * Raises the shutdown flag, interrupts the thread, closes the channel and finally joins the
     * thread. If the caller is interrupted while waiting, its interrupt status is restored.
     */
    public void kill() {
        this.shutdown();
        this.interrupt();
        this.close();
        try {
            this.join();
        } catch (InterruptedException interrupted) {
            // Do not swallow the interruption of the calling thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Closes the current channel, if any, and clears the reference. Calling it when no channel
     * is open has no effect.
     */
    public void closeChannel() {
        final ChannelBinaryClient current = this.channel;
        if (current == null) {
            return;
        }
        current.close();
        this.channel = null;
    }

    /**
     * Returns the HTTP address announced by the remote server during the connection handshake.
     *
     * @return the HTTP address, or {@code null} when no handshake has completed yet
     */
    public String getRemoteHTTPAddress() {
        return leaderServerHTTPAddress;
    }

    /**
     * Describes this executor with the name of the remote server.
     *
     * @return the remote server name
     */
    @Override // java.lang.Thread
    public String toString() {
        return leaderServerName;
    }

    /**
     * Reads the next message from the channel while holding the input lock.
     *
     * @return the raw bytes of the message
     * @throws IOException          if reading from the channel fails
     * @throws ReplicationException if the channel is closed; this mirrors what
     *                              {@link #sendCommandToLeader(Binary, HACommand, long)} does
     *                              instead of failing with a bare NullPointerException
     */
    private byte[] receiveResponse() throws IOException {
        synchronized (this.channelInputLock) {
            // Work on a snapshot: closeChannel() may null the field at any time.
            final ChannelBinaryClient in = this.channel;
            if (in == null) {
                throw new ReplicationException("Error on receiving message from the leader server '" + this.leaderServerName + "' (cause=socket closed)");
            }
            return in.readBytes();
        }
    }

    /**
     * Opens a channel to {@code host:port} and performs the connection handshake.
     * <p>
     * The remote server first sends a boolean. When it is {@code false} the connection was
     * refused: a reason code and a message follow (reason 0 also carries the name and address
     * of the current leader), the channel is closed and an exception is raised. When it is
     * {@code true} the server sends its name, a long stored together with the name in
     * {@code server.lastElectionVote}, its HTTP address and the list of server addresses.
     * <p>
     * Whatever goes wrong, the channel is closed before leaving, so a failed attempt never
     * leaves a half-initialized connection behind.
     *
     * @throws ConnectionException on any failure; the original error is kept as the cause
     */
    public void connect() {
        LogManager.instance().log(this, Level.FINE, "Connecting to server %s:%d...", this.host, Integer.valueOf(this.port));
        try {
            this.channel = this.server.createNetworkConnection(this.host, this.port, (short) 0);
            this.channel.flush();
            synchronized (this.channelInputLock) {
                if (!this.channel.readBoolean()) {
                    // Connection refused: find out why.
                    final byte reasonCode = this.channel.readByte();
                    final String reason = this.channel.readString();
                    switch (reasonCode) {
                        case 0:
                            final String leaderName = this.channel.readString();
                            final String leaderAddress = this.channel.readString();
                            LogManager.instance().log(this, Level.INFO, "Cannot accept incoming connections: remote server is not a Leader, connecting to the current Leader '%s' (%s)", leaderName, leaderAddress);
                            closeChannel();
                            throw new ServerIsNotTheLeaderException("Remote server is not a Leader, connecting to the current Leader '" + leaderName + "' (" + leaderAddress + ")", leaderAddress);
                        case 1:
                            LogManager.instance().log(this, Level.INFO, "Cannot accept incoming connections: an election for the Leader server is in progress");
                            closeChannel();
                            throw new ReplicationException("An election for the Leader server is pending");
                        case 2:
                            LogManager.instance().log(this, Level.INFO, "Cannot accept incoming connections: remote server does not support protocol %d", (short) 0);
                            break;
                        case ReplicationProtocol.ERROR_CONNECT_WRONGCLUSTERNAME /* 3 */:
                            LogManager.instance().log(this, Level.INFO, "Cannot accept incoming connections: remote server joined a different cluster than '%s'", this.server.getClusterName());
                            break;
                        case ReplicationProtocol.ERROR_CONNECT_SAME_SERVERNAME /* 4 */:
                            LogManager.instance().log(this, Level.INFO, "Cannot accept incoming connections: remote server has the same name as the local server '%s'", this.server.getServerName());
                            break;
                        default:
                            LogManager.instance().log(this, Level.INFO, "Cannot accept incoming connections: unknown reason code '%s'", Byte.valueOf(reasonCode));
                            break;
                    }
                    closeChannel();
                    throw new ConnectionException(this.host + ":" + this.port, reason);
                }
                // Connection accepted: read the information announced by the remote server.
                this.leaderServerName = this.channel.readString();
                final long electionTurn = this.channel.readLong();
                this.leaderServerHTTPAddress = this.channel.readString();
                final String memberList = this.channel.readString();
                this.server.lastElectionVote = new Pair<>(Long.valueOf(electionTurn), this.leaderServerName);
                this.server.setServerAddresses(memberList);
            }
        } catch (Exception e) {
            LogManager.instance().log(this, Level.FINE, "Error on connecting to the server %s:%d (cause=%s)", this.host, Integer.valueOf(this.port), e.toString());
            // Release the socket when the handshake failed midway; a no-op when already closed.
            closeChannel();
            throw new ConnectionException(this.host + ":" + this.port, e);
        }
    }

    /**
     * Completes the join with the leader once the connection is established: logs the
     * connection, renames this thread after the remote server and requests the installation
     * of the databases.
     */
    public void startup() {
        final Integer remotePort = Integer.valueOf(this.port);
        LogManager.instance().log(this, Level.INFO, "Server connected to the Leader server %s:%d, members=[%s]", this.host, remotePort, this.server.getServerAddressList());

        // The thread name carries both ends of the connection.
        final String threadName = this.server.getServerName() + " replica2leader<-" + getRemoteServerName();
        setName(threadName);

        LogManager.instance().log(this, Level.INFO, "Server started as Replica in HA mode (cluster=%s leader=%s:%d)", this.server.getClusterName(), this.host, Integer.valueOf(this.port));
        installDatabases();
    }

    /**
     * Aligns the local databases with the leader.
     * <p>
     * Sends the last local log number to the leader. If the leader answers with a full resync
     * response, every listed database is installed from scratch; any other answer is treated
     * as a hot resync. In both cases the leader is finally told that this replica is ready.
     *
     * @throws ServerException if anything fails; the shutdown flag is raised before throwing
     */
    private void installDatabases() {
        final Binary buffer = new Binary(8192);
        buffer.setAllocationChunkSize(1024);

        final long lastLogNumber = this.server.getReplicationLogFile().getLastMessageNumber();
        LogManager.instance().log(this, Level.INFO, "Requesting install of databases up to log %d...", Long.valueOf(lastLogNumber));

        try {
            sendCommandToLeader(buffer, new ReplicaConnectRequest(lastLogNumber), -1L);
            final HACommand answer = receiveCommandFromLeaderDuringJoin(buffer);

            if (!(answer instanceof ReplicaConnectFullResyncResponse)) {
                LogManager.instance().log(this, Level.INFO, "Receiving hot resync (from=%d)...", Long.valueOf(lastLogNumber));
                this.server.getServer().lifecycleEvent(ReplicationCallback.TYPE.REPLICA_HOT_RESYNC, null);
            } else {
                LogManager.instance().log(this, Level.INFO, "Asking for a full resync...");
                this.server.getServer().lifecycleEvent(ReplicationCallback.TYPE.REPLICA_FULL_RESYNC, null);
                final ReplicaConnectFullResyncResponse fullResync = (ReplicaConnectFullResyncResponse) answer;
                for (Iterator<String> names = fullResync.getDatabases().iterator(); names.hasNext(); ) {
                    requestInstallDatabase(buffer, names.next());
                }
            }

            sendCommandToLeader(buffer, new ReplicaReadyRequest(), -1L);
        } catch (Exception e) {
            // Stop the message loop before reporting the failure.
            shutdown();
            LogManager.instance().log(this, Level.SEVERE, "Error starting HA service (error=%s)", e, e.getMessage());
            throw new ServerException("Cannot start HA service", e);
        }
    }

    /**
     * Installs a database by downloading its schema and all its files from the leader.
     * <p>
     * The structure of the database is requested first: its log number becomes the last
     * message number of the local replication log and its schema is written to
     * {@code schema.json} in the database directory. Every file is then downloaded; if one
     * fails the local database is dropped. Finally the structure is requested again to record
     * the leader log number reached after the transfer, and the schema is reloaded.
     *
     * @param binary buffer used to exchange messages with the leader
     * @param str    name of the database to install
     * @throws IOException          on communication or file errors
     * @throws ReplicationException if the installation of a file fails
     */
    public void requestInstallDatabase(Binary binary, String str) throws IOException {
        sendCommandToLeader(binary, new DatabaseStructureRequest(str), -1L);
        final DatabaseStructureResponse structure = (DatabaseStructureResponse) receiveCommandFromLeaderDuringJoin(binary);
        this.server.getReplicationLogFile().setLastMessageNumber(structure.getCurrentLogNumber());

        final ServerDatabase database = this.server.getServer().getOrCreateDatabase(str);

        // The writer is needed only for the schema: close it before the (long) file transfer starts.
        try (FileWriter schemaFile = new FileWriter(database.getDatabasePath() + File.separator + "schema.json", DatabaseFactory.getDefaultCharset())) {
            schemaFile.write(structure.getSchemaJson());
        }

        long databaseSize = 0;
        final ArrayList<Map.Entry<Integer, String>> files = new ArrayList<>(structure.getFileNames().entrySet());
        for (int i = 0; i < files.size(); i++) {
            final Map.Entry<Integer, String> file = files.get(i);
            try {
                databaseSize += installFile(binary, str, file.getKey().intValue(), file.getValue(), 0, -1);
            } catch (Exception e) {
                LogManager.instance().log(this, Level.SEVERE, "Error on installing file '%s' (%s %d/%d files)", e, file.getKey(), FileUtils.getSizeAsString(databaseSize), Integer.valueOf(i), Integer.valueOf(files.size()));
                // A partially copied database is unusable: remove it.
                database.getEmbedded().drop();
                throw new ReplicationException("Error on installing database '" + str + "'", e);
            }
        }

        // Ask the structure again to learn the leader log number at the end of the transfer.
        sendCommandToLeader(binary, new DatabaseStructureRequest(str), -1L);
        this.installDatabaseLastLogNumber = ((DatabaseStructureResponse) receiveCommandFromLeaderDuringJoin(binary)).getCurrentLogNumber();

        // Reload the schema from the files just installed.
        database.getSchema().getEmbedded().close();
        DatabaseContext.INSTANCE.init(database);
        database.getSchema().getEmbedded().load(ComponentFile.MODE.READ_WRITE, true);

        LogManager.instance().log(this, Level.INFO, "Database '%s' installed from the cluster (%s - %d files lastLogNumber=%d)", (Throwable) null, str, FileUtils.getSizeAsString(databaseSize), Integer.valueOf(files.size()), Long.valueOf(this.installDatabaseLastLogNumber));
    }

    /**
     * Downloads one file of a database from the leader, one chunk of pages at a time.
     * <p>
     * Each response is applied locally through its {@code execute} method. The transfer stops
     * when a response carries no pages or is flagged as the last one; otherwise the next
     * request starts after the pages already received.
     *
     * @param binary buffer used to exchange messages with the leader
     * @param str    name of the database
     * @param i      id of the file
     * @param str2   name of the file, used for logging only
     * @param i2     first page to request
     * @param i3     last page to request, forwarded unchanged to every request
     * @return the sum of the content sizes of all the responses received
     * @throws IOException on communication errors
     */
    private long installFile(Binary binary, String str, int i, String str2, int i2, int i3) throws IOException {
        LogManager.instance().log(this, Level.FINE, "Installing file '%s'...", str2);

        int nextPage = i2;
        int pagesWritten = 0;
        long bytesReceived = 0;

        for (boolean more = true; more; ) {
            sendCommandToLeader(binary, new FileContentRequest(str, i, nextPage, i3), -1L);
            final FileContentResponse chunk = (FileContentResponse) receiveCommandFromLeaderDuringJoin(binary);

            bytesReceived += chunk.getPagesContent().size();
            chunk.execute(this.server, null, -1L);

            if (chunk.getPages() == 0) {
                // An empty chunk ends the transfer without touching the counters.
                more = false;
            } else {
                pagesWritten += chunk.getPages();
                if (chunk.isLast()) {
                    more = false;
                } else {
                    nextPage += chunk.getPages();
                }
            }
        }

        LogManager.instance().log(this, Level.FINE, "File '%s' installed (pagesWritten=%d size=%s)", str2, Integer.valueOf(pagesWritten), FileUtils.getSizeAsString(bytesReceived));
        return bytesReceived;
    }

    /**
     * Reads the next message from the leader and returns the command it carries. Used while
     * joining the cluster, when the replica drives a request/response exchange with the leader.
     *
     * @param binary buffer used to deserialize the message
     * @return the command sent by the leader
     * @throws IOException              if reading from the channel fails
     * @throws NetworkProtocolException if the message cannot be deserialized
     */
    private HACommand receiveCommandFromLeaderDuringJoin(Binary binary) throws IOException {
        final byte[] responseBytes = receiveResponse();
        final Pair<ReplicationMessage, HACommand> message = this.server.getMessageFactory().deserializeCommand(binary, responseBytes);
        if (message == null) {
            // Report the first byte when there is one; an empty payload must not hide the
            // protocol error behind an index or null-pointer failure.
            final String firstByte = responseBytes != null && responseBytes.length > 0 ? String.valueOf(responseBytes[0]) : "<empty>";
            throw new NetworkProtocolException("Error on reading response, message " + firstByte + " not valid");
        }
        return message.getSecond();
    }

    /**
     * Raises the shutdown flag polled by the message loop. The channel is left untouched.
     */
    private void shutdown() {
        final Long id = Long.valueOf(threadId());
        LogManager.instance().log(this, Level.FINE, "Shutting down thread %s (id=%d)...", getName(), id);
        this.shutdown = true;
    }
}
