package io.github.mpecan.pmt.transport.zmq;

import io.github.mpecan.pmt.client.model.Message;
import io.github.mpecan.pmt.client.serialization.MessageSerializationService;
import io.github.mpecan.pmt.client.serialization.MessageSerializer;
import io.github.mpecan.pmt.discovery.PushpinDiscoveryManager;
import io.github.mpecan.pmt.model.PushpinServer;
import io.github.mpecan.pmt.transport.PushpinTransport;
import jakarta.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import kotlin.Metadata;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.SourceDebugExtension;
import kotlin.text.Charsets;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* compiled from: ZmqTransport.kt */
@Metadata(mv = {2, 1, 0}, k = 1, xi = 48, d1 = {"��t\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0010\u000b\n��\n\u0002\u0010 \n��\n\u0002\u0018\u0002\n\u0002\b\u0003\u0018��2\u00020\u0001B'\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005\u0012\u0006\u0010\u0006\u001a\u00020\u0007\u0012\u0006\u0010\b\u001a\u00020\t¢\u0006\u0004\b\n\u0010\u000bJ\u0010\u0010\u0019\u001a\u00020\u00142\u0006\u0010\u001a\u001a\u00020\u001bH\u0002J\u0010\u0010\u001c\u001a\u00020\u00142\u0006\u0010\u001a\u001a\u00020\u001bH\u0002J\b\u0010\u001d\u001a\u00020\u001eH\u0007J$\u0010\u001f\u001a\b\u0012\u0004\u0012\u00020!0 2\f\u0010\"\u001a\b\u0012\u0004\u0012\u00020\u001b0#2\u0006\u0010$\u001a\u00020%H\u0002J\"\u0010&\u001a\b\u0012\u0004\u0012\u00020!0 2\f\u0010\"\u001a\b\u0012\u0004\u0012\u00020\u001b0#2\u0006\u0010$\u001a\u00020%J\b\u0010'\u001a\u00020\u001eH\u0007J\u0016\u0010&\u001a\b\u0012\u0004\u0012\u00020!0 2\u0006\u0010$\u001a\u00020%H\u0016R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0007X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\b\u001a\u00020\tX\u0082\u0004¢\u0006\u0002\n��R\u0016\u0010\f\u001a\n \u000e*\u0004\u0018\u00010\r0\rX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R\u001a\u0010\u0011\u001a\u000e\u0012\u0004\u0012\u00020\u0013\u0012\u0004\u0012\u00020\u00140\u0012X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0015\u001a\u00020\u0016X\u0082\u0004¢\u0006\u0002\n��R\u0016\u0010\u0017\u001a\n \u000e*\u0004\u0018\u00010\u00180\u0018X\u0082\u0004¢\u0006\u0002\n��¨\u0006("}, d2 = {"Lio/github/mpecan/pmt/transport/zmq/ZmqTransport;", "Lio/github/mpecan/pmt/transport/PushpinTransport;", "zmqProperties", "Lio/github/mpecan/pmt/transport/zmq/ZmqTransportProperties;", "messageSerializer", "Lio/github/mpecan/pmt/client/serialization/MessageSerializer;", "messageSerializationService", "Lio/github/mpecan/pmt/client/serialization/MessageSerializationService;", "discoveryManager", "Lio/github/mpecan/pmt/discovery/PushpinDiscoveryManager;", "<init>", "(Lio/github/mpecan/pmt/transport/zmq/ZmqTransportProperties;Lio/github/mpecan/pmt/client/serialization/MessageSerializer;Lio/github/mpecan/pmt/client/serialization/MessageSerializationService;Lio/github/mpecan/pmt/discovery/PushpinDiscoveryManager;)V", "logger", "Lorg/slf4j/Logger;", "kotlin.jvm.PlatformType", "context", "Lorg/zeromq/ZContext;", "sockets", "Ljava/util/concurrent/ConcurrentHashMap;", "", "Lorg/zeromq/ZMQ$Socket;", "lock", "Ljava/util/concurrent/locks/ReentrantReadWriteLock;", "executor", "Ljava/util/concurrent/ExecutorService;", "getSocket", "server", "Lio/github/mpecan/pmt/model/PushpinServer;", "createSocket", "refreshConnectionPool", "", "publishToServers", "Lreactor/core/publisher/Mono;", "", "servers", "", "message", "Lio/github/mpecan/pmt/client/model/Message;", "publish", "close", "pushpin-transport-zmq"})
@SourceDebugExtension({"SMAP\nZmqTransport.kt\nKotlin\n*S Kotlin\n*F\n+ 1 ZmqTransport.kt\nio/github/mpecan/pmt/transport/zmq/ZmqTransport\n+ 2 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n+ 3 _Maps.kt\nkotlin/collections/MapsKt___MapsKt\n*L\n1#1,228:1\n774#2:229\n865#2,2:230\n1869#2,2:232\n1563#2:238\n1634#2,3:239\n1740#2,3:242\n216#3,2:234\n216#3,2:236\n*S KotlinDebug\n*F\n+ 1 ZmqTransport.kt\nio/github/mpecan/pmt/transport/zmq/ZmqTransport\n*L\n108#1:229\n108#1:230,2\n110#1:232,2\n146#1:238\n146#1:239,3\n175#1:242,3\n116#1:234,2\n207#1:236,2\n*E\n"})
/* loaded from: input_file:io/github/mpecan/pmt/transport/zmq/ZmqTransport.class */
public final class ZmqTransport implements PushpinTransport {

    @NotNull
    private final ZmqTransportProperties zmqProperties;

    @NotNull
    private final MessageSerializer messageSerializer;

    @NotNull
    private final MessageSerializationService messageSerializationService;

    @NotNull
    private final PushpinDiscoveryManager discoveryManager;
    private final Logger logger;

    @NotNull
    private final ZContext context;

    @NotNull
    private final ConcurrentHashMap<String, ZMQ.Socket> sockets;

    @NotNull
    private final ReentrantReadWriteLock lock;
    private final ExecutorService executor;

    public ZmqTransport(@NotNull ZmqTransportProperties zmqTransportProperties, @NotNull MessageSerializer messageSerializer, @NotNull MessageSerializationService messageSerializationService, @NotNull PushpinDiscoveryManager pushpinDiscoveryManager) {
        Intrinsics.checkNotNullParameter(zmqTransportProperties, "zmqProperties");
        Intrinsics.checkNotNullParameter(messageSerializer, "messageSerializer");
        Intrinsics.checkNotNullParameter(messageSerializationService, "messageSerializationService");
        Intrinsics.checkNotNullParameter(pushpinDiscoveryManager, "discoveryManager");
        this.zmqProperties = zmqTransportProperties;
        this.messageSerializer = messageSerializer;
        this.messageSerializationService = messageSerializationService;
        this.discoveryManager = pushpinDiscoveryManager;
        this.logger = LoggerFactory.getLogger(ZmqTransport.class);
        this.context = new ZContext();
        this.sockets = new ConcurrentHashMap<>();
        this.lock = new ReentrantReadWriteLock();
        this.executor = Executors.newCachedThreadPool();
        this.logger.info("Initializing ZMQ transport with PUSH socket type and connection pool: " + this.zmqProperties.getConnectionPoolEnabled());
    }

    private final ZMQ.Socket getSocket(PushpinServer pushpinServer) {
        String id = pushpinServer.getId();
        if (!this.zmqProperties.getConnectionPoolEnabled()) {
            this.logger.debug("Connection pooling disabled, creating new socket for server: " + id);
            return createSocket(pushpinServer);
        }
        ConcurrentHashMap<String, ZMQ.Socket> concurrentHashMap = this.sockets;
        Function1 function1 = (v3) -> {
            return getSocket$lambda$0(r2, r3, r4, v3);
        };
        ZMQ.Socket computeIfAbsent = concurrentHashMap.computeIfAbsent(id, (v1) -> {
            return getSocket$lambda$1(r2, v1);
        });
        Intrinsics.checkNotNullExpressionValue(computeIfAbsent, "computeIfAbsent(...)");
        return computeIfAbsent;
    }

    private final ZMQ.Socket createSocket(PushpinServer pushpinServer) {
        String publishUrl = pushpinServer.getPublishUrl();
        ZMQ.Socket createSocket = this.context.createSocket(SocketType.PUSH);
        createSocket.setHWM(this.zmqProperties.getHwm());
        createSocket.setLinger(this.zmqProperties.getLinger());
        createSocket.setSendTimeOut(this.zmqProperties.getSendTimeout());
        createSocket.setReconnectIVL(this.zmqProperties.getReconnectIvl());
        createSocket.setReconnectIVLMax(this.zmqProperties.getReconnectIvlMax());
        this.logger.info("Connecting to ZMQ publish socket at: " + publishUrl + " for server ID: " + pushpinServer.getId());
        try {
            createSocket.connect(publishUrl);
            this.logger.info("Successfully connected to ZMQ publish socket at: " + publishUrl);
        } catch (Exception e) {
            this.logger.error("Failed to connect to ZMQ publish socket at: " + publishUrl, e);
        }
        Intrinsics.checkNotNull(createSocket);
        return createSocket;
    }

    @Scheduled(fixedDelayString = "${pushpin.transport.zmq.connectionPoolRefreshInterval:60000}")
    public final void refreshConnectionPool() {
        if (this.zmqProperties.getConnectionPoolEnabled()) {
            this.logger.debug("Refreshing ZMQ connection pool");
            this.lock.writeLock().lock();
            try {
                Set<String> keySet = this.sockets.keySet();
                Intrinsics.checkNotNullExpressionValue(keySet, "<get-keys>(...)");
                Set set = CollectionsKt.toSet(keySet);
                ArrayList<String> arrayList = new ArrayList();
                for (Object obj : set) {
                    if (0 != 0) {
                        arrayList.add(obj);
                    }
                }
                for (String str : arrayList) {
                    this.logger.info("Removing stale ZMQ socket for server: " + str);
                    ZMQ.Socket remove = this.sockets.remove(str);
                    if (remove != null) {
                        remove.close();
                    }
                }
                Iterator<Map.Entry<String, ZMQ.Socket>> it = this.sockets.entrySet().iterator();
                while (it.hasNext()) {
                    this.logger.debug("Connection pool contains socket for server: " + it.next().getKey());
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        }
    }

    private final Mono<Boolean> publishToServers(List<PushpinServer> list, Message message) {
        if (list.isEmpty()) {
            this.logger.warn("No servers to publish to");
            Mono<Boolean> just = Mono.just(false);
            Intrinsics.checkNotNullExpressionValue(just, "just(...)");
            return just;
        }
        this.logger.debug("Publishing message to channel: " + message.getChannel() + " on " + list.size() + " servers");
        byte[] bytes = ("J" + this.messageSerializationService.serialize(this.messageSerializer.serialize(message))).getBytes(Charsets.UTF_8);
        Intrinsics.checkNotNullExpressionValue(bytes, "getBytes(...)");
        Mono<Boolean> subscribeOn = Mono.fromCallable(() -> {
            return publishToServers$lambda$7(r0, r1, r2, r3);
        }).subscribeOn(Schedulers.boundedElastic());
        Intrinsics.checkNotNullExpressionValue(subscribeOn, "subscribeOn(...)");
        return subscribeOn;
    }

    @NotNull
    public final Mono<Boolean> publish(@NotNull List<PushpinServer> list, @NotNull Message message) {
        Intrinsics.checkNotNullParameter(list, "servers");
        Intrinsics.checkNotNullParameter(message, "message");
        return publishToServers(list, message);
    }

    @PreDestroy
    public final void close() {
        this.logger.info("Closing ZMQ transport");
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(5L, TimeUnit.SECONDS)) {
                this.executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            this.executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        this.lock.writeLock().lock();
        try {
            for (Map.Entry<String, ZMQ.Socket> entry : this.sockets.entrySet()) {
                String key = entry.getKey();
                ZMQ.Socket value = entry.getValue();
                this.logger.debug("Closing ZMQ socket for server: " + key);
                value.close();
            }
            this.sockets.clear();
            this.lock.writeLock().unlock();
            this.context.close();
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:4:0x0013, code lost:
    
        if (r0 == null) goto L7;
     */
    @org.jetbrains.annotations.NotNull
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public reactor.core.publisher.Mono<java.lang.Boolean> publish(@org.jetbrains.annotations.NotNull io.github.mpecan.pmt.client.model.Message r5) {
        /*
            r4 = this;
            r0 = r5
            java.lang.String r1 = "message"
            kotlin.jvm.internal.Intrinsics.checkNotNullParameter(r0, r1)
            r0 = r4
            io.github.mpecan.pmt.discovery.PushpinDiscoveryManager r0 = r0.discoveryManager
            r1 = r0
            if (r1 == 0) goto L16
            java.util.List r0 = r0.getAllServers()
            r1 = r0
            if (r1 != 0) goto L1a
        L16:
        L17:
            java.util.List r0 = kotlin.collections.CollectionsKt.emptyList()
        L1a:
            r6 = r0
            r0 = r4
            r1 = r6
            r2 = r5
            reactor.core.publisher.Mono r0 = r0.publishToServers(r1, r2)
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: io.github.mpecan.pmt.transport.zmq.ZmqTransport.publish(io.github.mpecan.pmt.client.model.Message):reactor.core.publisher.Mono");
    }

    private static final ZMQ.Socket getSocket$lambda$0(ZmqTransport zmqTransport, String str, PushpinServer pushpinServer, String str2) {
        Intrinsics.checkNotNullParameter(str2, "it");
        zmqTransport.logger.info("Creating new pooled socket for server: " + str);
        return zmqTransport.createSocket(pushpinServer);
    }

    private static final ZMQ.Socket getSocket$lambda$1(Function1 function1, Object obj) {
        return (ZMQ.Socket) function1.invoke(obj);
    }

    private static final Boolean publishToServers$lambda$7(List list, ZmqTransport zmqTransport, Message message, byte[] bArr) {
        boolean z;
        boolean z2;
        boolean z3;
        List<PushpinServer> list2 = list;
        ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(list2, 10));
        for (PushpinServer pushpinServer : list2) {
            try {
                ZMQ.Socket socket = zmqTransport.getSocket(pushpinServer);
                zmqTransport.logger.debug("Publishing to channel: '" + message.getChannel() + "' via ZMQ PUSH socket to server: " + pushpinServer.getId());
                if (socket.send(bArr, 0)) {
                    zmqTransport.logger.debug("Successfully published message to server " + pushpinServer.getId() + " on channel: " + message.getChannel());
                    z3 = true;
                } else {
                    zmqTransport.logger.error("Failed to send message to server " + pushpinServer.getId() + " on channel: " + message.getChannel());
                    if (!zmqTransport.zmqProperties.getConnectionPoolEnabled()) {
                        socket.close();
                    }
                    z3 = false;
                }
                z2 = z3;
            } catch (Exception e) {
                zmqTransport.logger.error("Failed to publish message to server " + pushpinServer.getId() + ": " + e.getMessage(), e);
                z2 = false;
            }
            arrayList.add(Boolean.valueOf(z2));
        }
        ArrayList arrayList2 = arrayList;
        if (!(arrayList2 instanceof Collection) || !arrayList2.isEmpty()) {
            Iterator it = arrayList2.iterator();
            while (true) {
                if (!it.hasNext()) {
                    z = true;
                    break;
                }
                if (!((Boolean) it.next()).booleanValue()) {
                    z = false;
                    break;
                }
            }
        } else {
            z = true;
        }
        return Boolean.valueOf(z);
    }
}
