package org.infinispan.server.resp.commands.list.blocking;

import io.netty.channel.ChannelHandlerContext;
import java.lang.invoke.MethodHandles;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.encoding.DataConversion;
import org.infinispan.multimap.impl.EmbeddedMultimapListCache;
import org.infinispan.multimap.impl.ListBucket;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.RespUtil;
import org.infinispan.server.resp.commands.Resp3Command;
import org.infinispan.server.resp.filter.EventListenerConverter;
import org.infinispan.server.resp.filter.EventListenerKeysFilter;
import org.infinispan.server.resp.logging.Log;
import org.infinispan.server.resp.meta.ClientMetadata;
import org.infinispan.server.resp.serialization.ResponseWriter;
import org.infinispan.server.resp.tx.TransactionContext;

/* loaded from: input_file:org/infinispan/server/resp/commands/list/blocking/AbstractBlockingPop.class */
public abstract class AbstractBlockingPop extends RespCommand implements Resp3Command {
    private static final Log log = (Log) LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class);

    /* loaded from: input_file:org/infinispan/server/resp/commands/list/blocking/AbstractBlockingPop$PollListenerSynchronizer.class */
    /**
     * Serializes the polls triggered by listener registration and by cache events, and
     * publishes the first non-empty poll result through {@link #resultFuture}.
     *
     * <p>Pending work is kept in {@link #keyQueue}. An entry is either the key
     * ({@code byte[]}) delivered by an event, or this synchronizer itself, which acts as the
     * marker for the "listener has been installed" step. The head entry stays queued while
     * its poll is running: whoever enqueues into an empty queue starts the poll, and the
     * completion callback removes the head and moves on to the next entry.
     */
    public static class PollListenerSynchronizer {
        private final EmbeddedMultimapListCache<byte[], byte[]> multimapList;
        private final PopConfiguration configuration;
        /** Pending entries; every access is guarded by this object's monitor. */
        private final ArrayDeque<Object> keyQueue = new ArrayDeque<>();
        private final CompletableFuture<Collection<byte[]>> resultFuture = new CompletableFuture<>();
        private final BiConsumer<? super Collection<byte[]>, ? super Throwable> whenCompleteConsumer;
        /** Becomes true once the listener-added marker has been taken off the queue. */
        private volatile boolean canPollJustEventKey;

        private PollListenerSynchronizer(EmbeddedMultimapListCache<byte[], byte[]> embeddedMultimapListCache, PopConfiguration popConfiguration) {
            this.multimapList = embeddedMultimapListCache;
            this.configuration = popConfiguration;
            this.whenCompleteConsumer = this::onPollCompleted;
        }

        /**
         * Completion callback attached to every poll started by this synchronizer.
         *
         * @param polled  outcome of the poll; {@code null} or empty when nothing was popped
         * @param failure error raised by the poll, or {@code null}
         */
        private void onPollCompleted(Collection<byte[]> polled, Throwable failure) {
            if (failure != null) {
                // Wrong-type errors are swallowed; any other error fails the result future.
                // NOTE(review): in both cases the head entry is left in the queue, so after
                // a swallowed wrong-type error no later enqueue starts a new poll - confirm
                // that this is the intended behaviour.
                if (!RespUtil.isWrongTypeError(failure)) {
                    this.resultFuture.completeExceptionally(failure);
                }
                return;
            }
            boolean gotValues = polled != null && !polled.isEmpty();
            if (gotValues) {
                this.resultFuture.complete(polled);
                return;
            }
            Object next;
            synchronized (this) {
                Object finished = this.keyQueue.poll();
                if (finished == this) {
                    // The listener-added poll is done: later events may poll only their own key.
                    this.canPollJustEventKey = true;
                }
                next = this.keyQueue.peek();
            }
            if (next != null) {
                runPoll(next);
            }
        }

        /**
         * Starts the poll for one queue entry: all configured keys for the marker entry (or
         * while the marker has not been processed yet), otherwise just the entry's own key.
         */
        private void runPoll(Object obj) {
            boolean onlyEventKey = this.canPollJustEventKey && obj != this;
            CompletionStage<Collection<byte[]>> poll;
            if (onlyEventKey) {
                poll = AbstractBlockingPop.pollKeyValue(this.multimapList, (byte[]) obj, this.configuration);
            } else {
                poll = AbstractBlockingPop.pollAllKeys(this.multimapList, this.configuration);
            }
            poll.whenComplete(this.whenCompleteConsumer);
        }

        /**
         * Appends an entry to the queue.
         *
         * @return {@code true} if the queue was empty beforehand, i.e. the caller has to
         *         start the poll for the entry itself
         */
        private synchronized boolean enqueue(Object entry) {
            boolean wasEmpty = this.keyQueue.isEmpty();
            this.keyQueue.offer(entry);
            return wasEmpty;
        }

        /** Queues the listener-added marker and polls every key if no poll is pending. */
        private void onListenerAdded() {
            if (enqueue(this)) {
                // The marker entry always takes the poll-all-keys branch of runPoll.
                runPoll(this);
            }
        }

        /** Queues the key of a cache event and polls for it if no poll is pending. */
        private void onEvent(byte[] bArr) {
            if (enqueue(bArr)) {
                runPoll(bArr);
            }
        }
    }

    /**
     * Cache listener (registered with {@code clustered = true}) that feeds created/modified
     * events carrying a {@link ListBucket} value into a {@link PollListenerSynchronizer},
     * and optionally arms a timeout that completes the pending result with {@code null}.
     *
     * <p>When the result future completes, for whatever reason, the timer is cancelled and
     * the listener is removed from the cache.
     */
    @Listener(clustered = true)
    public static class PubSubListener {
        private final AdvancedCache<byte[], Object> cache;
        /** Currently armed timeout task, or {@code null}; touched from several threads. */
        private volatile ScheduledFuture<?> scheduledTimer;
        private final Resp3Handler handler;
        private final PollListenerSynchronizer synchronizer;

        private PubSubListener(Resp3Handler resp3Handler, AdvancedCache<byte[], Object> advancedCache, PopConfiguration popConfiguration) {
            this.cache = advancedCache;
            this.handler = resp3Handler;
            this.synchronizer = new PollListenerSynchronizer(resp3Handler.getListMultimap(), popConfiguration);
            this.synchronizer.resultFuture.whenComplete((collection, th) -> {
                deleteTimer();
                advancedCache.removeListenerAsync(this);
            });
        }

        /**
         * @return the future completed with the popped values, with {@code null} on timeout,
         *         or exceptionally on error
         */
        public CompletableFuture<Collection<byte[]>> getFuture() {
            return this.synchronizer.resultFuture;
        }

        /**
         * Replaces any armed timer with one that fires after {@code j} milliseconds; a
         * non-positive value leaves no timer armed.
         *
         * @param j timeout in milliseconds
         */
        private void startTimer(long j) {
            deleteTimer();
            if (j <= 0) {
                return;
            }
            this.scheduledTimer = this.handler.getScheduler().schedule(() -> {
                this.cache.removeListenerAsync(this);
                this.synchronizer.resultFuture.complete(null);
            }, j, TimeUnit.MILLISECONDS);
            // The result may already have been completed (e.g. by an event delivered before
            // this method ran). In that case the completion callback has already executed and
            // will never cancel this timer, so cancel it here instead of leaving it scheduled.
            if (this.synchronizer.resultFuture.isDone()) {
                deleteTimer();
            }
        }

        /** Cancels the armed timer, if any, and clears the field. */
        private void deleteTimer() {
            // Read the volatile field once: another thread may clear it between a null check
            // and the cancel call.
            ScheduledFuture<?> timer = this.scheduledTimer;
            if (timer != null) {
                timer.cancel(true);
            }
            this.scheduledTimer = null;
        }

        /**
         * Forwards the key of a created/modified entry to the synchronizer when the event
         * value is a {@link ListBucket}; any exception fails the result future.
         */
        @CacheEntryCreated
        @CacheEntryModified
        public void onEvent(CacheEntryEvent<Object, Object> cacheEntryEvent) {
            try {
                if (cacheEntryEvent.getValue() instanceof ListBucket) {
                    this.synchronizer.onEvent(unwrapKey(cacheEntryEvent.getKey()));
                }
            } catch (Exception e) {
                this.synchronizer.resultFuture.completeExceptionally(e);
            }
        }

        /** Returns the raw bytes of a key that is either a {@link WrappedByteArray} or a {@code byte[]}. */
        private byte[] unwrapKey(Object obj) {
            return obj instanceof WrappedByteArray ? ((WrappedByteArray) obj).getBytes() : (byte[]) obj;
        }
    }

    /**
     * Creates the command by passing every argument, unchanged and in the same order, to the
     * {@link RespCommand} super constructor.
     *
     * NOTE(review): the parameter names are decompiler placeholders; their meaning is defined
     * by the RespCommand constructor, which is not visible in this file.
     */
    public AbstractBlockingPop(int i, int i2, int i3, int i4, long j) {
        super(i, i2, i3, i4, j);
    }

    /**
     * Builds the pop configuration for one invocation from the raw command arguments.
     *
     * @param resp3Handler the handler that was passed to {@code perform}
     * @param list         the raw command arguments that were passed to {@code perform}
     * @return the configuration to poll with, or {@code null}, in which case {@code perform}
     *         returns {@code resp3Handler.myStage()} without polling anything
     */
    abstract PopConfiguration parseArguments(Resp3Handler resp3Handler, List<byte[]> list);

    /**
     * Polls every configured key once and, outside a transaction context, falls back to a
     * subscriber that waits for data when that first poll popped nothing.
     *
     * <p>Inside a transaction context the outcome of the first poll is returned as is, even
     * when it is empty. If {@link #parseArguments} yields {@code null}, nothing is polled
     * and {@code resp3Handler.myStage()} is returned.
     *
     * @param resp3Handler          handler supplying the list multimap and the response plumbing
     * @param channelHandlerContext channel context of the request
     * @param list                  raw command arguments
     * @return the stage produced by {@code resp3Handler.stageToReturn}, or
     *         {@code resp3Handler.myStage()} when no configuration could be parsed
     */
    @Override // org.infinispan.server.resp.commands.Resp3Command
    public CompletionStage<RespRequestHandler> perform(Resp3Handler resp3Handler, ChannelHandlerContext channelHandlerContext, List<byte[]> list) {
        EmbeddedMultimapListCache<byte[], byte[]> lists = resp3Handler.getListMultimap();
        PopConfiguration configuration = parseArguments(resp3Handler, list);
        if (configuration == null) {
            return resp3Handler.myStage();
        }

        // The first poll is always started before the transaction check.
        CompletionStage<Collection<byte[]>> firstAttempt = pollAllKeys(lists, configuration);
        if (TransactionContext.isInTransactionContext(channelHandlerContext)) {
            return resp3Handler.stageToReturn(firstAttempt, channelHandlerContext, ResponseWriter.ARRAY_BULK_STRING);
        }

        CompletionStage<Collection<byte[]>> eventual = firstAttempt.thenCompose(popped -> {
            if (popped != null && !popped.isEmpty()) {
                return CompletableFuture.completedFuture(popped);
            }
            // Nothing available right now: wait for events (or the timeout).
            return addSubscriber(configuration, resp3Handler);
        });
        return resp3Handler.stageToReturn(eventual, channelHandlerContext, ResponseWriter.ARRAY_BULK_STRING);
    }

    /**
     * Registers a {@link PubSubListener} filtered on the configured keys and returns the
     * future that the listener will complete.
     *
     * <p>Once registration succeeds, the timeout timer is started with
     * {@code popConfiguration.timeout()} and the synchronizer is told that the listener is
     * in place; a registration failure fails the returned future. The blocked-client and
     * blocked-key counters are raised here and lowered again when the future completes.
     *
     * @param popConfiguration configuration of the pending pop
     * @param resp3Handler     handler supplying the cache and the server metadata
     * @return the listener's result future
     */
    private CompletableFuture<Collection<byte[]>> addSubscriber(PopConfiguration popConfiguration, Resp3Handler resp3Handler) {
        if (log.isTraceEnabled()) {
            log.tracef("Subscriber for keys: " + String.valueOf(popConfiguration.keys()), new Object[0]);
        }

        AdvancedCache cache = resp3Handler.typedCache(null);
        DataConversion valueConversion = cache.getValueDataConversion();
        PubSubListener listener = new PubSubListener(resp3Handler, cache, popConfiguration);
        CompletableFuture<Collection<byte[]>> result = listener.getFuture();

        EventListenerKeysFilter keysFilter = new EventListenerKeysFilter(popConfiguration.keys().stream());
        EventListenerConverter converter = new EventListenerConverter(valueConversion);
        CompletionStage<?> registration = cache.addListenerAsync(listener, keysFilter, converter);
        registration.whenComplete((ignored, failure) -> {
            if (failure == null) {
                listener.startTimer(popConfiguration.timeout());
                listener.synchronizer.onListenerAdded();
                return;
            }
            result.completeExceptionally(failure);
        });

        ClientMetadata clients = resp3Handler.respServer().metadataRepository().client();
        clients.incrementBlockedClients();
        clients.recordBlockedKeys(popConfiguration.keys().size());
        result.whenComplete((ignoredValue, ignoredFailure) -> {
            clients.decrementBlockedClients();
            clients.recordBlockedKeys(-popConfiguration.keys().size());
        });
        return result;
    }

    /**
     * Polls the configured keys one after another, in index order, and stops at the first
     * key that yields a non-empty result.
     *
     * <p>The key at index 0 is polled immediately; each following key is polled only if the
     * stage before it completed with {@code null} or an empty collection.
     *
     * @param embeddedMultimapListCache list multimap to pop from
     * @param popConfiguration          supplies the keys and the pop settings
     * @return a stage holding the first non-empty poll result, or the (null/empty) result of
     *         the last key when no key yielded anything
     */
    private static CompletionStage<Collection<byte[]>> pollAllKeys(EmbeddedMultimapListCache<byte[], byte[]> embeddedMultimapListCache, PopConfiguration popConfiguration) {
        CompletionStage<Collection<byte[]>> chain = pollKeyValue(embeddedMultimapListCache, popConfiguration.key(0), popConfiguration);
        int index = 1;
        while (index < popConfiguration.keys().size()) {
            // The key is read while the chain is built, not when the stage runs.
            final byte[] candidate = popConfiguration.key(index);
            chain = chain.thenCompose(previous -> {
                if (previous != null && !previous.isEmpty()) {
                    return CompletableFuture.completedFuture(previous);
                }
                return pollKeyValue(embeddedMultimapListCache, candidate, popConfiguration);
            });
            index++;
        }
        return chain;
    }

    /**
     * Pops up to {@code popConfiguration.count()} values from one key, from the head when
     * {@code popConfiguration.isHead()} is true and from the tail otherwise.
     *
     * @param embeddedMultimapListCache list multimap to pop from
     * @param bArr                      key to poll
     * @param popConfiguration          supplies the direction and the count
     * @return a stage holding {@code null} when nothing was popped, otherwise a new list
     *         whose first element is the key, followed by the popped values
     */
    static CompletionStage<Collection<byte[]>> pollKeyValue(EmbeddedMultimapListCache<byte[], byte[]> embeddedMultimapListCache, byte[] bArr, PopConfiguration popConfiguration) {
        CompletionStage<Collection<byte[]>> popped;
        if (popConfiguration.isHead()) {
            popped = embeddedMultimapListCache.pollFirst(bArr, popConfiguration.count());
        } else {
            popped = embeddedMultimapListCache.pollLast(bArr, popConfiguration.count());
        }
        return popped.thenApply(values -> {
            boolean nothingPopped = values == null || values.isEmpty();
            if (nothingPopped) {
                return null;
            }
            // Reply layout: the key first, then every popped value.
            Collection<byte[]> reply = new ArrayList<>(values.size() + 1);
            reply.add(bArr);
            reply.addAll(values);
            return reply;
        });
    }
}
