package org.infinispan.client.hotrod.impl.transport.netty;

import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.infinispan.client.hotrod.exceptions.TransportException;
import org.infinispan.client.hotrod.impl.transport.netty.CrashMidOperationTest;
import org.infinispan.client.hotrod.retry.AbstractRetryTest;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.mockito.Mockito;
import org.testng.annotations.Test;

@Test(testName = "client.hotrod.impl.transport.netty.ChannelCloseAndInactiveTest", groups = {"functional"})
/* loaded from: input_file:org/infinispan/client/hotrod/impl/transport/netty/ChannelCloseAndInactiveTest.class */
public class ChannelCloseAndInactiveTest extends AbstractRetryTest {
    public ChannelCloseAndInactiveTest() {
        this.nbrOfServers = 1;
    }

    @Override // org.infinispan.client.hotrod.retry.AbstractRetryTest
    protected ConfigurationBuilder getCacheConfig() {
        ConfigurationBuilder hotRodCacheConfiguration = HotRodTestingUtil.hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false));
        hotRodCacheConfiguration.clustering().hash().numOwners(1);
        return hotRodCacheConfiguration;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.infinispan.client.hotrod.retry.AbstractRetryTest
    public void amendRemoteCacheManagerConfiguration(org.infinispan.client.hotrod.configuration.ConfigurationBuilder configurationBuilder) {
        configurationBuilder.maxRetries(1);
        configurationBuilder.connectionPool().maxActive(2);
    }

    public void testKillAndInactiveDifferentChannelsConcurrently() throws Exception {
        ChannelFactory channelFactory = this.remoteCacheManager.getChannelFactory();
        InetSocketAddress createUnresolved = InetSocketAddress.createUnresolved(this.hotRodServer1.getHost(), this.hotRodServer1.getPort().intValue());
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        CrashMidOperationTest.NoopRetryingOperation noopRetryingOperation = new CrashMidOperationTest.NoopRetryingOperation(0, channelFactory, this.remoteCacheManager.getConfiguration(), atomicReference, countDownLatch);
        fork(() -> {
            return channelFactory.fetchChannelAndInvoke(createUnresolved, noopRetryingOperation);
        });
        eventually(() -> {
            return atomicReference.get() != null;
        });
        Channel channel = (Channel) atomicReference.get();
        HeaderDecoder headerDecoder = channel.pipeline().get("header-decoder");
        eventually(() -> {
            return headerDecoder.registeredOperations() == 1;
        });
        CrashMidOperationTest.NoopRetryingOperation noopRetryingOperation2 = new CrashMidOperationTest.NoopRetryingOperation(1, channelFactory, this.remoteCacheManager.getConfiguration(), atomicReference2, countDownLatch);
        fork(() -> {
            return channelFactory.fetchChannelAndInvoke(createUnresolved, noopRetryingOperation2);
        });
        eventually(() -> {
            return atomicReference2.get() != null;
        });
        Channel channel2 = (Channel) atomicReference2.get();
        Assertions.assertThat(headerDecoder.registeredOperations()).isOne();
        Channel channel3 = (Channel) Mockito.spy(channel2);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        ((Channel) Mockito.doAnswer(invocationOnMock -> {
            countDownLatch2.await(10L, TimeUnit.SECONDS);
            return invocationOnMock.callRealMethod();
        }).when(channel3)).close();
        ChannelPool channelPool = ChannelRecord.of(channel).channelPool();
        Assertions.assertThat(channelPool.getIdle()).isZero();
        Assertions.assertThat(channelPool.getConnected()).isEqualTo(2);
        ChannelRecord.of(channel2).release(channel3);
        Assertions.assertThat(channelPool.getIdle()).isOne();
        Assertions.assertThat(noopRetryingOperation2.isDone()).isFalse();
        Future fork = fork(() -> {
            noopRetryingOperation2.exceptionCaught(channel2, new TransportException("oops", createUnresolved));
        });
        channel.close().awaitUninterruptibly();
        Assertions.assertThat(channel.isActive()).isFalse();
        countDownLatch2.countDown();
        eventually(() -> {
            return channelPool.getConnected() > 0;
        });
        Objects.requireNonNull(fork);
        eventually(fork::isDone);
        fork.get(10L, TimeUnit.SECONDS);
        Objects.requireNonNull(noopRetryingOperation);
        eventually(noopRetryingOperation::isDone);
        Objects.requireNonNull(noopRetryingOperation2);
        eventually(noopRetryingOperation2::isDone);
        countDownLatch.countDown();
        noopRetryingOperation2.get(10L, TimeUnit.SECONDS);
        noopRetryingOperation.get(10L, TimeUnit.SECONDS);
    }
}
