package com.sugon.gsq.libraries.v530.kafka.process;

import cn.gsq.sdp.Respond;
import cn.gsq.sdp.RunLogLevel;
import cn.gsq.sdp.core.AbstractHost;
import cn.gsq.sdp.core.AbstractProcess;
import cn.gsq.sdp.core.AbstractServe;
import cn.gsq.sdp.core.HostGroupHandler;
import cn.gsq.sdp.core.ProcessHandler;
import cn.gsq.sdp.core.annotation.Function;
import cn.gsq.sdp.core.annotation.Process;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.sugon.gsq.libraries.v530.SdpHost530Impl;
import com.sugon.gsq.libraries.v530.kafka.Kafka;
import java.util.Iterator;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka broker process definition.
 *
 * <p>Declared through {@code @Process} as a dynamic process of the {@code Kafka} serve, using the
 * {@code SLAVE} handler and the {@code DATA} host group. The class implements the install/uninstall,
 * scale-out/scale-in and restart hooks for the broker and exposes its port and log file location.
 */
@Process(master = Kafka.class, handler = ProcessHandler.SLAVE, group = HostGroupHandler.DATA, mark = "Kafka", home = "/kafka", start = "./bin/nokrb5-start.sh", stop = "./bin/kafka-stop.sh", dynamic = true, description = "Kafka服务进程", order = 1, min = 3)
public class Broker extends AbstractProcess<SdpHost530Impl> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Broker.class);

    /** Port reported by {@link #getPort()}. */
    private static final int BROKER_PORT = 9092;

    /** Name under which this process is looked up on the serve. */
    private static final String PROCESS_NAME = "Broker";

    /** Configuration file that holds the broker settings. */
    private static final String SERVER_CONFIG = "server.properties";

    /** Key inside {@link #SERVER_CONFIG} that lists the broker data directories. */
    private static final String LOG_DIRS_KEY = "log.dirs";

    /** Log file name reported by {@link #getLogFileName(String)}. */
    private static final String LOG_FILE_NAME = "controller.log";

    /** Pause between stopping all brokers and starting them again during a restart. */
    private static final long RESTART_SETTLE_MILLIS = 3000L;

    /** Pause after each individual broker start during a restart, and at the end of a scale-in. */
    private static final long STEP_PAUSE_MILLIS = 1000L;

    /**
     * Installs the broker on every host of the given process.
     *
     * <p>The hosts are processed through a parallel stream; each one receives the same
     * space-separated directory list obtained from {@link #getBrokerDirs()}.
     *
     * @param abstractProcess process whose hosts are to be installed
     */
    protected void initProcess(AbstractProcess<SdpHost530Impl> abstractProcess) {
        final String dataDirs = getBrokerDirs();
        abstractProcess.getHosts().parallelStream().forEach(host -> installBroker(host, dataDirs));
    }

    /**
     * Returns the broker port.
     *
     * @return always {@code 9092}
     */
    public Integer getPort() {
        return BROKER_PORT;
    }

    /**
     * Uninstalls the broker from every host of the given process.
     *
     * <p>Mirror image of {@link #initProcess(AbstractProcess)}: the hosts are processed through a
     * parallel stream with the directory list from {@link #getBrokerDirs()}.
     *
     * @param abstractProcess process whose hosts are to be cleaned up
     */
    protected void recover(AbstractProcess<SdpHost530Impl> abstractProcess) {
        final String dataDirs = getBrokerDirs();
        abstractProcess.getHosts().parallelStream().forEach(host -> uninstallBroker(host, dataDirs));
    }

    /**
     * Scale-out hook: brings a new host into the broker process.
     *
     * <p>Steps, in order: download the {@code kafka} resource onto the host, register the host on
     * every configuration of the serve, then add it to this process's host list. Progress is
     * reported through the log driver before each step.
     *
     * @param abstractHost host being added
     */
    protected void extend(AbstractHost abstractHost) {
        final AbstractServe serve = getServe();
        final String hostname = abstractHost.getHostname();
        final SdpHost530Impl target = (SdpHost530Impl) this.sdpManager.getExpectHostByName(hostname);

        this.logDriver.log(RunLogLevel.INFO, "下载kafka...");
        target.downloadResource("kafka");

        this.logDriver.log(RunLogLevel.INFO, "修改配置文件...");
        // A fresh array is handed to each configuration, exactly one hostname per call.
        serve.getAllConfigs().forEach(config -> config.addBranchHostsByDefault(new String[]{hostname}));

        this.logDriver.log(RunLogLevel.INFO, "添加进主机列表...");
        addHosts(new String[]{hostname});

        this.logDriver.log(RunLogLevel.INFO, "扩容结束");
    }

    /**
     * Scale-in hook: removes a host from the broker process.
     *
     * <p>Steps, in order: stop the {@code Broker} process on the host, unregister the host from
     * every configuration of the serve, remove it from this process's host list, then pause briefly
     * before reporting completion.
     *
     * @param abstractHost host being removed
     */
    protected void shorten(AbstractHost abstractHost) {
        final AbstractServe serve = getServe();

        this.logDriver.log(RunLogLevel.INFO, "停止进程Broker...");
        final String hostname = abstractHost.getHostname();
        final SdpHost530Impl target = (SdpHost530Impl) this.sdpManager.getExpectHostByName(hostname);
        target.stopProcess(serve.getProcessByName(PROCESS_NAME));

        this.logDriver.log(RunLogLevel.INFO, "修改配置文件...");
        // A fresh array is handed to each configuration, exactly one hostname per call.
        serve.getAllConfigs().forEach(config -> config.delBranchHostsByDefault(new String[]{hostname}));

        this.logDriver.log(RunLogLevel.INFO, "从主机列表剔除...");
        deleteHosts(new String[]{hostname});
        ThreadUtil.safeSleep(STEP_PAUSE_MILLIS);

        this.logDriver.log(RunLogLevel.INFO, "缩容结束");
    }

    /**
     * Restarts the broker on all hosts.
     *
     * <p>Every host is stopped first; after a settle pause the hosts are started one at a time with
     * a short pause after each start. The method is {@code synchronized} on this instance.
     */
    @Function(name = "重启", id = "restart")
    public synchronized void restart() {
        for (AbstractHost host : getHosts()) {
            host.stopProcess(this);
        }

        ThreadUtil.safeSleep(RESTART_SETTLE_MILLIS);

        for (AbstractHost host : getHosts()) {
            host.startProcess(this);
            ThreadUtil.safeSleep(STEP_PAUSE_MILLIS);
        }
    }

    /**
     * Reads {@code log.dirs} from the default branch of {@code server.properties} and converts the
     * comma-separated value into a space-separated one.
     *
     * @return the directory list with every {@code ","} replaced by {@code " "}
     */
    private String getBrokerDirs() {
        final String configuredDirs =
                (String) getServe().getConfigByName(SERVER_CONFIG).getDefaultBranchContent().get(LOG_DIRS_KEY);
        return StrUtil.replace(configuredDirs, ",", " ");
    }

    /**
     * Delegates broker installation to the host.
     *
     * @param host     host to install on
     * @param dataDirs space-separated directory list passed to the host
     * @return a success response, created before the host call is made
     */
    private Respond installBroker(SdpHost530Impl host, String dataDirs) {
        final Respond outcome = Respond.success();
        host.installBroker(dataDirs);
        return outcome;
    }

    /**
     * Delegates broker removal to the host.
     *
     * @param host     host to uninstall from
     * @param dataDirs space-separated directory list passed to the host
     * @return a success response, created before the host call is made
     */
    private Respond uninstallBroker(SdpHost530Impl host, String dataDirs) {
        final Respond outcome = Respond.success();
        host.uninstallBroker(dataDirs);
        return outcome;
    }

    /**
     * Returns the directory that holds the process logs.
     *
     * @return the process home followed by {@code /logs}
     */
    protected String getLogFilePath() {
        return getHome() + "/logs";
    }

    /**
     * Returns the name of the log file to read.
     *
     * @param str ignored by this implementation
     * @return always {@code controller.log}
     */
    protected String getLogFileName(String str) {
        return LOG_FILE_NAME;
    }
}
