package org.normalization.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.normalization.ClientMessage;
import org.normalization.Command;
import org.normalization.ConfigChangeRequest;
import org.normalization.ConfigChangeResponse;
import org.normalization.ConsumerManagementServiceGrpc;
import org.normalization.ExchangeConfig;
import org.normalization.HeartbeatMessage;
import org.normalization.ModuleConfig;
import org.normalization.ModuleDetail;
import org.normalization.ModuleStatus;
import org.normalization.NormalizationConfig;
import org.normalization.QueueConfig;
import org.normalization.RegisterRequest;
import org.normalization.RegisterResponse;
import org.normalization.ResourceMetrics;
import org.normalization.ServerMessage;
import org.normalization.StatusReport;
import org.normalization.SystemInfo;

/* loaded from: input_file:org/normalization/client/ConsumerManagementClient.class */
public class ConsumerManagementClient {
    private static final Logger logger = Logger.getLogger(ConsumerManagementClient.class.getName());

    /** Channel to the management server; built in the constructor and closed by {@code shutdown()}. */
    private final ManagedChannel channel;

    /** Asynchronous stub used to open the bidirectional stream in {@code connect()}. */
    private final ConsumerManagementServiceGrpc.ConsumerManagementServiceStub asyncStub;

    /**
     * Handle of the periodic heartbeat task, or {@code null} before the first successful
     * registration. Volatile because it is assigned while handling a server message and
     * read/cancelled from scheduler tasks and {@code shutdown()}.
     */
    private volatile ScheduledFuture<?> heartbeatTask;

    /** Instance name sent in the registration request. */
    private final String instanceName;

    /** IP address sent in the registration request. */
    private final String ip;

    /** Port sent in the registration request. */
    private final int port;

    /**
     * Outbound half of the current stream. Volatile because {@code connect()} replaces it
     * (possibly from a scheduled reconnect) while heartbeat and status tasks write to it.
     */
    private volatile StreamObserver<ClientMessage> requestObserver;

    /** Most recently built status snapshot; {@code null} until the first status update. */
    private volatile StatusReport lastStatusReport;

    /** Runs reconnect attempts, the heartbeat task and the periodic status tasks. */
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    /** Known module configurations keyed by module id. */
    private final Map<String, ModuleConfig> modules = new ConcurrentHashMap<>();

    /**
     * Callbacks keyed by config-change request id; one is removed and invoked when the
     * matching request has been handled. NOTE(review): nothing visible in this file adds
     * entries to this map.
     */
    private final Map<String, ConfigChangeCallback> pendingConfigChanges = new ConcurrentHashMap<>();

    /**
     * Heartbeat period in seconds; replaced by the value from the register response.
     * Volatile because it is written while handling a server message and read when the
     * heartbeat task is (re)scheduled.
     */
    private volatile int heartbeatIntervalSeconds = 15;

    /** {@code true} between a successful registration and the next disconnection. */
    private final AtomicBoolean connected = new AtomicBoolean(false);

    /** Wall-clock time (ms) of the last successful registration, 0 if none yet. */
    private volatile long connectionTime = 0;

    /** Wall-clock time (ms) of the last server heartbeat or successful registration. */
    private volatile long lastHeartbeatTime = 0;

    /** Wall-clock time (ms) at which {@code connect()} was last invoked. */
    private volatile long lastConnectionAttemptTime = 0;

    /** Number of reconnects scheduled since the last successful registration. */
    private final AtomicInteger reconnectAttempts = new AtomicInteger(0);

    /** Listeners notified on connect/disconnect transitions. */
    private final List<ConnectionEventListener> connectionListeners = new CopyOnWriteArrayList<>();

    /**
     * Consumer id; starts as a random UUID and is replaced by the id assigned in the
     * register response. Volatile because it is written while handling a server message
     * and read when status reports are built on scheduler threads.
     */
    private volatile String consumerId = UUID.randomUUID().toString();

    /* loaded from: input_file:org/normalization/client/ConsumerManagementClient$ConfigChangeCallback.class */
    /** Callback that receives the outcome of a handled configuration change. */
    public interface ConfigChangeCallback {
        /**
         * Invoked once the configuration change has been processed.
         *
         * @param success whether the change was applied
         * @param message human-readable result or error description
         */
        void onComplete(boolean success, String message);
    }

    /* loaded from: input_file:org/normalization/client/ConsumerManagementClient$ConnectionEvent.class */
    /**
     * Immutable description of a connection state transition, stamped with the wall-clock
     * time at which the event object was created.
     */
    public static class ConnectionEvent {
        private final EventType type;
        private final ConnectionEventListener.DisconnectReason reason;
        private final long timestamp;

        /** Kind of transition an event describes. */
        public enum EventType {
            CONNECTED,
            DISCONNECTED
        }

        /**
         * Creates an event without a disconnect reason.
         *
         * @param eventType kind of transition
         */
        public ConnectionEvent(EventType eventType) {
            this(eventType, null);
        }

        /**
         * Creates an event and records the current time as its timestamp.
         *
         * @param eventType kind of transition
         * @param disconnectReason why the connection ended; may be {@code null}
         */
        public ConnectionEvent(EventType eventType, ConnectionEventListener.DisconnectReason disconnectReason) {
            timestamp = System.currentTimeMillis();
            type = eventType;
            reason = disconnectReason;
        }

        /** @return the kind of transition */
        public EventType getType() {
            return type;
        }

        /** @return the disconnect reason, or {@code null} when none was supplied */
        public ConnectionEventListener.DisconnectReason getReason() {
            return reason;
        }

        /** @return creation time of this event in milliseconds since the epoch */
        public long getTimestamp() {
            return timestamp;
        }

        /** @return {@code true} when this event is of type {@link EventType#CONNECTED} */
        public boolean isConnected() {
            return EventType.CONNECTED == type;
        }
    }

    /* loaded from: input_file:org/normalization/client/ConsumerManagementClient$ConnectionEventListener.class */
    /** Observer of connection state transitions of the client. */
    public interface ConnectionEventListener {

        /** Why a connection ended. */
        public enum DisconnectReason {
            CLIENT_CLOSED,
            SERVER_CLOSED,
            HEARTBEAT_TIMEOUT,
            ERROR
        }

        /**
         * Called for every connect or disconnect transition.
         *
         * @param event details of the transition
         */
        void onConnectionEvent(ConnectionEvent event);
    }

    /**
     * Creates a client with a plaintext channel to the management server. No stream is
     * opened until {@code connect()} is called.
     *
     * @param serverHost host of the management server
     * @param serverPort port of the management server
     * @param instanceName instance name reported at registration
     * @param ip IP address reported at registration
     * @param port port reported at registration
     */
    public ConsumerManagementClient(String serverHost, int serverPort, String instanceName, String ip, int port) {
        ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(serverHost, serverPort)
                .usePlaintext()
                .build();
        this.channel = managedChannel;
        this.asyncStub = ConsumerManagementServiceGrpc.newStub(managedChannel);
        this.instanceName = instanceName;
        this.ip = ip;
        this.port = port;
    }

    /**
     * Registers a listener for connect/disconnect events.
     *
     * @param listener listener to add
     */
    public void addConnectionEventListener(ConnectionEventListener listener) {
        connectionListeners.add(listener);
    }

    /**
     * Unregisters a previously added listener.
     *
     * @param listener listener to remove
     */
    public void removeConnectionEventListener(ConnectionEventListener listener) {
        connectionListeners.remove(listener);
    }

    /**
     * Opens the bidirectional stream to the management server and sends the registration
     * request. Server messages are dispatched to {@code handleServerMessage}; a stream
     * error or completion marks the client disconnected and schedules a reconnect.
     */
    public void connect() {
        lastConnectionAttemptTime = System.currentTimeMillis();

        StreamObserver<ServerMessage> responseObserver = new StreamObserver<ServerMessage>() {
            @Override
            public void onNext(ServerMessage message) {
                handleServerMessage(message);
            }

            @Override
            public void onError(Throwable cause) {
                logger.log(Level.SEVERE, "Error in server stream", cause);
                handleDisconnection(ConnectionEventListener.DisconnectReason.ERROR);
                scheduleReconnect();
            }

            @Override
            public void onCompleted() {
                logger.info("Server stream completed");
                handleDisconnection(ConnectionEventListener.DisconnectReason.SERVER_CLOSED);
                scheduleReconnect();
            }
        };

        requestObserver = asyncStub.connect(responseObserver);
        sendRegistrationRequest();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Schedules a reconnect attempt with a linear back-off of 5 seconds per attempt,
     * capped at 30 seconds.
     *
     * <p>Does nothing once the scheduler has been shut down, so stream callbacks that
     * arrive after the client was shut down do not fail with a
     * {@code RejectedExecutionException}. If the scheduled {@code connect()} itself throws,
     * the failure is logged and another attempt is scheduled instead of being lost inside
     * the scheduled future.
     */
    public void scheduleReconnect() {
        if (this.scheduler.isShutdown()) {
            logger.info("Client is shut down; not scheduling a reconnect");
            return;
        }
        int attempt = this.reconnectAttempts.incrementAndGet();
        // long arithmetic so a very large attempt counter cannot overflow the delay
        long delaySeconds = Math.min(30L, attempt * 5L);
        logger.info("Scheduling reconnect attempt #" + attempt + " in " + delaySeconds + " seconds");
        try {
            this.scheduler.schedule(() -> {
                logger.info("Attempting to reconnect to consumer management server");
                try {
                    connect();
                } catch (RuntimeException e) {
                    logger.log(Level.SEVERE, "Reconnect attempt failed", e);
                    scheduleReconnect();
                }
            }, delaySeconds, TimeUnit.SECONDS);
        } catch (java.util.concurrent.RejectedExecutionException e) {
            // The scheduler was shut down between the isShutdown() check and schedule().
            logger.log(Level.WARNING, "Reconnect not scheduled because the scheduler is shut down", e);
        }
    }

    /**
     * Builds the registration request (identity, version, JVM/OS information and all
     * currently known modules) and sends it on the request stream. If sending fails the
     * error is logged and passed to the request stream via {@code onError}.
     */
    private void sendRegistrationRequest() {
        Runtime runtime = Runtime.getRuntime();
        SystemInfo hostInfo = SystemInfo.newBuilder()
                .setOsName(System.getProperty("os.name"))
                .setOsVersion(System.getProperty("os.version"))
                .setJavaVersion(System.getProperty("java.version"))
                .setTotalMemory(runtime.totalMemory())
                .setAvailableProcessors(runtime.availableProcessors())
                .m819build();

        RegisterRequest.Builder registration = RegisterRequest.newBuilder()
                .setConsumerId(this.consumerId)
                .setInstanceName(this.instanceName)
                .setIp(this.ip)
                .setPort(this.port)
                .setVersion("1.0.0")
                .setSystemInfo(hostInfo);
        for (ModuleConfig module : this.modules.values()) {
            registration.addCurrentModules(module);
        }

        try {
            ClientMessage envelope = ClientMessage.newBuilder()
                    .setType(ClientMessage.MessageType.REGISTER)
                    .setRegister(registration.m579build())
                    .m40build();
            this.requestObserver.onNext(envelope);
            logger.info("Sent registration request to server");
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed to send registration request", (Throwable) e);
            this.requestObserver.onError(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Dispatches a message received from the server according to its type. Heartbeats
     * only refresh the last-heartbeat timestamp; unknown types are logged as a warning.
     *
     * @param message message received on the server stream
     */
    public void handleServerMessage(ServerMessage message) {
        switch (message.getType()) {
            case HEARTBEAT:
                lastHeartbeatTime = System.currentTimeMillis();
                break;
            case REGISTER_RESPONSE:
                handleRegisterResponse(message.getRegisterResponse());
                break;
            case CONFIG_CHANGE:
                handleConfigChange(message.getConfigChange());
                break;
            case COMMAND:
                handleCommand(message.getCommand());
                break;
            default:
                logger.warning("Received unknown message type from server");
                break;
        }
    }

    /**
     * Processes the server's answer to the registration request.
     *
     * <p>On failure only the reason is logged. On success the assigned consumer id and the
     * heartbeat interval are adopted, connection timestamps and the reconnect counter are
     * reset, listeners are notified on a disconnected-to-connected transition, the heartbeat
     * task is (re)started, the modules expected by the server are added or updated locally,
     * and a fresh status report is sent.
     *
     * @param registerResponse response received from the server
     */
    private void handleRegisterResponse(RegisterResponse registerResponse) {
        logger.info("Received registration response: " + registerResponse.getSuccess() + ", id: " + registerResponse.getAssignedConsumerId());
        if (!registerResponse.getSuccess()) {
            logger.severe("Registration failed: " + registerResponse.getMessage());
            return;
        }
        this.consumerId = registerResponse.getAssignedConsumerId();

        // A non-positive period would make scheduleAtFixedRate throw and abort the rest of
        // the registration handling, so such a value is ignored and the current one kept.
        int interval = registerResponse.getHeartbeatInterval();
        if (interval > 0) {
            this.heartbeatIntervalSeconds = interval;
        } else {
            logger.warning("Ignoring non-positive heartbeat interval " + interval
                    + "; keeping " + this.heartbeatIntervalSeconds + " seconds");
        }

        this.connectionTime = System.currentTimeMillis();
        this.lastHeartbeatTime = this.connectionTime;
        this.reconnectAttempts.set(0);
        if (!this.connected.getAndSet(true)) {
            fireConnected();
        }
        startHeartbeatTask();

        for (ModuleConfig expected : registerResponse.getExpectedModulesList()) {
            // Single lookup: a containsKey/get pair could see the module removed in between.
            ModuleConfig current = this.modules.get(expected.getModuleId());
            if (current == null) {
                addModule(expected);
            } else if (!current.equals(expected)) {
                updateModule(expected);
            }
        }

        // sendStatusReport() refreshes the cached status itself before sending.
        sendStatusReport();
    }

    /**
     * Cancels any running heartbeat task and schedules a new one that sends a heartbeat
     * immediately and then once per heartbeat interval, as long as the client is connected.
     *
     * <p>The period is clamped to at least one second because
     * {@code scheduleAtFixedRate} rejects a period of zero or less with an
     * {@code IllegalArgumentException}.
     */
    private void startHeartbeatTask() {
        ScheduledFuture<?> previous = this.heartbeatTask;
        if (previous != null && !previous.isDone()) {
            previous.cancel(false);
        }
        long periodSeconds = Math.max(1, this.heartbeatIntervalSeconds);
        this.heartbeatTask = this.scheduler.scheduleAtFixedRate(() -> {
            if (this.connected.get()) {
                sendHeartbeat();
            }
        }, 0L, periodSeconds, TimeUnit.SECONDS);
    }

    /**
     * Sends a heartbeat carrying the current time. If sending fails, the client is marked
     * disconnected with reason {@code ERROR} and a reconnect is scheduled.
     */
    private void sendHeartbeat() {
        try {
            HeartbeatMessage beat = HeartbeatMessage.newBuilder()
                    .setTimestamp(System.currentTimeMillis())
                    .m294build();
            ClientMessage envelope = ClientMessage.newBuilder()
                    .setType(ClientMessage.MessageType.HEARTBEAT)
                    .setHeartbeat(beat)
                    .m40build();
            requestObserver.onNext(envelope);
        } catch (Exception e) {
            logger.log(Level.WARNING, "Failed to send heartbeat", (Throwable) e);
            handleDisconnection(ConnectionEventListener.DisconnectReason.ERROR);
            scheduleReconnect();
        }
    }

    /**
     * Applies a configuration change requested by the server, reports the outcome back,
     * completes a pending callback registered under the request id (if any) and refreshes
     * the cached status when the change succeeded.
     *
     * <p>A change type without a matching branch is answered with an explicit
     * "unsupported" message. An exception thrown by the callback is logged so it cannot
     * prevent the status refresh.
     *
     * @param configChangeRequest request received from the server
     */
    private void handleConfigChange(ConfigChangeRequest configChangeRequest) {
        String requestId = configChangeRequest.getRequestId();
        logger.info("Received config change request: " + requestId + ", type: " + configChangeRequest.getChangeType());
        boolean success = false;
        String message = "";
        try {
            switch (configChangeRequest.getChangeType()) {
                case ADD_MODULE:
                    success = addModule(configChangeRequest.getModule());
                    message = success ? "Module added successfully" : "Failed to add module";
                    break;
                case UPDATE_MODULE:
                    success = updateModule(configChangeRequest.getModule());
                    message = success ? "Module updated successfully" : "Failed to update module";
                    break;
                case DELETE_MODULE:
                    success = deleteModule(configChangeRequest.getModuleIdToDelete());
                    message = success ? "Module deleted successfully" : "Failed to delete module";
                    break;
                case UPDATE_CONCURRENCY:
                    success = updateConcurrency(configChangeRequest.getTargetModuleId(), configChangeRequest.getPrefetchCount(), configChangeRequest.getConcurrentConsumers(), configChangeRequest.getMaxConcurrentConsumers());
                    message = success ? "Concurrency updated successfully" : "Failed to update concurrency";
                    break;
                default:
                    message = "Unsupported change type: " + configChangeRequest.getChangeType();
                    logger.warning(message);
                    break;
            }
        } catch (Exception e) {
            success = false;
            message = "Error: " + e.getMessage();
            logger.log(Level.SEVERE, "Error processing config change", (Throwable) e);
        }
        sendConfigChangeResponse(requestId, success, message);

        // remove() alone is atomic; a preceding containsKey() adds nothing.
        ConfigChangeCallback callback = this.pendingConfigChanges.remove(requestId);
        if (callback != null) {
            try {
                callback.onComplete(success, message);
            } catch (Exception e) {
                logger.log(Level.WARNING, "Config change callback failed for request " + requestId, (Throwable) e);
            }
        }
        if (success) {
            updateStatus();
        }
    }

    /**
     * Sends the result of a configuration change to the server; a send failure is logged
     * and otherwise ignored.
     *
     * @param requestId id of the request being answered
     * @param success whether the change was applied
     * @param message result or error description
     */
    private void sendConfigChangeResponse(String requestId, boolean success, String message) {
        try {
            ConfigChangeResponse result = ConfigChangeResponse.newBuilder()
                    .setRequestId(requestId)
                    .setSuccess(success)
                    .setMessage(message)
                    .m188build();
            ClientMessage envelope = ClientMessage.newBuilder()
                    .setType(ClientMessage.MessageType.CONFIG_RESPONSE)
                    .setConfigResponse(result)
                    .m40build();
            requestObserver.onNext(envelope);
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed to send config change response", (Throwable) e);
        }
    }

    /**
     * Executes a command received from the server. Module-scoped commands (pause, resume,
     * restart) are skipped when the command carries no module id; any exception raised
     * while executing a command is logged.
     *
     * @param command command received from the server
     */
    private void handleCommand(Command command) {
        logger.info("Received command: " + command.getCommandId() + ", type: " + command.getType());
        final String targetModule = command.getModuleId();
        final boolean hasTarget = targetModule != null && !targetModule.isEmpty();
        try {
            switch (command.getType()) {
                case PAUSE_MODULE:
                    if (hasTarget) {
                        pauseModule(targetModule);
                    }
                    break;
                case RESUME_MODULE:
                    if (hasTarget) {
                        resumeModule(targetModule);
                    }
                    break;
                case RESTART_MODULE:
                    if (hasTarget) {
                        restartModule(targetModule);
                    }
                    break;
                case SHUTDOWN_CONSUMER:
                    shutdown();
                    break;
                case RELOAD_CONFIG:
                    reloadConfig();
                    break;
                case CLEAR_METRICS:
                    // The module id is passed through even when empty.
                    clearMetrics(targetModule);
                    break;
                case DUMP_STATUS:
                    updateStatus();
                    sendStatusReport();
                    break;
            }
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Error processing command", (Throwable) e);
        }
    }

    /**
     * Stores the given module configuration under its module id, replacing any existing
     * entry, and refreshes the cached status.
     *
     * @param moduleConfig configuration to store
     * @return {@code true} on success, {@code false} if an exception occurred
     */
    public boolean addModule(ModuleConfig moduleConfig) {
        boolean added;
        try {
            final String id = moduleConfig.getModuleId();
            modules.put(id, moduleConfig);
            logger.info("Module added: " + id);
            updateStatus();
            added = true;
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed to add module", (Throwable) e);
            added = false;
        }
        return added;
    }

    /**
     * Replaces the configuration of an existing module and refreshes the cached status.
     *
     * <p>Uses the map's atomic {@code replace}, so a module deleted concurrently is not
     * re-created by a separate existence check followed by a put.
     *
     * @param moduleConfig new configuration; its module id selects the entry to replace
     * @return {@code true} if the module existed and was replaced, {@code false} if it does
     *     not exist or an exception occurred
     */
    public boolean updateModule(ModuleConfig moduleConfig) {
        try {
            String moduleId = moduleConfig.getModuleId();
            // replace() returns null when there is no mapping for the key.
            if (this.modules.replace(moduleId, moduleConfig) == null) {
                logger.warning("Cannot update non-existing module: " + moduleId);
                return false;
            }
            logger.info("Module updated: " + moduleId);
            updateStatus();
            return true;
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed to update module", (Throwable) e);
            return false;
        }
    }

    /**
     * Removes a module and refreshes the cached status.
     *
     * <p>Removal and the existence check are a single atomic map operation, so two
     * concurrent deletions of the same module cannot both report success.
     *
     * @param moduleId id of the module to remove
     * @return {@code true} if the module existed and was removed, {@code false} if it does
     *     not exist or an exception occurred
     */
    public boolean deleteModule(String moduleId) {
        try {
            // remove() returns null when there was no mapping for the key.
            if (this.modules.remove(moduleId) == null) {
                logger.warning("Cannot delete non-existing module: " + moduleId);
                return false;
            }
            logger.info("Module deleted: " + moduleId);
            updateStatus();
            return true;
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed to delete module", (Throwable) e);
            return false;
        }
    }

    /**
     * Rewrites the prefetch count and consumer concurrency limits of an existing module
     * and refreshes the cached status.
     *
     * <p>The read-modify-write is done inside {@code computeIfPresent}, so it is atomic
     * with respect to concurrent updates or deletion of the same module.
     *
     * @param moduleId id of the module to change
     * @param prefetchCount new prefetch count
     * @param concurrentConsumers new number of concurrent consumers
     * @param maxConcurrentConsumers new upper bound of concurrent consumers
     * @return {@code true} if the module existed and was updated, {@code false} if it does
     *     not exist or an exception occurred
     */
    public boolean updateConcurrency(String moduleId, int prefetchCount, int concurrentConsumers, int maxConcurrentConsumers) {
        try {
            ModuleConfig updated = this.modules.computeIfPresent(moduleId, (id, current) -> {
                NormalizationConfig normalization = current.getNormalization();
                ModuleDetail detail = ModuleDetail.newBuilder(normalization.getModule())
                        .setPrefetchCount(prefetchCount)
                        .setConcurrentConsumers(concurrentConsumers)
                        .setMaxConcurrentConsumers(maxConcurrentConsumers)
                        .m388build();
                return ModuleConfig.newBuilder(current)
                        .setNormalization(NormalizationConfig.newBuilder(normalization).setModule(detail).m484build())
                        .m341build();
            });
            // computeIfPresent() returns null when the key has no mapping.
            if (updated == null) {
                logger.warning("Cannot update concurrency for non-existing module: " + moduleId);
                return false;
            }
            logger.info("Concurrency updated for module: " + moduleId);
            updateStatus();
            return true;
        } catch (Exception e) {
            logger.log(Level.SEVERE, "Failed to update concurrency", (Throwable) e);
            return false;
        }
    }

    /**
     * Logs a pause request for the module and refreshes the cached status; no other
     * action is performed here.
     *
     * @param moduleId id of the module to pause
     */
    public void pauseModule(String moduleId) {
        final String notice = "Pausing module: " + moduleId;
        logger.info(notice);
        updateStatus();
    }

    /**
     * Logs a resume request for the module and refreshes the cached status; no other
     * action is performed here.
     *
     * @param moduleId id of the module to resume
     */
    public void resumeModule(String moduleId) {
        final String notice = "Resuming module: " + moduleId;
        logger.info(notice);
        updateStatus();
    }

    /**
     * Logs a restart request for the module and refreshes the cached status; no other
     * action is performed here.
     *
     * @param moduleId id of the module to restart
     */
    public void restartModule(String moduleId) {
        final String notice = "Restarting module: " + moduleId;
        logger.info(notice);
        updateStatus();
    }

    /**
     * Logs a configuration reload request and refreshes the cached status; no other
     * action is performed here.
     */
    public void reloadConfig() {
        final String notice = "Reloading configuration";
        logger.info(notice);
        updateStatus();
    }

    /**
     * Logs a metrics reset, for one module or for all modules when no id is given, and
     * refreshes the cached status; no other action is performed here.
     *
     * @param moduleId id of the module, or {@code null}/empty for all modules
     */
    public void clearMetrics(String moduleId) {
        final boolean singleModule = moduleId != null && !moduleId.isEmpty();
        if (singleModule) {
            logger.info("Clearing metrics for module: " + moduleId);
        } else {
            logger.info("Clearing all metrics");
        }
        updateStatus();
    }

    /** Rebuilds the status report and caches it as the latest snapshot. */
    private void updateStatus() {
        final StatusReport snapshot = buildStatusReport();
        lastStatusReport = snapshot;
    }

    /**
     * Builds a status report for this consumer with one entry per known module.
     *
     * <p>Apart from the used heap size, the active thread count and the timestamps, the
     * per-module and resource figures are fixed literal values.
     *
     * @return the newly built report
     */
    private StatusReport buildStatusReport() {
        StatusReport.Builder report = StatusReport.newBuilder()
                .setConsumerId(consumerId)
                .setTimestamp(System.currentTimeMillis())
                .setStatus(StatusReport.ConsumerStatus.RUNNING);

        for (String moduleId : modules.keySet()) {
            report.addModuleStatuses(ModuleStatus.newBuilder()
                    .setModuleId(moduleId)
                    .setState(ModuleStatus.ModuleState.RUNNING)
                    .setMessagesProcessed(1000L)
                    .setErrorsCount(5L)
                    .setProcessingRate(50.5d)
                    .setLastActivityTime(System.currentTimeMillis() - 5000)
                    .m435build());
        }

        report.setResources(ResourceMetrics.newBuilder()
                .setCpuUsagePercent(25.5d)
                .setMemoryUsagePercent(40.0d)
                .setUsedHeapMemory(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())
                .setActiveThreads(Thread.activeCount())
                .setQueuedMessages(100)
                .m674build());
        return report.m770build();
    }

    /**
     * Refreshes the cached status and sends it to the server; a send failure is logged
     * and otherwise ignored.
     */
    public void sendStatusReport() {
        updateStatus();
        try {
            ClientMessage envelope = ClientMessage.newBuilder()
                    .setType(ClientMessage.MessageType.STATUS_REPORT)
                    .setStatusReport(lastStatusReport)
                    .m40build();
            requestObserver.onNext(envelope);
            logger.fine("Sent status report to server");
        } catch (Exception e) {
            logger.log(Level.WARNING, "Failed to send status report", (Throwable) e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Marks the client as disconnected. Only on an actual connected-to-disconnected
     * transition is the heartbeat task cancelled and the listeners notified.
     *
     * @param disconnectReason reason passed on to the listeners
     */
    public void handleDisconnection(ConnectionEventListener.DisconnectReason disconnectReason) {
        final boolean wasConnected = connected.getAndSet(false);
        if (!wasConnected) {
            return;
        }
        if (heartbeatTask != null) {
            heartbeatTask.cancel(false);
        }
        fireDisconnected(disconnectReason);
    }

    /**
     * Notifies every registered listener that the connection was established. An
     * exception thrown by one listener is logged and does not stop the others.
     */
    private void fireConnected() {
        final ConnectionEvent event = new ConnectionEvent(ConnectionEvent.EventType.CONNECTED);
        for (ConnectionEventListener listener : connectionListeners) {
            try {
                listener.onConnectionEvent(event);
            } catch (Exception e) {
                logger.log(Level.WARNING, "Error notifying listener of connection", (Throwable) e);
            }
        }
    }

    /**
     * Notifies every registered listener that the connection ended. An exception thrown
     * by one listener is logged and does not stop the others.
     *
     * @param cause reason carried by the event
     */
    private void fireDisconnected(ConnectionEventListener.DisconnectReason cause) {
        final ConnectionEvent event = new ConnectionEvent(ConnectionEvent.EventType.DISCONNECTED, cause);
        for (ConnectionEventListener listener : connectionListeners) {
            try {
                listener.onConnectionEvent(event);
            } catch (Exception e) {
                logger.log(Level.WARNING, "Error notifying listener of disconnection", (Throwable) e);
            }
        }
    }

    /**
     * Starts two periodic tasks: one refreshing the cached status (every half interval,
     * at most every 15 seconds, at least every second) and one sending a status report to
     * the server every {@code intervalSeconds}.
     *
     * <p>The refresh period is clamped to at least one second: {@code intervalSeconds / 2}
     * is 0 for an interval of 1, and {@code scheduleAtFixedRate} rejects a period of zero.
     *
     * @param intervalSeconds seconds between status reports; must be positive
     * @throws IllegalArgumentException if {@code intervalSeconds} is not positive
     */
    public void startStatusReporting(int intervalSeconds) {
        if (intervalSeconds <= 0) {
            throw new IllegalArgumentException("Status report interval must be positive, got " + intervalSeconds);
        }
        long refreshPeriodSeconds = Math.max(1, Math.min(intervalSeconds / 2, 15));
        this.scheduler.scheduleAtFixedRate(this::updateStatus, 0L, refreshPeriodSeconds, TimeUnit.SECONDS);
        this.scheduler.scheduleAtFixedRate(this::sendStatusReport, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * Shuts the client down: marks it disconnected and notifies listeners with reason
     * {@code CLIENT_CLOSED}, cancels the heartbeat, stops the scheduler, completes the
     * request stream and closes the channel.
     *
     * <p>The channel is given 5 seconds to terminate gracefully and is then shut down
     * forcibly. If the waiting thread is interrupted, the channel is also shut down
     * forcibly and the interrupt flag is restored.
     */
    public void shutdown() {
        logger.info("Shutting down client");
        // Without this the connected flag would stay true and listeners would never
        // learn about a client-initiated close.
        handleDisconnection(ConnectionEventListener.DisconnectReason.CLIENT_CLOSED);
        if (this.heartbeatTask != null) {
            this.heartbeatTask.cancel(false);
        }
        this.scheduler.shutdown();
        if (this.requestObserver != null) {
            try {
                this.requestObserver.onCompleted();
            } catch (Exception e) {
                logger.log(Level.WARNING, "Error completing request stream", (Throwable) e);
            }
        }
        try {
            if (!this.channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS)) {
                logger.warning("Channel did not terminate within 5 seconds; forcing shutdown");
                this.channel.shutdownNow();
            }
        } catch (InterruptedException e2) {
            logger.log(Level.WARNING, "Error shutting down channel", (Throwable) e2);
            this.channel.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    /** @return {@code true} while the client is registered and not disconnected */
    public boolean isConnected() {
        final boolean up = connected.get();
        return up;
    }

    /** @return the current consumer id (a random UUID until the server assigns one) */
    public String getConsumerId() {
        final String id = consumerId;
        return id;
    }

    /** @return time in ms of the last server heartbeat or successful registration, 0 if none */
    public long getLastHeartbeatTime() {
        final long millis = lastHeartbeatTime;
        return millis;
    }

    /** @return time in ms of the last successful registration, 0 if none */
    public long getConnectionTime() {
        final long millis = connectionTime;
        return millis;
    }

    /** @return time in ms at which {@code connect()} was last called, 0 if never */
    public long getLastConnectionAttemptTime() {
        final long millis = lastConnectionAttemptTime;
        return millis;
    }

    /** @return number of reconnects scheduled since the last successful registration */
    public int getReconnectAttempts() {
        final int attempts = reconnectAttempts.get();
        return attempts;
    }

    /** @return number of modules currently known to the client */
    public int getModuleCount() {
        final int count = modules.size();
        return count;
    }

    /**
     * Returns the ids of all known modules.
     *
     * @return a new set that is independent of the client's internal module map
     */
    public Set<String> getModuleIds() {
        // Diamond instead of the raw HashSet type, so the copy is type-checked.
        return new HashSet<>(this.modules.keySet());
    }

    /**
     * Looks up the state of a module in the most recent status snapshot.
     *
     * @param moduleId id of the module
     * @return the module's state, or {@code STARTING} when there is no snapshot yet or the
     *     snapshot has no entry for the module
     */
    public ModuleStatus.ModuleState getModuleState(String moduleId) {
        final StatusReport snapshot = lastStatusReport;
        if (snapshot == null) {
            return ModuleStatus.ModuleState.STARTING;
        }
        for (ModuleStatus entry : snapshot.getModuleStatusesList()) {
            if (entry.getModuleId().equals(moduleId)) {
                return entry.getState();
            }
        }
        return ModuleStatus.ModuleState.STARTING;
    }

    /**
     * @return the CPU usage percentage of the most recent status snapshot, or {@code -1}
     *     when there is no snapshot or it carries no resource metrics
     */
    public double getCpuUsage() {
        final StatusReport snapshot = lastStatusReport;
        if (snapshot != null && snapshot.hasResources()) {
            return snapshot.getResources().getCpuUsagePercent();
        }
        return -1.0d;
    }

    /**
     * @return the memory usage percentage of the most recent status snapshot, or
     *     {@code -1} when there is no snapshot or it carries no resource metrics
     */
    public double getMemoryUsage() {
        final StatusReport snapshot = lastStatusReport;
        if (snapshot != null && snapshot.hasResources()) {
            return snapshot.getResources().getMemoryUsagePercent();
        }
        return -1.0d;
    }

    /**
     * @return the sum of the error counts of all modules in the most recent status
     *     snapshot, or 0 when there is no snapshot
     */
    public long getTotalModuleErrors() {
        final StatusReport snapshot = lastStatusReport;
        if (snapshot == null) {
            return 0;
        }
        long total = 0;
        for (ModuleStatus entry : snapshot.getModuleStatusesList()) {
            total += entry.getErrorsCount();
        }
        return total;
    }

    /**
     * Example entry point: connects to a hard-coded management server, prints connection
     * events, starts status reporting every 30 seconds, registers a shutdown hook, adds
     * one example module and then blocks the main thread.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final ConsumerManagementClient client =
                new ConsumerManagementClient("192.168.1.62", 50052, "example-consumer", "127.0.0.1", 8080);

        client.addConnectionEventListener(event -> {
            if (!event.isConnected()) {
                System.out.println("Disconnected from management server. Reason: " + event.getReason());
                return;
            }
            System.out.println("Connected to management server!");
        });

        client.connect();
        client.startStatusReporting(30);
        Runtime.getRuntime().addShutdownHook(new Thread(client::shutdown));

        QueueConfig queue = QueueConfig.newBuilder()
                .setName("guotong.normalization.queue-project-example")
                .m532build();
        ExchangeConfig exchange = ExchangeConfig.newBuilder()
                .setType(ExchangeConfig.ExchangeType.TOPIC)
                .setName("guotong.normalization.topic-project-example")
                .m245build();
        ModuleDetail detail = ModuleDetail.newBuilder()
                .setRoutingKey("guotong.normalization.send-project-example")
                .setProducer("exampleProducer")
                .setConsumer("exampleConsumer")
                .setAutoAck(false)
                .setPrefetchCount(250)
                .setConcurrentConsumers(5)
                .setMaxConcurrentConsumers(10)
                .setQueue(queue)
                .setExchange(exchange)
                .m388build();
        NormalizationConfig normalization = NormalizationConfig.newBuilder()
                .setEnable(true)
                .setModule(detail)
                .m484build();
        client.addModule(ModuleConfig.newBuilder()
                .setModuleId("example-module")
                .setNormalization(normalization)
                .m341build());

        // Keep the JVM alive until the thread is interrupted.
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
