package org.infinispan.client.hotrod.retry;

import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.infinispan.AdvancedCache;
import org.infinispan.client.hotrod.ProtocolVersion;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.annotation.ClientCacheEntryModified;
import org.infinispan.client.hotrod.annotation.ClientCacheFailover;
import org.infinispan.client.hotrod.annotation.ClientListener;
import org.infinispan.client.hotrod.event.ClientCacheEntryModifiedEvent;
import org.infinispan.client.hotrod.event.ClientCacheFailoverEvent;
import org.infinispan.client.hotrod.impl.protocol.Codec25;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelRecord;
import org.infinispan.client.hotrod.test.NoopChannelOperation;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.util.concurrent.AggregateCompletionStage;
import org.infinispan.commons.util.concurrent.CompletionStages;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(groups = {"functional"}, testName = "client.hotrod.retry.ClientListenerFailoverBusyTest")
/* loaded from: input_file:org/infinispan/client/hotrod/retry/ClientListenerFailoverBusyTest.class */
public class ClientListenerFailoverBusyTest extends AbstractRetryTest {
    private static final int MAX_PENDING_REQUESTS = 10;

    /* loaded from: input_file:org/infinispan/client/hotrod/retry/ClientListenerFailoverBusyTest$DelayedInterceptor.class */
    public static class DelayedInterceptor extends DDAsyncInterceptor {
        private final CyclicBarrier barrier;
        private final ExecutorService executor;

        public DelayedInterceptor(CyclicBarrier cyclicBarrier, ExecutorService executorService) {
            this.barrier = cyclicBarrier;
            this.executor = executorService;
        }

        public Object visitPutKeyValueCommand(InvocationContext invocationContext, PutKeyValueCommand putKeyValueCommand) throws Throwable {
            CompletableFuture completableFuture = new CompletableFuture();
            this.executor.submit(() -> {
                try {
                    this.barrier.await();
                    completableFuture.complete(super.visitPutKeyValueCommand(invocationContext, putKeyValueCommand));
                } catch (Throwable th) {
                    completableFuture.completeExceptionally(th);
                }
            });
            return asyncValue(completableFuture);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @ClientListener
    /* loaded from: input_file:org/infinispan/client/hotrod/retry/ClientListenerFailoverBusyTest$Listener.class */
    public static class Listener {
        private final AtomicInteger count = new AtomicInteger(0);
        private final AtomicBoolean failover = new AtomicBoolean(false);

        private Listener() {
        }

        @ClientCacheEntryModified
        public void handleModifiedEvent(ClientCacheEntryModifiedEvent<?> clientCacheEntryModifiedEvent) {
            this.count.incrementAndGet();
        }

        @ClientCacheFailover
        public void handleFailoverEvent(ClientCacheFailoverEvent clientCacheFailoverEvent) {
            this.failover.set(true);
        }

        int getReceived() {
            return this.count.intValue();
        }

        boolean didFailover() {
            return this.failover.get();
        }
    }

    public ClientListenerFailoverBusyTest() {
        this.nbrOfServers = 1;
    }

    @Override // org.infinispan.client.hotrod.retry.AbstractRetryTest
    protected ConfigurationBuilder getCacheConfig() {
        return HotRodTestingUtil.hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.infinispan.client.hotrod.HitsAwareCacheManagersTest
    public RemoteCacheManager createClient() {
        RemoteCacheManager createClient = super.createClient();
        createClient.getChannelFactory().setNegotiatedCodec(new Codec25());
        return createClient;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.infinispan.client.hotrod.retry.AbstractRetryTest
    public void amendRemoteCacheManagerConfiguration(org.infinispan.client.hotrod.configuration.ConfigurationBuilder configurationBuilder) {
        configurationBuilder.version(ProtocolVersion.PROTOCOL_VERSION_25).connectionPool().maxActive(2).maxPendingRequests(11);
    }

    public void testWithASingleOperation() throws Exception {
        testListenerWithSlowServer(1);
    }

    public void testWithMultipleOperations() throws Exception {
        testListenerWithSlowServer(MAX_PENDING_REQUESTS);
    }

    private void testListenerWithSlowServer(int i) throws Exception {
        AdvancedCache<?, ?> cacheToHit = cacheToHit(1);
        Channel channel = ((NoopChannelOperation) this.channelFactory.fetchChannelAndInvoke(InetSocketAddress.createUnresolved(this.hotRodServer1.getHost(), this.hotRodServer1.getPort().intValue()), new NoopChannelOperation())).get(10L, TimeUnit.SECONDS);
        ChannelRecord.of(channel).release(channel);
        Listener listener = new Listener();
        this.remoteCache.addClientListener(listener);
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(i);
        CyclicBarrier cyclicBarrier = new CyclicBarrier(i + 1);
        TestingUtil.extractInterceptorChain(cacheToHit).addInterceptor(new DelayedInterceptor(cyclicBarrier, newScheduledThreadPool), 1);
        AggregateCompletionStage aggregateCompletionStage = CompletionStages.aggregateCompletionStage();
        for (int i2 = 0; i2 < i; i2++) {
            aggregateCompletionStage.dependsOn(this.remoteCache.putAsync(1, "v" + i2));
        }
        int received = listener.getReceived();
        Assertions.assertThat(received).isZero();
        Assertions.assertThat(listener.didFailover()).isFalse();
        channel.close().awaitUninterruptibly();
        eventually(() -> {
            return this.channelFactory.getNumActive() == 1;
        });
        Objects.requireNonNull(listener);
        eventually(listener::didFailover);
        try {
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
            aggregateCompletionStage.freeze().toCompletableFuture().get(10L, TimeUnit.SECONDS);
            if (i > 1) {
                eventually(() -> {
                    return String.format("Never got more events: %d of %d", Integer.valueOf(received), Integer.valueOf(listener.getReceived()));
                }, () -> {
                    return listener.getReceived() > received;
                });
            }
        } finally {
            TestingUtil.extractInterceptorChain(cacheToHit).removeInterceptor(DelayedInterceptor.class);
            newScheduledThreadPool.shutdown();
        }
    }
}
