package com.sugon.gsq.libraries.v532.kafka.process;

import cn.gsq.sdp.Respond;
import cn.gsq.sdp.RunLogLevel;
import cn.gsq.sdp.core.AbstractHost;
import cn.gsq.sdp.core.AbstractProcess;
import cn.gsq.sdp.core.ProcessHandler;
import cn.gsq.sdp.core.annotation.Function;
import cn.gsq.sdp.core.annotation.Group;
import cn.gsq.sdp.core.annotation.Process;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.sugon.gsq.libraries.utils.ExtraIFace;
import com.sugon.gsq.libraries.utils.HostUtil;
import com.sugon.gsq.libraries.v532.SCIsolateMode;
import com.sugon.gsq.libraries.v532.SdpHost532Impl;
import com.sugon.gsq.libraries.v532.kafka.Kafka;
import java.util.Iterator;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

@Process(master = Kafka.class, handler = ProcessHandler.SLAVE, groups = {@Group(mode = SCIsolateMode.class, name = "DATA")}, mark = "Kafka", home = "/kafka", start = "./bin/kafka-start.sh", stop = "./bin/kafka-stop.sh", dynamic = true, description = "Kafka服务进程", order = 1, min = 3)
/* loaded from: input_file:com/sugon/gsq/libraries/v532/kafka/process/Broker.class */
public class Broker extends AbstractProcess<SdpHost532Impl> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Broker.class);

    @Autowired
    ExtraIFace extraIFace;

    protected void initProcess() {
        String brokerDirs = getBrokerDirs();
        getHosts().parallelStream().forEach(sdpHost532Impl -> {
            installBroker(sdpHost532Impl, brokerDirs);
        });
    }

    public Integer getPort() {
        return 9092;
    }

    protected void reset() {
        String brokerDirs = getBrokerDirs();
        getHosts().parallelStream().forEach(sdpHost532Impl -> {
            uninstallBroker(sdpHost532Impl, brokerDirs);
        });
    }

    protected void extend(AbstractHost abstractHost) {
        this.logDriver.log(RunLogLevel.INFO, "创建机器用户kafka...");
        getServe();
        this.extraIFace.createMachineUser("kafka", abstractHost.getHostname());
        SdpHost532Impl sdpHost532Impl = (SdpHost532Impl) this.sdpManager.getExpectHostByName(abstractHost.getHostname());
        this.logDriver.log(RunLogLevel.INFO, "安装kerby...");
        sdpHost532Impl.installKerby();
        sdpHost532Impl.createKeytab(HostUtil.getHostname(), "kafka", abstractHost.getHostname(), "9007");
    }

    @Function(name = "重启", id = "restart")
    public synchronized void restart() {
        for (AbstractHost abstractHost : getHosts()) {
            if (abstractHost.isProcessActive(this)) {
                abstractHost.stopProcess(this);
            }
        }
        ThreadUtil.safeSleep(3000L);
        Iterator it = getHosts().iterator();
        while (it.hasNext()) {
            ((AbstractHost) it.next()).startProcess(this);
            ThreadUtil.safeSleep(1000L);
        }
    }

    private String getBrokerDirs() {
        return StrUtil.replace((String) getServe().getConfigByName("server.properties").getDefaultBranchContent().get("log.dirs"), ",", " ");
    }

    private Respond installBroker(SdpHost532Impl sdpHost532Impl, String str) {
        Respond success = Respond.success();
        sdpHost532Impl.installBroker(str);
        return success;
    }

    private Respond uninstallBroker(SdpHost532Impl sdpHost532Impl, String str) {
        Respond success = Respond.success();
        sdpHost532Impl.uninstallBroker(str);
        return success;
    }

    protected String getLogFilePath() {
        return getHome() + "/logs";
    }

    protected String getLogFileName(String str) {
        return "controller.log";
    }
}
