package com.sugon.gsq.libraries.v532.kafka;

import cn.gsq.sdp.Blueprint;
import cn.gsq.sdp.RpcRespond;
import cn.gsq.sdp.core.AbstractHost;
import cn.gsq.sdp.core.AbstractProcess;
import cn.gsq.sdp.core.AbstractServe;
import cn.gsq.sdp.core.ClassifyHandler;
import cn.gsq.sdp.core.ServeHandler;
import cn.gsq.sdp.core.annotation.Available;
import cn.gsq.sdp.core.annotation.Function;
import cn.gsq.sdp.core.annotation.Serve;
import cn.hutool.core.collection.CollUtil;
import com.google.common.base.Joiner;
import com.sugon.gsq.libraries.utils.HostUtil;
import com.sugon.gsq.libraries.v532.SdpHost532Impl;
import com.sugon.gsq.libraries.v532.SdpKerbyIFace;
import com.sugon.gsq.libraries.v532.zookeeper.Zookeeper;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Kafka serve definition for the v532 library set.
 *
 * <p>The {@link Serve} metadata registers it as version 2.4.1, in master-election mode, depending on
 * {@link Zookeeper}. Besides the lifecycle hooks, the class exposes two operator functions that
 * switch the Ranger authorizer on and off for the {@code Broker} process.
 *
 * <p>NOTE(review): the lifecycle methods ({@code initServe}, {@code restart},
 * {@code extendProperties}, {@code isServeAvailable}, {@code callbackServe}) presumably override
 * {@link AbstractServe}; {@code @Override} is not added because the superclass is not visible here.
 */
@Serve(version = "2.4.1", type = ClassifyHandler.BIGDATA, handler = ServeHandler.MASTER_ELECTION_MODE, depends = {Zookeeper.class}, labels = {"消息队列", "中间件"}, pkg = "kafka", description = "高吞吐量无事物型消息队列", order = 9)
public class Kafka extends AbstractServe {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Kafka.class);

    /** User / service identifier passed to the Kerberos, Ranger and health-check helpers. */
    private static final String KAFKA = "kafka";
    /** Name of the broker process of this serve. */
    private static final String BROKER = "Broker";
    /** Name of the Ranger serve and of its admin process. */
    private static final String RANGER = "Ranger";
    private static final String RANGER_ADMIN = "RangerAdmin";
    /** Configuration file whose default content is read and updated by this class. */
    private static final String SERVER_PROPERTIES = "server.properties";
    /** Property that selects the Kafka authorizer, and the value that means "Ranger is enabled". */
    private static final String RANGER_KEY = "authorizer.class.name";
    private static final String RANGER_VALUE = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer";
    /*
     * NOTE(review): hard-coded credential, used both for the Ranger plug-in registration and for the
     * Ranger sync call. It should be read from configuration or a secret store; the value is kept
     * unchanged here so behavior stays the same.
     */
    private static final String DEFAULT_PASSWORD = "admin1234@sugon";

    @Autowired
    SdpKerbyIFace extraIFace;

    /**
     * Creates the {@code kafka} user, then a machine user and a keytab for every host known to the
     * host manager.
     *
     * @param serve blueprint entry of this serve (not read by this implementation)
     */
    protected void initServe(Blueprint.Serve serve) {
        this.extraIFace.createNormalUser(KAFKA);
        for (AbstractHost host : this.sdpManager.getHostManager().getHosts()) {
            this.extraIFace.createMachineUser(KAFKA, host.getHostname());
            // NOTE(review): the lookup here uses getName() while isServeAvailable() uses getHostname()
            // for the same getExpectHostByName call — confirm both resolve to the same host.
            // NOTE(review): the meaning of "9007" is not visible in this file — presumably a port.
            SdpHost532Impl target = (SdpHost532Impl) this.sdpManager.getExpectHostByName(host.getName());
            target.createKeytab(HostUtil.getHostname(), KAFKA, host.getHostname(), "9007");
        }
    }

    /** Restarts the {@code Broker} process. */
    public synchronized void restart() {
        getProcessByName(BROKER).restart();
    }

    /**
     * Fills the display properties of the serve from the default content of
     * {@code server.properties}.
     *
     * @param map target map of label to displayed value; entries are added in place
     */
    protected void extendProperties(Map<String, String> map) {
        Map<String, String> serverProps = getConfigDefaultContentToMap(SERVER_PROPERTIES);
        // Broker addresses are rendered as "name:9092" joined by commas.
        map.put("地址", Joiner.on(",").join(getProcessByName(BROKER).getHosts().stream()
                .map(host -> host.getName() + ":9092")
                .collect(Collectors.toList())));
        map.put("ZK地址", serverProps.get("zookeeper.connect"));
        map.put("kerberos认证", "开启");
        map.put("权限管理", isRangerAuthorizer(serverProps) ? "开启" : "关闭");
        map.put("写入线程", serverProps.get("num.network.threads"));
        map.put("落盘线程", serverProps.get("num.io.threads"));
        map.put("topc切片", serverProps.get("num.partitions"));
        map.put("数据时限", serverProps.get("log.retention.hours") + "小时");
        map.put("数据目录", serverProps.get("log.dirs"));
    }

    /**
     * Enables Ranger authorization: stops the broker, asks every broker host to enable the Ranger
     * plug-in, records the authorizer class in {@code server.properties} as soon as one host
     * succeeds, restarts the broker and finally registers the plug-in with Ranger.
     *
     * @throws RuntimeException if the Ranger serve is not installed or not available
     * @throws IllegalStateException if the RangerAdmin process has no host
     */
    @Function(id = "ACTIVEAUTHORITY", name = "开启权限管理")
    public void activeAuthority() {
        AbstractServe ranger = requireUsableRanger();
        AbstractProcess broker = getProcessByName(BROKER);
        String rangerAdminHost = firstRangerAdminHostName(ranger);
        broker.stop();
        try {
            boolean configUpdated = false;
            for (Object host : getProcessByNameForImpl(broker.getName()).getHosts()) {
                // Every host must be contacted; only the (identical) config write is de-duplicated.
                boolean enabled = ((SdpHost532Impl) host).kafkaOpenRanger(rangerAdminHost);
                if (enabled && !configUpdated) {
                    Map<String, String> override = new HashMap<>();
                    override.put(RANGER_KEY, RANGER_VALUE);
                    updateConfigDefault(SERVER_PROPERTIES, override);
                    configUpdated = true;
                }
            }
        } finally {
            // Bring the broker back even when a host call or the config update fails, so a failed
            // toggle does not leave the service stopped.
            broker.start();
        }
        createPlugInRanger();
    }

    /**
     * Registers the Kafka plug-in with Ranger through {@link SdpKerbyIFace} and logs the response.
     * The ZooKeeper connect string is built from the hosts of the {@code Zookeeper} serve as
     * {@code host:2181,...} followed by {@code /kafka}.
     */
    private void createPlugInRanger() {
        // A sequential stream is enough for a short host list and yields the same ordered result.
        String zookeeperConnect = this.sdpManager.getServeByName("Zookeeper").getHosts().stream()
                .map(host -> host.getHostname() + ":2181")
                .collect(Collectors.joining(",")) + "/kafka";
        Map<String, String> settings = new HashMap<>();
        settings.put("username", KAFKA);
        settings.put("password", DEFAULT_PASSWORD);
        settings.put("zookeeper.connect", zookeeperConnect);
        settings.put("commonNameForCertificate", "");
        settings.put("policy.download.auth.users", "kafka,hdfs");
        log.info(this.extraIFace.createPlugInRanger(KAFKA, settings));
    }

    /**
     * Availability check for {@code ACTIVEAUTHORITY}.
     *
     * @return {@code true} when Ranger is installed and available and the Ranger authorizer is not
     *         yet configured in {@code server.properties}
     */
    @Available(fid = "ACTIVEAUTHORITY")
    public boolean isActiveOn() {
        return isUsable(this.sdpManager.getServeByName(RANGER))
                && !isRangerAuthorizer(getConfigDefaultContentToMap(SERVER_PROPERTIES));
    }

    /**
     * Disables Ranger authorization: stops the broker, asks every broker host to disable the Ranger
     * plug-in, removes the authorizer class from {@code server.properties} as soon as one host
     * succeeds, restarts the broker and finally removes the plug-in registration from Ranger.
     *
     * @throws RuntimeException if the Ranger serve is not installed or not available
     */
    @Function(id = "INACTIVEAUTHORITY", name = "关闭权限管理")
    public void inactiveAuthority() {
        requireUsableRanger();
        AbstractProcess broker = getProcessByName(BROKER);
        broker.stop();
        try {
            boolean configUpdated = false;
            for (Object host : getProcessByNameForImpl(broker.getName()).getHosts()) {
                // Every host must be contacted; only the (identical) config write is de-duplicated.
                boolean disabled = ((SdpHost532Impl) host).kafkaCloseRanger();
                if (disabled && !configUpdated) {
                    Map<String, String> serverProps = getConfigDefaultContentToMap(SERVER_PROPERTIES);
                    serverProps.remove(RANGER_KEY);
                    getConfigByName(SERVER_PROPERTIES).updateDefaultConfig(serverProps);
                    configUpdated = true;
                }
            }
        } finally {
            // Bring the broker back even when a host call or the config update fails.
            broker.start();
        }
        log.info(this.extraIFace.removePlugInRanger(KAFKA));
    }

    /**
     * Availability check for {@code INACTIVEAUTHORITY}.
     *
     * @return {@code true} when Ranger is installed and available and the Ranger authorizer is
     *         currently configured in {@code server.properties}
     */
    @Available(fid = "INACTIVEAUTHORITY")
    public boolean isInActiveOn() {
        return isUsable(this.sdpManager.getServeByName(RANGER))
                && isRangerAuthorizer(getConfigDefaultContentToMap(SERVER_PROPERTIES));
    }

    /**
     * Checks the serve through the first active host, probing {@code hostname:brokerPort}.
     *
     * @return the response of the host's {@code checkServe} call, or a failed response when no host
     *         of this serve is active
     */
    public RpcRespond<String> isServeAvailable() {
        for (AbstractHost host : getHosts()) {
            if (host.isHostActive()) {
                String endpoint = host.getHostname() + ":" + getProcessByName(BROKER).getPort();
                SdpHost532Impl target = (SdpHost532Impl) this.sdpManager.getExpectHostByName(host.getHostname());
                return target.checkServe(KAFKA, endpoint);
            }
        }
        return new RpcRespond<>(false, "节点均不可用", "节点均不可用");
    }

    /**
     * Triggers a Ranger sync against the first RangerAdmin host on port 6080 when Ranger is
     * installed and available; does nothing otherwise.
     *
     * @throws IllegalStateException if Ranger is usable but the RangerAdmin process has no host
     */
    protected void callbackServe() {
        AbstractServe ranger = this.sdpManager.getServeByName(RANGER);
        if (isUsable(ranger)) {
            this.extraIFace.rangerSync("http://" + firstRangerAdminHostName(ranger) + ":6080", "admin", DEFAULT_PASSWORD);
        }
    }

    /** Returns whether the given serve reports itself as both installed and available. */
    private static boolean isUsable(AbstractServe serve) {
        return serve.isInstalled() && serve.isAvailable();
    }

    /** Looks up the Ranger serve and fails with the established error message when it is unusable. */
    private AbstractServe requireUsableRanger() {
        AbstractServe ranger = this.sdpManager.getServeByName(RANGER);
        if (!isUsable(ranger)) {
            throw new RuntimeException("ranger服务异常!");
        }
        return ranger;
    }

    /**
     * Returns the name of the first RangerAdmin host. {@code CollUtil.getFirst} yields {@code null}
     * for an empty collection, so that case is reported explicitly instead of surfacing as a bare
     * {@link NullPointerException}.
     */
    private static String firstRangerAdminHostName(AbstractServe ranger) {
        AbstractHost adminHost = (AbstractHost) CollUtil.getFirst(ranger.getProcessByName(RANGER_ADMIN).getHosts());
        if (adminHost == null) {
            throw new IllegalStateException("RangerAdmin进程未部署到任何主机!");
        }
        return adminHost.getName();
    }

    /** Returns whether the given {@code server.properties} content selects the Ranger authorizer. */
    private static boolean isRangerAuthorizer(Map<?, ?> serverProps) {
        return Objects.equals(serverProps.get(RANGER_KEY), RANGER_VALUE);
    }
}
