package org.infinispan.server.resp;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.server.resp.commands.PubSubResp3Command;
import org.infinispan.server.resp.commands.pubsub.KeyChannelUtils;
import org.infinispan.server.resp.commands.pubsub.RespCacheListener;
import org.infinispan.server.resp.logging.Log;
import org.infinispan.server.resp.meta.ClientMetadata;
import org.infinispan.server.resp.serialization.Resp3Type;
import org.infinispan.server.resp.serialization.bytebuf.ByteBufResponseWriter;
import org.infinispan.server.resp.serialization.bytebuf.ByteBufferUtils;

/* loaded from: input_file:org/infinispan/server/resp/SubscriberHandler.class */
public class SubscriberHandler extends CacheRespRequestHandler {
    /** Logger for this handler; assigned in the static initializer via {@code LogFactory.getLog}. */
    private static final Log log;
    /**
     * Channel attribute key ("channel-subscriptions") under which {@code sendSubscriptions}
     * keeps the running count it reports back to the client.
     */
    private static final AttributeKey<Long> SUBSCRIPTIONS_COUNTER;
    /** Handler passed to the constructor; its cache is handed to the superclass and it is exposed via {@code resp3Handler()}. */
    private final Resp3Handler resp3Handler;
    /**
     * Listeners tracked by this handler. The key bytes are what {@code unsubscribeAll}
     * reports back as the channel names being unsubscribed.
     */
    private final Map<WrappedByteArray, RespCacheListener> specificChannelSubscribers;
    /**
     * Assertion flag surfaced by the decompiler: {@code true} when assertions are disabled
     * for this class. Assigned in the static initializer and checked in {@code sendSubscriptions}.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/infinispan/server/resp/SubscriberHandler$PubSubEvents.class */
    /** ASCII-encoded event names placed as the first element of the pub/sub replies written by this handler. */
    private static final class PubSubEvents {
        private static final byte[] SUBSCRIBE = ascii("subscribe");
        private static final byte[] UNSUBSCRIBE = ascii("unsubscribe");
        private static final byte[] MESSAGE = ascii("message");

        /** Private: this class only holds constants. */
        private PubSubEvents() {
        }

        /** Encodes {@code name} as US-ASCII bytes. */
        private static byte[] ascii(String name) {
            return name.getBytes(StandardCharsets.US_ASCII);
        }
    }

    /**
     * Clustered cache listener that forwards created/modified cache entries to a single
     * client {@link Channel} as a three-element array of bulk strings:
     * {@code ["message", <key>, <value>]}.
     *
     * <p>Instances are created through {@code newKeyListener} / {@code newPatternListener};
     * exactly one of {@link #subscribedChannel()} and {@link #pattern()} is expected to be
     * non-null depending on which factory was used.
     */
    @Listener(clustered = true)
    public static class PubSubListener implements RespCacheListener {
        private final Channel channel;
        private final byte[] key;
        private final byte[] pattern;

        /** Creates a listener bound to a specific key, with no pattern. */
        private PubSubListener(Channel channel, byte[] bArr) {
            this(channel, bArr, null);
        }

        /**
         * @param channel client channel the events are written to
         * @param bArr    subscribed key, or {@code null} for pattern listeners
         * @param bArr2   subscribed pattern, or {@code null} for key listeners
         */
        private PubSubListener(Channel channel, byte[] bArr, byte[] bArr2) {
            this.channel = channel;
            this.key = bArr;
            this.pattern = bArr2;
        }

        /**
         * Writes a {@code message} reply for the event to the client channel.
         *
         * <p>Events whose converted key is empty, or whose value is {@code null} or empty,
         * are ignored. The write is fire-and-forget (void promise), so the returned stage is
         * always already completed.
         *
         * @param cacheEntryEvent the created/modified entry notification
         * @return an already-completed stage
         */
        @CacheEntryCreated
        @CacheEntryModified
        public CompletionStage<Void> onEvent(CacheEntryEvent<Object, byte[]> cacheEntryEvent) {
            byte[] channelKey = KeyChannelUtils.channelToKey(unwrap(cacheEntryEvent.getKey()));
            byte[] value = cacheEntryEvent.getValue();
            if (channelKey.length == 0 || value == null || value.length == 0) {
                return CompletableFutures.completedNull();
            }

            // Exact encoded size of the reply; the buffer is allocated with this as both initial
            // and maximum capacity. NOTE(review): the constant 18 presumably covers the array
            // header, the length-prefixed "message" element and the '$' of the key element;
            // confirm against ByteBufResponseWriter.
            int messageSize = 18
                    + ByteBufferUtils.stringSize(channelKey.length) + 2 + channelKey.length + 2
                    + 1 + ByteBufferUtils.stringSize(value.length) + 2 + value.length + 2;

            ByteBuf buffer = this.channel.alloc().buffer(messageSize, messageSize);
            boolean handedOff = false;
            try {
                ByteBufResponseWriter writer = new ByteBufResponseWriter(ignored -> buffer);
                writer.array(List.of(PubSubEvents.MESSAGE, channelKey, value), Resp3Type.BULK_STRING);
                assert buffer.writerIndex() == messageSize;
                // From here on the channel pipeline owns the buffer and is responsible for releasing it.
                handedOff = true;
                this.channel.writeAndFlush(buffer, this.channel.voidPromise());
            } finally {
                if (!handedOff) {
                    // Encoding or the size check failed: release the buffer so it does not leak.
                    buffer.release();
                }
            }
            return CompletableFutures.completedNull();
        }

        /** Returns the raw bytes of {@code obj}, unwrapping a {@link WrappedByteArray} if needed. */
        private byte[] unwrap(Object obj) {
            return obj instanceof WrappedByteArray ? ((WrappedByteArray) obj).getBytes() : (byte[]) obj;
        }

        /** @return the key this listener was created for; {@code null} for pattern listeners */
        @Override // org.infinispan.server.resp.commands.pubsub.RespCacheListener
        public byte[] subscribedChannel() {
            return this.key;
        }

        /** @return the pattern this listener was created for; {@code null} for key listeners */
        @Override // org.infinispan.server.resp.commands.pubsub.RespCacheListener
        public byte[] pattern() {
            return this.pattern;
        }
    }

    /**
     * Creates a subscriber handler that operates on the same cache as {@code resp3Handler}.
     *
     * @param respServer   server passed through to the superclass
     * @param resp3Handler handler whose cache is used; retained and exposed via {@link #resp3Handler()}
     */
    public SubscriberHandler(RespServer respServer, Resp3Handler resp3Handler) {
        super(respServer, resp3Handler.cache());
        // Diamond instead of a raw HashMap: keeps the assignment type-checked.
        this.specificChannelSubscribers = new HashMap<>();
        this.resp3Handler = resp3Handler;
    }

    /**
     * Creates a listener bound to a specific key.
     *
     * @param channel client channel that will receive the events
     * @param bArr    key to report from {@code subscribedChannel()}
     * @return a listener whose {@code pattern()} is {@code null}
     */
    public static RespCacheListener newKeyListener(Channel channel, byte[] bArr) {
        RespCacheListener keyListener = new PubSubListener(channel, bArr);
        return keyListener;
    }

    /**
     * Creates a listener bound to a pattern rather than a single key.
     *
     * @param channel client channel that will receive the events
     * @param bArr    pattern to report from {@code pattern()}
     * @return a listener whose {@code subscribedChannel()} is {@code null}
     */
    public static RespCacheListener newPatternListener(Channel channel, byte[] bArr) {
        RespCacheListener patternListener = new PubSubListener(channel, null, bArr);
        return patternListener;
    }

    /**
     * @return the live, mutable map of listeners tracked by this handler (not a copy)
     */
    public Map<WrappedByteArray, RespCacheListener> specificChannelSubscribers() {
        return specificChannelSubscribers;
    }

    /**
     * @return the handler this subscriber handler was constructed with
     */
    public Resp3Handler resp3Handler() {
        return resp3Handler;
    }

    /**
     * Drops every listener tracked by this handler when the client channel goes away.
     *
     * @param channelHandlerContext context of the disconnected channel (not used here)
     */
    @Override // org.infinispan.server.resp.RespRequestHandler
    public void handleChannelDisconnect(ChannelHandlerContext channelHandlerContext) {
        this.removeAllListeners();
    }

    /**
     * Dispatches a command: {@link PubSubResp3Command}s are performed against this handler,
     * everything else is delegated to the superclass.
     *
     * <p>Decompiler note: the access modifier was widened from {@code protected}.
     *
     * @param channelHandlerContext context of the requesting channel
     * @param respCommand           command to execute
     * @param list                  command arguments
     * @return stage completing with the handler to use for the next request
     */
    @Override // org.infinispan.server.resp.RespRequestHandler
    public CompletionStage<RespRequestHandler> actualHandleRequest(ChannelHandlerContext channelHandlerContext, RespCommand respCommand, List<byte[]> list) {
        initializeIfNecessary(channelHandlerContext);
        if (respCommand instanceof PubSubResp3Command) {
            PubSubResp3Command pubSubCommand = (PubSubResp3Command) respCommand;
            return pubSubCommand.perform(this, channelHandlerContext, list);
        }
        return super.actualHandleRequest(channelHandlerContext, respCommand, list);
    }

    /**
     * Attaches error logging to a listener registration/removal stage.
     *
     * <p>The outcome of the stage is not altered; a failure is only logged, with the
     * channel name decoded as UTF-8.
     *
     * @param completionStage stage of the listener operation
     * @param bArr            channel name bytes, used only for the log message
     * @param z               {@code true} if the stage registers a listener, {@code false} if it removes one
     * @return the stage returned by {@code whenComplete}
     */
    public CompletionStage<Void> handleStageListenerError(CompletionStage<Void> completionStage, byte[] bArr, boolean z) {
        return completionStage.whenComplete((ignored, failure) -> {
            if (failure == null) {
                return;
            }
            if (!z) {
                log.exceptionWhileRemovingListener(failure, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(bArr)));
                return;
            }
            log.exceptionWhileRegisteringListener(failure, CharsetUtil.UTF_8.decode(ByteBuffer.wrap(bArr)));
        });
    }

    /**
     * Removes every tracked listener from the cache and forgets it.
     *
     * <p>Removal is fire-and-forget: the stages returned by {@code removeListenerAsync}
     * are not awaited. Each entry is dropped from the map right after its removal is issued.
     */
    public void removeAllListeners() {
        for (Iterator<RespCacheListener> listeners = this.specificChannelSubscribers.values().iterator(); listeners.hasNext(); ) {
            RespCacheListener listener = listeners.next();
            this.cache.removeListenerAsync(listener);
            listeners.remove();
        }
    }

    /**
     * Unsubscribes from every tracked channel and reports each one to the client.
     *
     * <p>For each tracked listener: its removal from the cache is started (failures are
     * logged through {@link #handleStageListenerError}), the entry is dropped from the map,
     * and the client's pub/sub counter is decremented. Once all removals complete, one
     * {@code unsubscribe} reply per channel is sent via {@link #sendSubscriptions}.
     *
     * @param channelHandlerContext context of the requesting channel
     * @return stage completing with the handler to use for the next request
     */
    public CompletionStage<RespRequestHandler> unsubscribeAll(ChannelHandlerContext channelHandlerContext) {
        ClientMetadata client = respServer().metadataRepository().client();
        // Parameterized (not raw) so the calls into sendSubscriptions are type-checked.
        AggregateCompletionStage<Void> removals = CompletionStages.aggregateCompletionStage();
        List<byte[]> channels = new ArrayList<>(this.specificChannelSubscribers.size());
        Iterator<Map.Entry<WrappedByteArray, RespCacheListener>> it = this.specificChannelSubscribers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<WrappedByteArray, RespCacheListener> entry = it.next();
            CompletionStage<Void> removal = this.cache.removeListenerAsync(entry.getValue());
            byte[] channelName = entry.getKey().getBytes();
            channels.add(channelName);
            removals.dependsOn(handleStageListenerError(removal, channelName, false));
            it.remove();
            client.decrementPubSubClients();
        }
        return sendSubscriptions(channelHandlerContext, removals.freeze(), channels, false);
    }

    /**
     * Sends one {@code subscribe}/{@code unsubscribe} reply per channel once
     * {@code completionStage} has completed, and updates the per-channel subscription count.
     *
     * <p>Each reply is the array {@code [event, channel, count]}, where {@code count} is the
     * running value kept in the channel's {@code SUBSCRIPTIONS_COUNTER} attribute. The count
     * is incremented (subscribe) or decremented (unsubscribe) per channel and never drops
     * below zero. When it ends at zero the attribute is cleared instead of storing 0.
     *
     * @param channelHandlerContext context of the requesting channel
     * @param completionStage       stage to wait for before replying
     * @param collection            channel names to report, in reply order
     * @param z                     {@code true} for subscribe replies, {@code false} for unsubscribe
     * @return stage completing with the handler to use for the next request
     */
    public CompletionStage<RespRequestHandler> sendSubscriptions(ChannelHandlerContext channelHandlerContext, CompletionStage<Void> completionStage, Collection<byte[]> collection, boolean z) {
        return stageToReturn(completionStage, channelHandlerContext, (ignored, unusedWriter) -> {
            // The counter attribute is read and written without synchronization, so this must run on the event loop.
            if (!$assertionsDisabled && !channelHandlerContext.executor().inEventLoop()) {
                throw new AssertionError();
            }
            Long stored = channelHandlerContext.channel().attr(SUBSCRIPTIONS_COUNTER).get();
            long count = stored == null ? 0L : stored;
            byte[] eventName = z ? PubSubEvents.SUBSCRIBE : PubSubEvents.UNSUBSCRIBE;
            int delta = z ? 1 : -1;
            for (byte[] channelName : collection) {
                count = Math.max(0L, count + delta);
                // Mixed element types: byte[] entries are written as strings, the count as an integer.
                // The inner lambda uses its own parameter names; reusing the outer lambda's names does not compile.
                this.writer.array(List.of(eventName, channelName, Long.valueOf(count)), (element, elementWriter) -> {
                    if (element instanceof byte[]) {
                        elementWriter.string((byte[]) element);
                    } else {
                        elementWriter.integers((Number) element);
                    }
                });
            }
            // A zero count is represented by an absent attribute value.
            channelHandlerContext.channel().attr(SUBSCRIPTIONS_COUNTER).set(count == 0 ? null : Long.valueOf(count));
        });
    }

    // Static initialization of the class-level constants declared above.
    static {
        // Cache whether assertions are disabled for this class; checked by the hand-expanded assert in sendSubscriptions.
        $assertionsDisabled = !SubscriberHandler.class.desiredAssertionStatus();
        // Logger keyed on the class performing the lookup (this class).
        log = (Log) LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class);
        // Attribute key used to keep the per-channel subscription count.
        SUBSCRIPTIONS_COUNTER = AttributeKey.newInstance("channel-subscriptions");
    }
}
