package com.sugon.gsq.libraries.v531.kafka.process;

import cn.gsq.sdp.Respond;
import cn.gsq.sdp.core.AbstractHost;
import cn.gsq.sdp.core.AbstractProcess;
import cn.gsq.sdp.core.AbstractServe;
import cn.gsq.sdp.core.HostGroupHandler;
import cn.gsq.sdp.core.ProcessHandler;
import cn.gsq.sdp.core.annotation.Available;
import cn.gsq.sdp.core.annotation.Function;
import cn.gsq.sdp.core.annotation.Process;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.sugon.gsq.libraries.utils.ExtraIFace;
import com.sugon.gsq.libraries.utils.HostUtil;
import com.sugon.gsq.libraries.v531.SdpHost531Impl;
import com.sugon.gsq.libraries.v531.kafka.Kafka;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Process definition for the Kafka broker of the v5.3.1 library.
 *
 * <p>Handles per-host installation and removal of broker directories, a rolling
 * restart, and the "EXPAND" / "SHRINK" functions that add hosts to or remove hosts
 * from this process.
 */
@Process(master = Kafka.class, handler = ProcessHandler.SLAVE, group = HostGroupHandler.DATA, mark = "Kafka", home = "/kafka", start = "./bin/kafka-start.sh", stop = "./bin/kafka-stop.sh", dynamic = true, description = "Kafka服务进程", order = 1, min = 3)
/* loaded from: input_file:com/sugon/gsq/libraries/v531/kafka/process/Broker.class */
public class Broker extends AbstractProcess<SdpHost531Impl> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Broker.class);

    @Autowired
    ExtraIFace extraIFace;

    /**
     * Installs the broker on every host of the given process, in parallel.
     *
     * @param abstractProcess process whose hosts are initialised
     */
    protected void initProcess(AbstractProcess<SdpHost531Impl> abstractProcess) {
        String brokerDirs = getBrokerDirs();
        abstractProcess.getHosts().parallelStream().forEach(host -> installBroker(host, brokerDirs));
    }

    /**
     * Returns the fixed port reported for this process.
     *
     * @return always {@code 9092}
     */
    public Integer getPort() {
        return 9092;
    }

    /**
     * Uninstalls the broker from every host of the given process, in parallel.
     *
     * @param abstractProcess process whose hosts are cleaned up
     */
    protected void recover(AbstractProcess<SdpHost531Impl> abstractProcess) {
        String brokerDirs = getBrokerDirs();
        abstractProcess.getHosts().parallelStream().forEach(host -> uninstallBroker(host, brokerDirs));
    }

    /**
     * Intentionally empty; this class performs no work in this hook.
     *
     * @param abstractHost unused
     */
    protected void extend(AbstractHost abstractHost) {
    }

    /**
     * Intentionally empty; this class performs no work in this hook.
     *
     * @param abstractHost unused
     */
    protected void shorten(AbstractHost abstractHost) {
    }

    /**
     * Restarts this process on all of its hosts.
     *
     * <p>The process is first stopped on every host, then after a 3 second pause it is
     * started host by host with a 1 second pause after each start.
     */
    @Function(name = "重启", id = "restart")
    public synchronized void restart() {
        for (AbstractHost host : getHosts()) {
            host.stopProcess(this);
        }
        ThreadUtil.safeSleep(3000L);
        for (AbstractHost host : getHosts()) {
            host.startProcess(this);
            // Stagger the starts by one second per host.
            ThreadUtil.safeSleep(1000L);
        }
    }

    /**
     * Reads {@code log.dirs} from the default branch of {@code server.properties} and
     * replaces every comma with a space.
     *
     * @return the configured directories as a space-separated string
     */
    private String getBrokerDirs() {
        String logDirs = (String) getServe()
                .getConfigByName("server.properties")
                .getDefaultBranchContent()
                .get("log.dirs");
        return StrUtil.replace(logDirs, ",", " ");
    }

    /**
     * Delegates to {@link SdpHost531Impl#installBroker} on the given host.
     *
     * @param sdpHost531Impl host to install on
     * @param str            space-separated broker directories
     * @return a success {@link Respond}
     */
    private Respond installBroker(SdpHost531Impl sdpHost531Impl, String str) {
        Respond respond = Respond.success();
        sdpHost531Impl.installBroker(str);
        return respond;
    }

    /**
     * Delegates to {@link SdpHost531Impl#uninstallBroker} on the given host.
     *
     * @param sdpHost531Impl host to uninstall from
     * @param str            space-separated broker directories
     * @return a success {@link Respond}
     */
    private Respond uninstallBroker(SdpHost531Impl sdpHost531Impl, String str) {
        Respond respond = Respond.success();
        sdpHost531Impl.uninstallBroker(str);
        return respond;
    }

    /**
     * @return always {@code true}
     */
    public boolean isExistExtend() {
        return true;
    }

    /**
     * Adds the given hosts to this process.
     *
     * <p>Sequence: stop the process, prepare every new host (machine user, Kerby,
     * keytab, "kafka" resource download), register the hosts on the default branch of
     * every config of the serve, add them to this process, then start the process.
     *
     * <p>The host list is parsed once, before {@code stop()}, so a {@code null}
     * argument fails before anything is stopped.
     *
     * <p>NOTE(review): if preparing a host throws, {@code start()} is never reached and
     * the process stays stopped — confirm whether that is acceptable.
     *
     * @param str comma-separated host names to add
     */
    @Function(id = "EXPAND", name = "服务扩容")
    public void expandServer(String str) {
        AbstractServe serve = getServe();
        String[] hostnames = str.split(",");
        stop();
        for (String hostname : hostnames) {
            // NOTE(review): the user is "yarn" although this is the Kafka broker — verify intent.
            this.extraIFace.createMachineUser("yarn", hostname);
            SdpHost531Impl host = (SdpHost531Impl) this.sdpManager.getExpectHostByName(hostname);
            host.installKerby();
            // NOTE(review): meaning of the "9007" literal is not visible here — TODO confirm.
            host.createKeytab(HostUtil.getHostname(), "kafka", hostname, "9007");
            host.downloadResource("kafka");
        }
        serve.getAllConfigs().forEach(config -> config.addBranchHostsByDefault(hostnames));
        addHosts(hostnames);
        start();
    }

    /**
     * Availability check for the "EXPAND" function.
     *
     * <p>Expansion is available when the host manager knows at least one host whose
     * name is used neither by this process nor by any process in {@code getExcludes()}.
     *
     * @return {@code true} if at least one such host exists
     */
    @Available(fid = "EXPAND")
    public boolean canExpand() {
        Set<String> occupied = currentHostNames();
        // Accumulate sequentially: the set from Collectors.toSet() is not thread-safe,
        // so it must not be mutated from a parallel stream.
        getExcludes().forEach(excluded ->
                excluded.getHosts().forEach(host -> occupied.add(host.getName())));
        Set<String> candidates = this.sdpManager.getHostManager().getHosts().stream()
                .map(host -> host.getName())
                .collect(Collectors.toSet());
        candidates.removeAll(occupied);
        return !candidates.isEmpty();
    }

    /**
     * Removes the given hosts from this process.
     *
     * <p>Sequence: stop the "Broker" process on every listed host, remove the hosts
     * from the default branch of every config of the serve, delete them from this
     * process, wait one second, then restart the serve.
     *
     * @param str comma-separated host names to remove
     */
    @Function(id = "SHRINK", name = "服务缩容")
    public void shrinkServer(String str) {
        AbstractServe serve = getServe();
        String[] hostnames = str.split(",");
        for (String hostname : hostnames) {
            SdpHost531Impl host = (SdpHost531Impl) this.sdpManager.getExpectHostByName(hostname);
            host.stopProcess(serve.getProcessByName("Broker"));
        }
        serve.getAllConfigs().forEach(config -> config.delBranchHostsByDefault(hostnames));
        deleteHosts(hostnames);
        ThreadUtil.safeSleep(1000L);
        serve.restart();
    }

    /**
     * Availability check for the "SHRINK" function.
     *
     * @return {@code true} if this process has more distinct host names than {@code getMin()}
     */
    @Available(fid = "SHRINK")
    public boolean canShrink() {
        return currentHostNames().size() > getMin();
    }

    /**
     * Collects the distinct names of the hosts on which this process (looked up by
     * name in the serve) is registered.
     *
     * @return a new set of host names
     */
    private Set<String> currentHostNames() {
        return getServe().getProcessByName(getName()).getHosts().stream()
                .map(host -> host.getName())
                .collect(Collectors.toSet());
    }

    /**
     * @return the {@code logs} directory under {@code getHome()}
     */
    protected String getLogFilePath() {
        return getHome() + "/logs";
    }

    /**
     * Returns the log file name; the argument is ignored.
     *
     * <p>NOTE(review): always returns "controller.log" — confirm this is the intended
     * file for the broker process.
     *
     * @param str unused
     * @return always {@code "controller.log"}
     */
    protected String getLogFileName(String str) {
        return "controller.log";
    }
}
