package org.somda.sdc.dpws.udp;

import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.somda.sdc.common.logging.InstanceLogger;
import org.somda.sdc.dpws.soap.exception.TransportException;

/* loaded from: input_file:org/somda/sdc/dpws/udp/UdpMessageQueueServiceImpl.class */
/**
 * Queue-based {@link UdpMessageQueueService} that decouples UDP I/O from message processing.
 *
 * <p>Two daemon worker threads are started together with the service:
 * <ul>
 * <li>the <em>incoming</em> worker takes messages handed in through {@link #receive(UdpMessage)}
 * and posts them to the injected {@link EventBus};</li>
 * <li>the <em>outgoing</em> worker takes messages handed in through {@link #sendMessage(UdpMessage)}
 * and forwards them to the configured {@link UdpBindingService}.</li>
 * </ul>
 *
 * <p>A UDP binding has to be set via {@link #setUdpBinding(UdpBindingService)} before the service
 * is started, otherwise start-up fails.
 */
public class UdpMessageQueueServiceImpl extends AbstractIdleService implements Service, UdpMessageQueueService {
    private static final Logger LOG = LogManager.getLogger(UdpMessageQueueServiceImpl.class);
    /** Source of the per-instance number that prefixes every log line and names the worker threads. */
    private static final AtomicInteger INSTANCE_ID_COUNTER = new AtomicInteger(0);
    private static final String MISSING_BINDING_MESSAGE = "Cannot startup without UDP binding";

    private final EventBus eventBus;
    private final Logger instanceLogger;
    private Thread outgoingThread;
    private Thread incomingThread;
    private final int instanceId = INSTANCE_ID_COUNTER.getAndIncrement();
    // Both queues are created without a capacity bound.
    private final LinkedBlockingDeque<UdpMessage> incomingMessageQueue = new LinkedBlockingDeque<>();
    private final LinkedBlockingDeque<UdpMessage> outgoingMessageQueue = new LinkedBlockingDeque<>();
    private UdpBindingService udpBinding = null;

    /**
     * Creates a message queue service that is not yet bound to a UDP binding.
     *
     * @param eventBus event bus that incoming messages are posted to and observers are registered at.
     * @param str      value injected for {@code Common.InstanceIdentifier}; handed to
     *                 {@link InstanceLogger#wrapLogger} to create the logger used by this instance.
     */
    @Inject
    UdpMessageQueueServiceImpl(EventBus eventBus, @Named("Common.InstanceIdentifier") String str) {
        this.instanceLogger = InstanceLogger.wrapLogger(LOG, str);
        this.eventBus = eventBus;
    }

    /**
     * Sets the binding that outgoing messages are sent through.
     *
     * @param udpBindingService the binding to use; start-up fails while this is {@code null}.
     */
    @Override // org.somda.sdc.dpws.udp.UdpMessageQueueService
    public void setUdpBinding(UdpBindingService udpBindingService) {
        this.udpBinding = udpBindingService;
    }

    /**
     * Enqueues a message for asynchronous delivery by the outgoing worker.
     *
     * @param udpMessage the message to send.
     * @return {@code true} if the service is running and the message was accepted by the outgoing
     *         queue, {@code false} otherwise. A {@code true} result does not mean the message has
     *         been sent yet.
     */
    @Override // org.somda.sdc.dpws.udp.UdpMessageQueueService
    public boolean sendMessage(UdpMessage udpMessage) {
        return isRunning() && outgoingMessageQueue.offer(udpMessage);
    }

    /**
     * Registers an observer at the event bus that incoming messages are posted to.
     *
     * @param udpMessageQueueObserver the observer to register.
     */
    @Override // org.somda.sdc.dpws.udp.UdpMessageQueueService
    public void registerUdpMessageQueueObserver(UdpMessageQueueObserver udpMessageQueueObserver) {
        eventBus.register(udpMessageQueueObserver);
    }

    /**
     * Removes an observer from the event bus that incoming messages are posted to.
     *
     * @param udpMessageQueueObserver the observer to unregister.
     */
    @Override // org.somda.sdc.dpws.udp.UdpMessageQueueService
    public void unregisterUdpMessageQueueObserver(UdpMessageQueueObserver udpMessageQueueObserver) {
        eventBus.unregister(udpMessageQueueObserver);
    }

    /**
     * Starts both worker threads.
     *
     * @throws IllegalStateException if no UDP binding has been set.
     */
    @Override
    protected void startUp() throws Exception {
        instanceLogger.info("[{}] Start UDP message queue for binding {}", instanceId, udpBinding);
        if (udpBinding == null) {
            instanceLogger.warn("[{}] {}", instanceId, MISSING_BINDING_MESSAGE);
            throw new IllegalStateException(MISSING_BINDING_MESSAGE);
        }
        startProcessingOfIncomingMessages();
        startProcessingOfOutgoingMessages();
        instanceLogger.info("[{}] UDP message queue for binding {} is running", instanceId, udpBinding);
    }

    /** Creates and starts the daemon thread that drains the outgoing queue. */
    private void startProcessingOfOutgoingMessages() {
        outgoingThread = new Thread(this::processOutgoingMessages);
        outgoingThread.setName(String.format("[%s] Outgoing UdpMessageQueueService", instanceId));
        outgoingThread.setDaemon(true);
        outgoingThread.start();
    }

    /**
     * Worker loop of the outgoing thread: blocks on the outgoing queue and hands every message to
     * the UDP binding until the thread is interrupted.
     *
     * <p>A failure to send one message is logged and the loop continues with the next message.
     */
    private void processOutgoingMessages() {
        try {
            while (true) {
                final UdpMessage message = outgoingMessageQueue.take();
                instanceLogger.trace("[{}] Outgoing UdpMessageQueueService received UDP message, sending: {}",
                        instanceId, message);
                try {
                    udpBinding.sendMessage(message);
                } catch (IOException e) {
                    instanceLogger.warn("[{}] Outgoing UdpMessageQueueService IO exception caught", instanceId, e);
                } catch (TransportException e) {
                    instanceLogger.info("[{}] Outgoing UdpMessageQueueService transport exception caught",
                            instanceId, e);
                } catch (RuntimeException e) {
                    // Without this handler an unchecked exception would silently end the worker while
                    // sendMessage() keeps accepting messages that are then never delivered.
                    instanceLogger.warn("[{}] Outgoing UdpMessageQueueService unexpected exception caught",
                            instanceId, e);
                }
            }
        } catch (InterruptedException e) {
            // Interruption is the regular stop signal sent by shutDown(); the thread ends right here,
            // so the interrupt flag is not restored.
            instanceLogger.info("[{}] Outgoing UdpMessageQueueService interrupted", instanceId);
            instanceLogger.trace("[{}] Outgoing UdpMessageQueueService interrupted", instanceId, e);
        } finally {
            instanceLogger.info("[{}] Outgoing UdpMessageQueueService ended", instanceId);
        }
    }

    /** Creates and starts the daemon thread that drains the incoming queue. */
    private void startProcessingOfIncomingMessages() {
        incomingThread = new Thread(this::processIncomingMessages);
        incomingThread.setName(String.format("[%s] Incoming UdpMessageQueueService", instanceId));
        incomingThread.setDaemon(true);
        incomingThread.start();
    }

    /**
     * Worker loop of the incoming thread: blocks on the incoming queue and posts every message to
     * the event bus until the thread is interrupted.
     *
     * <p>An exception raised while posting one message is logged and the loop continues.
     */
    private void processIncomingMessages() {
        try {
            while (true) {
                final UdpMessage message = incomingMessageQueue.take();
                instanceLogger.trace(
                        "[{}] Incoming UdpMessageQueueService received UDP message, posting to EventBus: {}",
                        instanceId, message);
                try {
                    eventBus.post(message);
                } catch (Exception e) {
                    instanceLogger.warn(
                            "[{}] Incoming UdpMessageQueueService encountered an error on event dissemination",
                            instanceId, e);
                }
            }
        } catch (InterruptedException e) {
            // Interruption is the regular stop signal sent by shutDown(); the thread ends right here,
            // so the interrupt flag is not restored.
            instanceLogger.info("[{}] Incoming UdpMessageQueueService interrupted", instanceId);
            instanceLogger.trace("[{}] Incoming UdpMessageQueueService interrupted", instanceId, e);
        } finally {
            instanceLogger.info("[{}] Incoming UdpMessageQueueService ended", instanceId);
        }
    }

    /**
     * Discards all queued messages and interrupts both worker threads.
     *
     * <p>This method does not wait for the worker threads to terminate.
     */
    @Override
    protected void shutDown() {
        instanceLogger.info("[{}] Shut down UDP message queue for binding {}", instanceId, udpBinding);
        incomingMessageQueue.clear();
        outgoingMessageQueue.clear();
        incomingThread.interrupt();
        outgoingThread.interrupt();
        instanceLogger.info("[{}] UDP message queue for binding {} shut down", instanceId, udpBinding);
    }

    /**
     * Accepts a received UDP message and queues it for dissemination by the incoming worker.
     *
     * <p>If the queue rejects the message, the loss is logged as an error and the message is dropped.
     *
     * @param udpMessage the received message.
     */
    @Override // org.somda.sdc.dpws.udp.UdpMessageReceiverCallback
    public void receive(UdpMessage udpMessage) {
        instanceLogger.debug("[{}] Received UDP message, adding to queue", instanceId);
        if (!incomingMessageQueue.offer(udpMessage)) {
            instanceLogger.error("[{}] Lost incoming UDP message in message queue: {}", instanceId, udpMessage);
        }
    }
}
