package io.cresco.agent.controller.communication;

import io.cresco.agent.controller.core.ControllerEngine;
import io.cresco.library.messaging.MsgEvent;
import io.cresco.library.plugin.PluginBuilder;
import io.cresco.library.utilities.CLogger;
import io.netty.handler.traffic.AbstractTrafficShapingHandler;
import jakarta.jms.JMSException;
import java.util.ArrayList;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: input_file:io/cresco/agent/controller/communication/AgentProducer.class */
public class AgentProducer {
    private Map<String, ActiveProducerWorker> producerWorkers;
    private ControllerEngine controllerEngine;
    private PluginBuilder plugin;
    private CLogger logger;
    private String baseURI;
    private Timer timer;
    private ActiveClient activeClient;
    private final int MAX_SEND_RETRIES_CONFIG;
    private final long INITIAL_RETRY_DELAY_MS;
    private final long WORKER_CLEANUP_INTERVAL_MS;
    private final long WORKER_CLEANUP_DELAY_MS;

    /* loaded from: input_file:io/cresco/agent/controller/communication/AgentProducer$ClearProducerTask.class */
    private class ClearProducerTask extends TimerTask {
        private ClearProducerTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            AgentProducer.this.logger.trace("Running ClearProducerTask cleanup...");
            ArrayList<String> arrayList = new ArrayList();
            synchronized (AgentProducer.this.producerWorkers) {
                for (Map.Entry<String, ActiveProducerWorker> entry : AgentProducer.this.producerWorkers.entrySet()) {
                    ActiveProducerWorker value = entry.getValue();
                    AgentProducer.this.logger.trace("Checking worker [{}] for queue [{}], current isActive flag: {}", new Object[]{value.toString(), entry.getKey(), Boolean.valueOf(value.isActive)});
                    if (value.isActive) {
                        value.isActive = false;
                        AgentProducer.this.logger.trace("Worker for queue [{}] was active, resetting flag for next check.", new Object[]{entry.getKey()});
                    } else {
                        AgentProducer.this.logger.info("Worker for queue [{}] was not active in the last cycle. Shutting it down.", new Object[]{entry.getKey()});
                        if (value.shutdown()) {
                            arrayList.add(entry.getKey());
                        } else {
                            AgentProducer.this.logger.warn("Failed to cleanly shutdown inactive worker for queue [{}]. Will still remove from map.", new Object[]{entry.getKey()});
                            arrayList.add(entry.getKey());
                        }
                    }
                }
                for (String str : arrayList) {
                    AgentProducer.this.producerWorkers.remove(str);
                    AgentProducer.this.logger.info("Removed inactive worker for queue [{}] from producerWorkers map.", new Object[]{str});
                }
            }
            AgentProducer.this.logger.trace("ClearProducerTask cleanup finished. Active workers: {}", new Object[]{Integer.valueOf(AgentProducer.this.producerWorkers.size())});
        }
    }

    public AgentProducer(ControllerEngine controllerEngine, String str, ActiveClient activeClient) {
        this.controllerEngine = controllerEngine;
        this.plugin = controllerEngine.getPluginBuilder();
        this.logger = this.plugin.getLogger(AgentProducer.class.getName(), CLogger.Level.Info);
        this.activeClient = activeClient;
        this.baseURI = str;
        this.MAX_SEND_RETRIES_CONFIG = this.plugin.getConfig().getIntegerParam("agentproducer_max_send_retries", 3).intValue();
        this.INITIAL_RETRY_DELAY_MS = this.plugin.getConfig().getLongParam("agentproducer_initial_retry_delay", 500L).longValue();
        this.WORKER_CLEANUP_INTERVAL_MS = this.plugin.getConfig().getLongParam("agentproducer_worker_cleanup_interval", 30000L).longValue();
        this.WORKER_CLEANUP_DELAY_MS = this.plugin.getConfig().getLongParam("agentproducer_worker_cleanup_delay", Long.valueOf(AbstractTrafficShapingHandler.DEFAULT_MAX_TIME)).longValue();
        try {
            this.producerWorkers = new ConcurrentHashMap();
            this.timer = new Timer("AgentProducerCleanupTimer", true);
            this.timer.scheduleAtFixedRate(new ClearProducerTask(), this.WORKER_CLEANUP_DELAY_MS, this.WORKER_CLEANUP_INTERVAL_MS);
            this.logger.info("AgentProducer initialized for base URI [{}] with worker cleanup every {} ms.", new Object[]{this.baseURI, Long.valueOf(this.WORKER_CLEANUP_INTERVAL_MS)});
        } catch (Exception e) {
            this.logger.error("AgentProducer Constructor Exception for URI [{}]: {}", new Object[]{this.baseURI, e.getMessage(), e});
            throw new RuntimeException("Failed to initialize AgentProducer", e);
        }
    }

    public void shutdown() {
        this.logger.info("Shutting down AgentProducer for base URI [{}]...", new Object[]{this.baseURI});
        if (this.timer != null) {
            this.timer.cancel();
            this.logger.debug("Cleanup timer cancelled.");
        }
        ArrayList<String> arrayList = new ArrayList(this.producerWorkers.keySet());
        this.logger.info("Shutting down {} active producer workers...", new Object[]{Integer.valueOf(arrayList.size())});
        for (String str : arrayList) {
            ActiveProducerWorker remove = this.producerWorkers.remove(str);
            if (remove != null) {
                this.logger.debug("Shutting down worker for queue [{}]", new Object[]{str});
                remove.shutdown();
            }
        }
        this.producerWorkers.clear();
        this.logger.info("AgentProducer shutdown complete for base URI [{}]", new Object[]{this.baseURI});
    }

    public void invalidateWorkersForURI(String str) {
        if (!this.baseURI.equals(str)) {
            this.logger.debug("Ignoring invalidateWorkersForURI call for URI [{}], this producer manages base URI [{}]", new Object[]{str, this.baseURI});
            return;
        }
        this.logger.warn("Invalidating ALL producer workers due to connection failure on base URI [{}]", new Object[]{str});
        synchronized (this.producerWorkers) {
            for (String str2 : new ArrayList(this.producerWorkers.keySet())) {
                ActiveProducerWorker activeProducerWorker = this.producerWorkers.get(str2);
                if (activeProducerWorker != null) {
                    this.logger.info("Shutting down and removing worker for destination [{}] due to connection failure on base URI [{}]", new Object[]{str2, str});
                    activeProducerWorker.shutdown();
                    this.producerWorkers.remove(str2);
                }
            }
        }
        this.logger.info("Finished invalidating ALL workers for base URI [{}]", new Object[]{str});
    }

    public boolean sendMessage(MsgEvent msgEvent) {
        boolean z = false;
        long j = this.INITIAL_RETRY_DELAY_MS;
        String forwardDst = msgEvent.getForwardDst();
        if (forwardDst == null) {
            this.logger.error("sendMessage Error: Destination path (dstPath) is null. Message: {}", new Object[]{msgEvent.getParams()});
            return false;
        }
        int i = 1;
        while (true) {
            if (i > this.MAX_SEND_RETRIES_CONFIG) {
                break;
            }
            ActiveProducerWorker activeProducerWorker = null;
            int i2 = i;
            try {
                if (!this.activeClient.isConnectionActive(this.baseURI)) {
                    this.logger.warn("Connection to base URI [{}] is not active. Send attempt {}/{} for [{}] will likely fail worker creation/usage.", new Object[]{this.baseURI, Integer.valueOf(i2), Integer.valueOf(this.MAX_SEND_RETRIES_CONFIG), forwardDst});
                }
                activeProducerWorker = this.producerWorkers.computeIfAbsent(forwardDst, str -> {
                    this.logger.info("No existing worker for [{}]. Attempting to create new worker (Overall Send Attempt {}/{})", new Object[]{str, Integer.valueOf(i2), Integer.valueOf(this.MAX_SEND_RETRIES_CONFIG)});
                    try {
                        ActiveProducerWorker activeProducerWorker2 = new ActiveProducerWorker(this.controllerEngine, str, this.baseURI);
                        if (activeProducerWorker2.isActive) {
                            this.logger.info("Successfully created and cached new worker for [{}]", new Object[]{str});
                            return activeProducerWorker2;
                        }
                        this.logger.error("Failed to initialize new ActiveProducerWorker for [{}]. Returning null for computeIfAbsent.", new Object[]{str});
                        return null;
                    } catch (Exception e) {
                        this.logger.error("Exception during ActiveProducerWorker creation for key [{}]: {}", new Object[]{str, e.getMessage(), e});
                        return null;
                    }
                });
                if (activeProducerWorker == null) {
                    this.logger.error("Failed to get or create a valid worker for destination [{}] (Attempt {}/{}) after computeIfAbsent.", new Object[]{forwardDst, Integer.valueOf(i2), Integer.valueOf(this.MAX_SEND_RETRIES_CONFIG)});
                } else {
                    activeProducerWorker.isActive = true;
                    if (msgEvent.hasFiles()) {
                        new Thread(new ActiveProducerWorkerData(this.controllerEngine, activeProducerWorker.getTXQueueName(), activeProducerWorker.getConnectionURI(), msgEvent)).start();
                        z = true;
                        this.logger.debug("File message for [{}] queued for async sending (Attempt {}/{})", new Object[]{forwardDst, Integer.valueOf(i2), Integer.valueOf(this.MAX_SEND_RETRIES_CONFIG)});
                        break;
                    }
                    try {
                        activeProducerWorker.sendMessage(msgEvent);
                        z = true;
                        this.logger.debug("Message to [{}] sent successfully (Attempt {}/{})", new Object[]{forwardDst, Integer.valueOf(i2), Integer.valueOf(this.MAX_SEND_RETRIES_CONFIG)});
                        break;
                    } catch (JMSException e) {
                        this.logger.error("JMSException during sendMessage via worker for [{}] (Attempt {}/{}): {}", new Object[]{forwardDst, Integer.valueOf(i2), Integer.valueOf(this.MAX_SEND_RETRIES_CONFIG), e.getMessage()});
                        synchronized (this.producerWorkers) {
                            if (this.producerWorkers.get(forwardDst) == activeProducerWorker) {
                                this.logger.warn("Removing failed worker for [{}] from map due to JMSException.", new Object[]{forwardDst});
                                activeProducerWorker.shutdown();
                                this.producerWorkers.remove(forwardDst);
                            }
                        }
                    }
                }
            } catch (Exception e2) {
                this.logger.error("Unexpected Exception in AgentProducer.sendMessage for [{}] (Attempt {}/{}): {}", new Object[]{forwardDst, Integer.valueOf(i2), Integer.valueOf(this.MAX_SEND_RETRIES_CONFIG), e2.getMessage(), e2});
                if (activeProducerWorker != null) {
                    synchronized (this.producerWorkers) {
                        if (this.producerWorkers.get(forwardDst) == activeProducerWorker) {
                            this.logger.warn("Removing worker for [{}] due to unexpected exception.", new Object[]{forwardDst});
                            activeProducerWorker.shutdown();
                            this.producerWorkers.remove(forwardDst);
                        }
                    }
                }
            }
            if (z) {
                break;
            }
            if (i2 < this.MAX_SEND_RETRIES_CONFIG) {
                try {
                    this.logger.warn("Send attempt {}/{} for [{}] failed. Retrying in {} ms...", new Object[]{Integer.valueOf(i2), Integer.valueOf(this.MAX_SEND_RETRIES_CONFIG), forwardDst, Long.valueOf(j)});
                    Thread.sleep(j);
                    j = Math.min(j * 2, 5000L);
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                    this.logger.error("Retry delay interrupted for destination [{}]", new Object[]{forwardDst});
                }
            }
            i++;
        }
        if (!z) {
            this.logger.error("Failed to send message to destination [{}] after {} attempts.", new Object[]{forwardDst, Integer.valueOf(this.MAX_SEND_RETRIES_CONFIG)});
        }
        return z;
    }
}
