package io.cresco.agent.controller.communication;

import io.cresco.agent.controller.core.ControllerEngine;
import io.cresco.library.messaging.MsgEvent;
import io.cresco.library.plugin.PluginBuilder;
import io.cresco.library.utilities.CLogger;
import io.netty.handler.traffic.AbstractTrafficShapingHandler;
import jakarta.jms.JMSException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:io/cresco/agent/controller/communication/AgentProducer.class */
public class AgentProducer {
    private Map<String, ActiveProducerWorker> producerWorkers;
    private Map<String, Long> producerWorkersInProcess;
    private AtomicBoolean lockWIP = new AtomicBoolean();
    private ControllerEngine controllerEngine;
    private PluginBuilder plugin;
    private CLogger logger;
    private String URI;
    private Timer timer;
    private ActiveClient activeClient;

    /* loaded from: input_file:io/cresco/agent/controller/communication/AgentProducer$ClearProducerTask.class */
    private class ClearProducerTask extends TimerTask {
        private ClearProducerTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            AgentProducer.this.logger.trace("Running ClearProducerTask cleanup...");
            ArrayList<String> arrayList = new ArrayList();
            synchronized (AgentProducer.this.producerWorkers) {
                for (Map.Entry<String, ActiveProducerWorker> entry : AgentProducer.this.producerWorkers.entrySet()) {
                    ActiveProducerWorker value = entry.getValue();
                    AgentProducer.this.logger.trace("Checking worker [{}] isActive: {}", new Object[]{entry.getKey(), Boolean.valueOf(value.isActive)});
                    if (value.isActive) {
                        value.isActive = false;
                    } else {
                        AgentProducer.this.logger.info("Shutting down inactive worker [{}]", new Object[]{entry.getKey()});
                        if (value.shutdown()) {
                            arrayList.add(entry.getKey());
                        } else {
                            AgentProducer.this.logger.warn("Failed to shutdown inactive worker [{}] cleanly.", new Object[]{entry.getKey()});
                            arrayList.add(entry.getKey());
                        }
                    }
                }
                for (String str : arrayList) {
                    AgentProducer.this.producerWorkers.remove(str);
                    AgentProducer.this.logger.info("Removed inactive worker [{}] from map.", new Object[]{str});
                }
            }
            AgentProducer.this.logger.trace("ClearProducerTask cleanup finished.");
        }
    }

    public AgentProducer(ControllerEngine controllerEngine, String str, ActiveClient activeClient) {
        this.controllerEngine = controllerEngine;
        this.plugin = controllerEngine.getPluginBuilder();
        this.logger = this.plugin.getLogger(AgentProducer.class.getName(), CLogger.Level.Info);
        this.activeClient = activeClient;
        this.URI = str;
        try {
            this.producerWorkers = new ConcurrentHashMap();
            this.producerWorkersInProcess = Collections.synchronizedMap(new HashMap());
            this.timer = new Timer("AgentProducerCleanupTimer", true);
            this.timer.scheduleAtFixedRate(new ClearProducerTask(), AbstractTrafficShapingHandler.DEFAULT_MAX_TIME, 30000L);
            this.logger.info("AgentProducer initialized for URI [{}]", new Object[]{str});
        } catch (Exception e) {
            this.logger.error("AgentProducer Constructor Exception for URI [{}]: {}", new Object[]{str, e.getMessage(), e});
            throw new RuntimeException("Failed to initialize AgentProducer", e);
        }
    }

    public void shutdown() {
        ActiveProducerWorker remove;
        this.logger.info("Shutting down AgentProducer for URI [{}]...", new Object[]{this.URI});
        if (this.timer != null) {
            this.timer.cancel();
            this.logger.debug("Cleanup timer cancelled.");
        }
        ArrayList<String> arrayList = new ArrayList();
        synchronized (this.producerWorkers) {
            arrayList.addAll(this.producerWorkers.keySet());
        }
        this.logger.info("Shutting down {} active producer workers...", new Object[]{Integer.valueOf(arrayList.size())});
        for (String str : arrayList) {
            synchronized (this.producerWorkers) {
                remove = this.producerWorkers.remove(str);
            }
            if (remove != null) {
                this.logger.debug("Shutting down worker [{}]", new Object[]{str});
                remove.shutdown();
            }
        }
        this.producerWorkers.clear();
        this.producerWorkersInProcess.clear();
        this.logger.info("AgentProducer shutdown complete for URI [{}]", new Object[]{this.URI});
    }

    public void invalidateWorkersForURI(String str) {
        if (!this.URI.equals(str)) {
            this.logger.debug("Ignoring invalidateWorkersForURI call for URI [{}], this producer manages URI [{}]", new Object[]{str, this.URI});
            return;
        }
        this.logger.warn("Invalidating ALL producer workers due to connection failure on URI [{}]", new Object[]{str});
        ArrayList<String> arrayList = new ArrayList();
        synchronized (this.producerWorkers) {
            arrayList.addAll(this.producerWorkers.keySet());
            for (String str2 : arrayList) {
                ActiveProducerWorker activeProducerWorker = this.producerWorkers.get(str2);
                if (activeProducerWorker != null) {
                    this.logger.info("Shutting down and removing worker for destination [{}] due to connection failure on URI [{}]", new Object[]{str2, str});
                    activeProducerWorker.shutdown();
                }
            }
            this.producerWorkers.clear();
        }
        this.logger.info("Finished invalidating ALL workers for URI [{}]", new Object[]{str});
    }

    public boolean sendMessage(MsgEvent msgEvent) {
        boolean z = false;
        long j = 500;
        String forwardDst = msgEvent.getForwardDst();
        if (forwardDst == null) {
            this.logger.error("sendMessage Error: Destination path (dstPath) is null. Message: {}", new Object[]{msgEvent.getParams()});
            return false;
        }
        int i = 1;
        while (true) {
            if (i > 3) {
                break;
            }
            ActiveProducerWorker activeProducerWorker = null;
            boolean z2 = false;
            try {
                synchronized (this.producerWorkers) {
                    if (this.producerWorkers.containsKey(forwardDst)) {
                        activeProducerWorker = this.producerWorkers.get(forwardDst);
                        if (activeProducerWorker == null || !this.activeClient.isConnectionActive(activeProducerWorker.getURI())) {
                            CLogger cLogger = this.logger;
                            Object[] objArr = new Object[2];
                            objArr[0] = forwardDst;
                            objArr[1] = activeProducerWorker != null ? activeProducerWorker.getURI() : "N/A";
                            cLogger.warn("Worker found for [{}], but connection to URI [{}] is inactive or worker is null. Invalidating.", objArr);
                            if (activeProducerWorker != null) {
                                activeProducerWorker.shutdown();
                            }
                            this.producerWorkers.remove(forwardDst);
                            activeProducerWorker = null;
                        } else {
                            z2 = true;
                            this.logger.trace("Found valid existing worker for [{}]", new Object[]{forwardDst});
                        }
                    }
                }
                if (activeProducerWorker == null) {
                    this.logger.info("Attempting to create new worker for destination [{}] (Attempt {}/{})", new Object[]{forwardDst, Integer.valueOf(i), 3});
                    synchronized (this.producerWorkers) {
                        if (this.producerWorkers.containsKey(forwardDst)) {
                            activeProducerWorker = this.producerWorkers.get(forwardDst);
                            if (activeProducerWorker == null || !this.activeClient.isConnectionActive(activeProducerWorker.getURI())) {
                                this.logger.warn("Worker for [{}] created by another thread is invalid. Removing.", new Object[]{forwardDst});
                                if (activeProducerWorker != null) {
                                    activeProducerWorker.shutdown();
                                }
                                this.producerWorkers.remove(forwardDst);
                                activeProducerWorker = null;
                            } else {
                                z2 = true;
                                this.logger.info("Worker for [{}] created by another thread, using it.", new Object[]{forwardDst});
                            }
                        }
                        if (activeProducerWorker == null) {
                            try {
                                activeProducerWorker = new ActiveProducerWorker(this.controllerEngine, forwardDst, this.URI);
                                if (activeProducerWorker.isActive) {
                                    this.producerWorkers.put(forwardDst, activeProducerWorker);
                                    z2 = true;
                                    this.logger.info("Successfully created and added new worker for [{}]", new Object[]{forwardDst});
                                } else {
                                    this.logger.error("Failed to initialize new ActiveProducerWorker for [{}].", new Object[]{forwardDst});
                                }
                            } catch (Exception e) {
                                this.logger.error("Exception creating ActiveProducerWorker for [{}]: {}", new Object[]{forwardDst, e.getMessage(), e});
                            }
                        }
                    }
                }
                if (!z2 || activeProducerWorker == null) {
                    this.logger.error("Failed to obtain/create a valid worker for destination [{}] (Attempt {}/{})", new Object[]{forwardDst, Integer.valueOf(i), 3});
                } else {
                    activeProducerWorker.isActive = true;
                    if (msgEvent.hasFiles()) {
                        new Thread(new ActiveProducerWorkerData(this.controllerEngine, activeProducerWorker.getTXQueueName(), activeProducerWorker.getURI(), msgEvent)).start();
                        z = true;
                        this.logger.debug("File message queued for async sending via worker [{}]", new Object[]{forwardDst});
                        break;
                    }
                    try {
                        activeProducerWorker.sendMessage(msgEvent);
                        z = true;
                        this.logger.debug("Message sent successfully via worker [{}]", new Object[]{forwardDst});
                        break;
                    } catch (JMSException e2) {
                        this.logger.error("JMSException during sendMessage via worker [{}] (Attempt {}/{}): {}", new Object[]{forwardDst, Integer.valueOf(i), 3, e2.getMessage()});
                        synchronized (this.producerWorkers) {
                            if (this.producerWorkers.get(forwardDst) == activeProducerWorker) {
                                this.logger.warn("Invalidating worker [{}] due to JMSException during send.", new Object[]{forwardDst});
                                activeProducerWorker.shutdown();
                                this.producerWorkers.remove(forwardDst);
                            }
                        }
                    }
                }
            } catch (Exception e3) {
                this.logger.error("Unexpected Exception in AgentProducer sendMessage for [{}] (Attempt {}/{}): {}", new Object[]{forwardDst, Integer.valueOf(i), 3, e3.getMessage(), e3});
                if (activeProducerWorker != null) {
                    synchronized (this.producerWorkers) {
                        if (this.producerWorkers.get(forwardDst) == activeProducerWorker) {
                            this.logger.warn("Invalidating worker [{}] due to general Exception.", new Object[]{forwardDst});
                            activeProducerWorker.shutdown();
                            this.producerWorkers.remove(forwardDst);
                        }
                    }
                }
            }
            if (!z && i < 3) {
                try {
                    this.logger.warn("Send attempt {}/{} failed for [{}]. Retrying in {} ms...", new Object[]{Integer.valueOf(i), 3, forwardDst, Long.valueOf(j)});
                    Thread.sleep(j);
                    j *= 2;
                } catch (InterruptedException e4) {
                    Thread.currentThread().interrupt();
                    this.logger.error("Retry delay interrupted for destination [{}]", new Object[]{forwardDst});
                }
            }
            i++;
        }
        if (!z) {
            this.logger.error("Failed to send message to destination [{}] after {} attempts.", new Object[]{forwardDst, 3});
        }
        return z;
    }
}
