package org.infinispan.client.hotrod.impl.transport.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.assertj.core.api.Assertions;
import org.infinispan.client.hotrod.DataFormat;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.impl.ClientTopology;
import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
import org.infinispan.client.hotrod.impl.operations.HotRodOperation;
import org.infinispan.client.hotrod.impl.operations.RetryOnFailureOperation;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.protocol.CodecHolder;
import org.infinispan.client.hotrod.metrics.RemoteCacheManagerMetricsRegistry;
import org.infinispan.client.hotrod.retry.AbstractRetryTest;
import org.infinispan.client.hotrod.telemetry.impl.TelemetryService;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.client.hotrod.test.InternalRemoteCacheManager;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@Test(testName = "client.hotrod.impl.transport.netty.CloseBeforeEnqueuingTest", groups = {"functional"})
/* loaded from: input_file:org/infinispan/client/hotrod/impl/transport/netty/CloseBeforeEnqueuingTest.class */
public class CloseBeforeEnqueuingTest extends AbstractRetryTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/client/hotrod/impl/transport/netty/CloseBeforeEnqueuingTest$ControlledChannelOperation.class */
    /**
     * Retryable operation whose execution is gated by a {@link CheckPoint}, letting a test pause it right before
     * the read is scheduled on the channel.
     * <p>
     * Executions are numbered from 0. Execution {@code n} triggers the event {@code "before-schedule-read-" + n}
     * and then waits up to 10 seconds for {@code "proceed-schedule-read-" + n} before scheduling the read.
     */
    public static class ControlledChannelOperation extends RetryOnFailureOperation<Void> {
        private static final String BEFORE_SCHEDULE_READ = "before-schedule-read-";
        private static final String PROCEED_SCHEDULE_READ = "proceed-schedule-read-";
        /** Number of times {@link #executeOperation(Channel)} has been entered. */
        private final AtomicInteger counter;
        private final CheckPoint checkPoint;

        /**
         * @param channelFactory factory handed to the parent operation
         * @param configuration  client configuration; also supplies the client intelligence for the initial topology
         * @param checkPoint     check point used to announce and gate every execution
         */
        protected ControlledChannelOperation(ChannelFactory channelFactory, Configuration configuration, CheckPoint checkPoint) {
            super((short) 0, (short) 0, (Codec) null, channelFactory, (byte[]) null, new AtomicReference<>(new ClientTopology(-1, configuration.clientIntelligence())), 0, configuration, DataFormat.builder().build(), (TelemetryService) null);
            this.counter = new AtomicInteger(0);
            this.checkPoint = checkPoint;
        }

        /** Completes the operation with {@code null}; the response content is not inspected. */
        public void acceptResponse(ByteBuf byteBuf, short s, HeaderDecoder headerDecoder) {
            complete(null);
        }

        /**
         * Announces the execution, waits for the test to let it proceed, schedules the read and completes.
         * A failure while waiting or scheduling completes the operation exceptionally.
         *
         * @param channel channel the read is scheduled on
         */
        protected void executeOperation(Channel channel) {
            int execution = counter.getAndIncrement();
            checkPoint.trigger(BEFORE_SCHEDULE_READ + execution);
            try {
                checkPoint.awaitStrict(PROCEED_SCHEDULE_READ + execution, 10L, TimeUnit.SECONDS);
                scheduleRead(channel);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Keep the interruption visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                }
                completeExceptionally(e);
            }
            // Still invoked after a failure, exactly as before.
            complete(null);
        }

        /** Fails unless {@link #executeOperation(Channel)} has been entered exactly once so far. */
        public void assertThatExecutedOnlyOnce() {
            Assertions.assertThat(counter.get())
                    .withFailMessage("Operation executed more than once!")
                    .isOne();
        }
    }

    /* loaded from: input_file:org/infinispan/client/hotrod/impl/transport/netty/CloseBeforeEnqueuingTest$CustomChannelFactory.class */
    /**
     * Channel factory whose pools expose two test hooks:
     * <ul>
     *    <li>{@code executeInstead}: when set and it yields {@code false}, the pool's
     *    {@code executeDirectlyIfPossible} returns {@code false} without delegating to the real implementation;</li>
     *    <li>{@code beforeInvoke}: when set, it receives the channel right before the pool's
     *    {@code invokeCallback} delegates to the real implementation.</li>
     * </ul>
     */
    private static class CustomChannelFactory extends ChannelFactory {
        private final Configuration configuration;
        private Supplier<Boolean> executeInstead;
        private Consumer<Channel> beforeInvoke;

        public CustomChannelFactory(Configuration configuration) {
            super(configuration, new CodecHolder(configuration.version().getCodec()));
            this.configuration = configuration;
        }

        /** Installs the hook consulted before every direct execution attempt; {@code null} disables it. */
        public void setExecuteInstead(Supplier<Boolean> supplier) {
            executeInstead = supplier;
        }

        /** Installs the hook run before every callback invocation; {@code null} disables it. */
        public void setBeforeInvoke(Consumer<Channel> consumer) {
            beforeInvoke = consumer;
        }

        /**
         * Builds a pool configured from the client's connection pool settings, with metrics disabled and the
         * two hooks wired in. A negative {@code maxActive} is replaced by {@link Integer#MAX_VALUE}.
         */
        protected ChannelPool createChannelPool(Bootstrap bootstrap, ChannelInitializer channelInitializer, SocketAddress socketAddress) {
            int configuredMaxActive = configuration.connectionPool().maxActive();
            int maxActive = configuredMaxActive < 0 ? Integer.MAX_VALUE : configuredMaxActive;
            return new ChannelPool(
                    bootstrap.config().group().next(),
                    socketAddress,
                    channelInitializer,
                    configuration.connectionPool().exhaustedAction(),
                    this::onConnectionEvent,
                    configuration.connectionPool().maxWait(),
                    maxActive,
                    configuration.connectionPool().maxPendingRequests(),
                    RemoteCacheManagerMetricsRegistry.DISABLED) {

                boolean executeDirectlyIfPossible(ChannelOperation channelOperation, boolean z) {
                    Supplier<Boolean> hook = executeInstead;
                    if (hook != null && !hook.get()) {
                        // The hook vetoed the direct execution.
                        return false;
                    }
                    return super.executeDirectlyIfPossible(channelOperation, z);
                }

                boolean invokeCallback(Channel channel, ChannelOperation channelOperation, boolean z) {
                    Consumer<Channel> hook = beforeInvoke;
                    if (hook != null) {
                        hook.accept(channel);
                    }
                    return super.invokeCallback(channel, channelOperation, z);
                }
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/client/hotrod/impl/transport/netty/CloseBeforeEnqueuingTest$NoopRetryingOperation.class */
    /**
     * Retryable operation that never writes anything to the server.
     * <p>
     * The first execution that manages to publish its channel into the shared reference schedules a read and
     * then blocks on the latch. Any execution that finds the reference already populated completes immediately
     * with {@code null}.
     */
    public static class NoopRetryingOperation extends RetryOnFailureOperation<Void> {
        private final AtomicReference<Channel> channelRef;
        private final CountDownLatch firstOp;
        private final int id;

        /**
         * @param i               identifier, only used by {@link #toString()}
         * @param channelFactory  factory handed to the parent operation
         * @param configuration   client configuration; also supplies the client intelligence for the initial topology
         * @param atomicReference shared holder for the channel of the first execution
         * @param countDownLatch  latch the first execution waits on; may be {@code null} when the reference is
         *                        already populated, as it is then never dereferenced
         */
        protected NoopRetryingOperation(int i, ChannelFactory channelFactory, Configuration configuration, AtomicReference<Channel> atomicReference, CountDownLatch countDownLatch) {
            super((short) 0, (short) 0, (Codec) null, channelFactory, (byte[]) null, new AtomicReference<>(new ClientTopology(-1, configuration.clientIntelligence())), 0, configuration, DataFormat.builder().build(), (TelemetryService) null);
            this.channelRef = atomicReference;
            this.firstOp = countDownLatch;
            this.id = i;
        }

        /** Completes the operation with {@code null}; the response content is not inspected. */
        public void acceptResponse(ByteBuf byteBuf, short s, HeaderDecoder headerDecoder) {
            complete(null);
        }

        /**
         * Publishes the channel if no other execution did so before, then schedules a read and waits on the latch.
         *
         * @param channel channel this execution runs on
         */
        protected void executeOperation(Channel channel) {
            if (!channelRef.compareAndSet(null, channel)) {
                // Another execution already owns the reference: nothing to wait for.
                complete(null);
                return;
            }
            try {
                scheduleRead(channel);
                firstOp.await();
            } catch (InterruptedException e) {
                // Keep the interruption visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                completeExceptionally(e);
            }
            assert isDone() : "Should be done";
        }

        public String toString() {
            return "id = " + id;
        }
    }

    /**
     * Server-side cache configuration: the default clustered {@code DIST_SYNC} configuration, adapted for
     * Hot Rod through {@link HotRodTestingUtil#hotRodCacheConfiguration}, with a single owner per entry.
     */
    @Override
    protected ConfigurationBuilder getCacheConfig() {
        ConfigurationBuilder builder =
                HotRodTestingUtil.hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false));
        builder.clustering()
                .hash()
                .numOwners(1);
        return builder;
    }

    /**
     * Creates and starts a remote cache manager backed by a {@link CustomChannelFactory}.
     * <p>
     * The client forces return values, uses a connection timeout of 5, sets the connection pool's
     * {@code maxActive} to 1 and targets {@code 127.0.0.1} on the given port.
     *
     * @param i port of the Hot Rod server to connect to
     * @return the started manager
     */
    @Override
    protected RemoteCacheManager createRemoteCacheManager(int i) {
        org.infinispan.client.hotrod.configuration.ConfigurationBuilder clientBuilder =
                HotRodClientTestingUtil.newRemoteConfigurationBuilder();
        amendRemoteCacheManagerConfiguration(clientBuilder);
        clientBuilder
                .forceReturnValues(true)
                .connectionTimeout(5)
                .connectionPool().maxActive(1)
                .addServer().host("127.0.0.1").port(i);

        Configuration clientConfiguration = clientBuilder.build();
        InternalRemoteCacheManager manager =
                new InternalRemoteCacheManager(clientConfiguration, new CustomChannelFactory(clientConfiguration));
        manager.start();
        return manager;
    }

    /**
     * Submits a regular cache operation targeting a server whose pool has just been closed.
     * <p>
     * A blocking no-op operation is used to get hold of a channel of the first server. Once it is unblocked,
     * the channel is released, the pool is closed and a {@code put} is issued for a key that the consistent
     * hash maps to that same server.
     */
    public void testSubmittingOnClosedPool() {
        ChannelFactory factory = this.remoteCacheManager.getChannelFactory();
        InetSocketAddress serverAddress =
                InetSocketAddress.createUnresolved(this.hotRodServer1.getHost(), this.hotRodServer1.getPort());
        CountDownLatch unblockFirstOp = new CountDownLatch(1);
        AtomicReference<Channel> channelHolder = new AtomicReference<>();
        NoopRetryingOperation blockingOp = new NoopRetryingOperation(
                0, factory, this.remoteCacheManager.getConfiguration(), channelHolder, unblockFirstOp);

        fork(() -> factory.fetchChannelAndInvoke(serverAddress, blockingOp));
        // Wait until the operation has published the channel it runs on.
        eventually(() -> channelHolder.get() != null);

        Channel acquired = channelHolder.get();
        ChannelPool serverPool = factory.getPool(serverAddress);
        unblockFirstOp.countDown();
        ChannelRecord.of(acquired).release(acquired);
        serverPool.close();

        ConsistentHash hash = factory.getConsistentHash(RemoteCacheManager.cacheNameBytes());
        String key = getKeyForServer(serverAddress, hash);
        this.remoteCache.put(key, "something");
    }

    /**
     * Closes a pool while one of its channels still has a registered operation.
     * <p>
     * The first time the pool is about to invoke a callback, the hook registers a dummy operation on the
     * channel's {@link HeaderDecoder}, releases the channel and closes its pool. It then waits for the decoder
     * to report that it is closing and checks that the {@code "header-decoder"} handler is still in the
     * pipeline and the channel is still open. The dummy operation is only completed after the {@code put}.
     */
    public void testClosingPoolDuringExecution() {
        ChannelFactory factory = this.remoteCacheManager.getChannelFactory();
        AssertJUnit.assertTrue(factory instanceof CustomChannelFactory);

        // Operation that stays pending until acceptResponse is called explicitly.
        HotRodOperation<Void> pendingOperation = new HotRodOperation<Void>((short) 0, (short) 0, null, 0, null, null, null, factory, null) {
            private final CompletableFuture<Void> result = new CompletableFuture<>();

            public CompletableFuture<Void> execute() {
                return result;
            }

            public void acceptResponse(ByteBuf byteBuf, short s, HeaderDecoder headerDecoder) {
                result.complete(null);
            }
        };

        AtomicBoolean firstInvocation = new AtomicBoolean(true);
        ((CustomChannelFactory) factory).setBeforeInvoke(ch -> {
            if (!firstInvocation.getAndSet(false)) {
                // Only the very first invocation interferes with the pool.
                return;
            }
            HeaderDecoder decoder = ch.pipeline().get(HeaderDecoder.class);
            decoder.registerOperation(ch, pendingOperation);
            ChannelPool owningPool = factory.getPool(ChannelRecord.of(ch).getUnresolvedAddress());
            ChannelRecord.of(ch).release(ch);
            owningPool.close();
            eventually(decoder::isClosing);
            Assertions.assertThat(ch.pipeline().get("header-decoder")).isNotNull();
            Assertions.assertThat(ch.isOpen()).isTrue();
        });

        this.remoteCache.put("something", "something");
        pendingOperation.acceptResponse((ByteBuf) null, (short) 0, (HeaderDecoder) null);
    }

    /**
     * Looks for a key that the consistent hash assigns to the given server.
     * <p>
     * Random UUID strings are marshalled with {@link HotRodTestingUtil#marshall} and checked one at a time.
     *
     * @param socketAddress  server the key must map to
     * @param consistentHash hash used to resolve the server of each candidate key
     * @return the first candidate key mapped to {@code socketAddress}
     * @throws IllegalStateException if none of 100 candidates maps to the server
     */
    private String getKeyForServer(SocketAddress socketAddress, ConsistentHash consistentHash) {
        for (int attempt = 0; attempt < 100; attempt++) {
            String candidate = UUID.randomUUID().toString();
            if (consistentHash.getServer(HotRodTestingUtil.marshall(candidate)).equals(socketAddress)) {
                return candidate;
            }
        }
        throw new IllegalStateException("Unable to find key for: " + socketAddress);
    }

    /**
     * Enqueues an operation on a pool whose server is killed right before the direct execution attempt.
     * <p>
     * A first blocking operation publishes the channel it runs on. The {@code executeInstead} hook then kills
     * the first server and waits until that channel is inactive and the pool reports no active channels. The
     * hook yields {@code false} on its first call and {@code true} afterwards. A second operation submitted to
     * the same address must be done within 10 seconds.
     */
    public void testClosingAndEnqueueing() throws Exception {
        ChannelFactory factory = this.remoteCacheManager.getChannelFactory();
        InetSocketAddress serverAddress =
                InetSocketAddress.createUnresolved(this.hotRodServer1.getHost(), this.hotRodServer1.getPort());
        CountDownLatch unblockFirstOp = new CountDownLatch(1);
        AtomicReference<Channel> channelHolder = new AtomicReference<>();
        NoopRetryingOperation firstOp = new NoopRetryingOperation(
                0, factory, this.remoteCacheManager.getConfiguration(), channelHolder, unblockFirstOp);

        fork(() -> factory.fetchChannelAndInvoke(serverAddress, firstOp));
        // Wait until the first operation has published the channel it runs on.
        eventually(() -> channelHolder.get() != null);
        Channel firstChannel = channelHolder.get();

        AssertJUnit.assertTrue(factory instanceof CustomChannelFactory);
        AtomicBoolean vetoedOnce = new AtomicBoolean(false);
        ((CustomChannelFactory) factory).setExecuteInstead(() -> {
            HotRodClientTestingUtil.killServers(this.hotRodServer1);
            eventually(() -> !firstChannel.isActive());
            eventually(() -> factory.getNumActive(serverAddress) == 0);
            // false on the first call only, true on every later call.
            return !vetoedOnce.compareAndSet(false, true);
        });

        unblockFirstOp.countDown();

        // The shared reference is already populated, so this operation never touches its (null) latch.
        NoopRetryingOperation secondOp = new NoopRetryingOperation(
                0, factory, this.remoteCacheManager.getConfiguration(), channelHolder, null);
        fork(() -> factory.fetchChannelAndInvoke(serverAddress, secondOp));
        secondOp.get(10L, TimeUnit.SECONDS);
    }

    /**
     * Races the enqueueing of an operation against the release of the channel it is waiting for, and verifies
     * that the operation is executed exactly once.
     * <p>
     * Flow, as driven by the check point events:
     * <ol>
     *    <li>a channel of the first server is acquired and kept by the test;</li>
     *    <li>the first direct execution attempt of the controlled operation is vetoed by the hook;</li>
     *    <li>on the next attempt the hook releases the acquired channel from another thread, triggers
     *    {@code before_execute_operation} and holds the attempt until {@code invoke_execute_operation};</li>
     *    <li>the test lets the operation's first execution proceed, waits for it to be done and only then lets
     *    the held attempt continue;</li>
     *    <li>finally the operation must still have been executed once and the pool must report one idle
     *    channel.</li>
     * </ol>
     */
    public void testEnqueueAndReleasing() throws Exception {
        ChannelFactory factory = this.remoteCacheManager.getChannelFactory();
        InetSocketAddress serverAddress =
                InetSocketAddress.createUnresolved(this.hotRodServer1.getHost(), this.hotRodServer1.getPort());

        CompletableFuture<Channel> acquired = new CompletableFuture<>();
        fork(() -> (AcquireChannelOperation) factory.fetchChannelAndInvoke(serverAddress, new AcquireChannelOperation(acquired)));
        Channel channel = acquired.get(10L, TimeUnit.SECONDS);

        CheckPoint checkPoint = new CheckPoint();
        ControlledChannelOperation controlledOp =
                new ControlledChannelOperation(factory, this.remoteCacheManager.getConfiguration(), checkPoint);

        AtomicBoolean vetoNext = new AtomicBoolean(true);
        ((CustomChannelFactory) factory).setExecuteInstead(() -> {
            if (vetoNext.get()) {
                // Normally false: the flag was true, so the direct execution is vetoed this one time.
                return !vetoNext.getAndSet(false);
            }
            fork(() -> {
                ChannelRecord.of(channel).release(channel);
            });
            try {
                checkPoint.trigger("before_execute_operation");
                checkPoint.awaitStrict("invoke_execute_operation", 10L, TimeUnit.SECONDS);
            } catch (Exception ignored) {
                // Best effort: the hook must still let the pool proceed, and the assertions made by the test
                // thread decide the outcome. Only the interruption status is preserved.
                if (ignored instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
            return true;
        });

        Future<?> submission = fork(() -> factory.fetchChannelAndInvoke(serverAddress, controlledOp));

        checkPoint.awaitStrict("before-schedule-read-0", 10L, TimeUnit.SECONDS);
        checkPoint.awaitStrict("before_execute_operation", 10L, TimeUnit.SECONDS);
        checkPoint.trigger("proceed-schedule-read-0");
        eventually(controlledOp::isDone);
        controlledOp.assertThatExecutedOnlyOnce();

        // Let the held direct execution attempt continue; it must not run the operation a second time.
        checkPoint.trigger("invoke_execute_operation");
        controlledOp.assertThatExecutedOnlyOnce();
        eventually(submission::isDone);

        Assertions.assertThat(controlledOp.isDone()).isTrue();
        controlledOp.assertThatExecutedOnlyOnce();
        Assertions.assertThat(factory.getNumIdle(serverAddress)).isOne();
    }
}
