package org.infinispan.client.hotrod.impl.iteration;

import io.reactivex.rxjava3.core.Flowable;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.test.HotRodClientTestingUtil;
import org.infinispan.commons.configuration.ClassAllowList;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.Closeables;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.server.hotrod.test.HotRodTestingUtil;
import org.infinispan.test.TestingUtil;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

/**
 * Functional test of remote entry iteration against several Hot Rod servers backed by a
 * {@link CacheMode#DIST_SYNC} cache.
 *
 * <p>The tests swap the {@code segmentKeyTracker} field of the publisher returned by
 * {@code RemoteCache.publishEntries} for a test double, so that segment-completion callbacks
 * and per-key tracking calls can be observed.
 */
@Test(groups = {"functional"}, testName = "client.hotrod.iteration.MultiServerDistRemoteIteratorTest")
public class MultiServerDistRemoteIteratorTest extends BaseMultiServerRemoteIteratorTest {
    /** Number of Hot Rod servers started for the test cluster. */
    private static final int NUM_SERVERS = 3;

    /**
     * Number of hash segments configured on the cache; also the number of segments that
     * {@link #testSegmentFinishedCallback()} expects to be reported as finished.
     */
    private static final int NUM_SEGMENTS = 60;

    /** Number of entries written to the cache before iterating in {@link #testSegmentFinishedCallback()}. */
    private static final int CACHE_SIZE = 20;

    /**
     * Batch size used by {@link #testSegmentFinishedCallback()} when publishing and iterating entries.
     * Kept separate from {@link #NUM_SERVERS}: the two values are equal but unrelated.
     */
    private static final int BATCH_SIZE = 3;

    /**
     * {@link KeyTracker} test double that accepts every key and records every segment
     * reported through {@link #segmentsFinished(IntSet)}.
     */
    private static class TestSegmentKeyTracker implements KeyTracker {
        /** Accumulates all segments reported as finished so far. */
        final IntSet finished;

        /**
         * @param i size argument handed to {@link IntSets#concurrentSet(int)} when creating {@link #finished}
         */
        public TestSegmentKeyTracker(int i) {
            this.finished = IntSets.concurrentSet(i);
        }

        /** Accepts every key unconditionally. */
        public boolean track(byte[] bArr, short s, ClassAllowList classAllowList) {
            return true;
        }

        /** Records the given segments in {@link #finished}. */
        public void segmentsFinished(IntSet intSet) {
            this.finished.addAll(intSet);
        }

        /** Not needed by these tests; always returns {@code null}. */
        public Set<Integer> missedSegments() {
            return null;
        }
    }

    /** Starts {@link #NUM_SERVERS} Hot Rod servers sharing the configuration from {@link #getCacheConfiguration()}. */
    protected void createCacheManagers() throws Throwable {
        createHotRodServers(NUM_SERVERS, getCacheConfiguration());
    }

    /**
     * Builds the server-side cache configuration: a {@code DIST_SYNC} cache with
     * {@link #NUM_SEGMENTS} segments and two owners per segment.
     */
    private ConfigurationBuilder getCacheConfiguration() {
        ConfigurationBuilder builder =
                HotRodTestingUtil.hotRodCacheConfiguration(getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false));
        builder.clustering().hash().numSegments(NUM_SEGMENTS).numOwners(2);
        return builder;
    }

    /**
     * Iterates over a populated cache and verifies that, once the iterator is exhausted,
     * the tracker has been told that all {@link #NUM_SEGMENTS} segments finished.
     */
    public void testSegmentFinishedCallback() {
        // Raw types are kept on purpose: the generic signatures of the helpers used here
        // are declared outside this file.
        RemoteCache cache = this.clients.get(0).getCache();
        Util.populateCache(CACHE_SIZE, Util::newAccount, cache);

        TestSegmentKeyTracker tracker = new TestSegmentKeyTracker(NUM_SEGMENTS);
        Publisher publisher = cache.publishEntries((String) null, (Object[]) null, (Set) null, BATCH_SIZE);
        // Swap the publisher's "segmentKeyTracker" field for the recording tracker.
        TestingUtil.replaceField(tracker, "segmentKeyTracker", publisher, RemotePublisher.class);

        // try-with-resources guarantees the iterator is closed even if hasNext()/next()
        // throws or the assertion below fails.
        try (CloseableIterator iterator = Closeables.iterator(publisher, BATCH_SIZE)) {
            while (iterator.hasNext()) {
                iterator.next();
            }
            AssertJUnit.assertEquals(NUM_SEGMENTS, tracker.finished.size());
        }
    }

    /**
     * Creates a client configuration whose only configured server is {@code localhost:i} and whose
     * balancing strategy is a {@link PreferredServerBalancingStrategy} built for that same address.
     *
     * @param str host name supplied by the caller; not used, the host is always {@code "localhost"}
     * @param i   port of the server this client is configured with
     * @return the client configuration builder
     */
    @Override // org.infinispan.client.hotrod.test.MultiHotRodServersTest
    public org.infinispan.client.hotrod.configuration.ConfigurationBuilder createHotRodClientConfigurationBuilder(String str, int i) {
        org.infinispan.client.hotrod.configuration.ConfigurationBuilder clientBuilder =
                HotRodClientTestingUtil.newRemoteConfigurationBuilder();
        clientBuilder.addServer()
                .host("localhost")
                .port(i)
                .maxRetries(maxRetries())
                .balancingStrategy(() -> new PreferredServerBalancingStrategy(new InetSocketAddress("localhost", i)));
        return clientBuilder;
    }

    /**
     * For each client, runs a full entry publication and, on every {@code track} call made on the
     * installed tracker, asserts that the only server with an active iteration is the one at the
     * same index as the client.
     *
     * @throws Exception if the publication fails or does not complete within 10 seconds
     */
    @Test
    public void testIterationRouting() throws Exception {
        for (int i = 0; i < this.clients.size(); i++) {
            // Effectively-final copy of the loop index for use inside the lambda.
            int serverIndex = i;
            RemoteCacheManager client = client(i);

            KeyTracker keyTracker = Mockito.mock(KeyTracker.class);
            // NOTE(review): KeyTracker is used as an interface in this file, so callRealMethod()
            // may fail if track(...) is ever invoked on the mock - confirm whether this stub is
            // expected to be hit (this test itself does not populate the cache).
            Mockito.when(keyTracker.track(Mockito.any(), Mockito.anyShort(), Mockito.any())).then(invocation -> {
                assertIterationActiveOnlyOnServer(serverIndex);
                return invocation.callRealMethod();
            });

            Publisher publisher = client.getCache().publishEntries((String) null, (Object[]) null, (Set) null, 10);
            TestingUtil.replaceField(keyTracker, "segmentKeyTracker", publisher, RemotePublisher.class);
            // Drain the publisher and wait (bounded) for completion.
            Flowable.fromPublisher(publisher).lastStage((Object) null).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        }
    }

    /**
     * Asserts that the server at index {@code i} reports exactly one active iteration and that
     * every other server reports none.
     *
     * @param i index of the server expected to own the iteration
     */
    private void assertIterationActiveOnlyOnServer(int i) {
        for (int serverIndex = 0; serverIndex < this.servers.size(); serverIndex++) {
            int activeIterations = server(serverIndex).getIterationManager().activeIterations();
            long expected = (serverIndex == i) ? 1L : 0L;
            AssertJUnit.assertEquals(expected, activeIterations);
        }
    }
}
