package io.vertx.core.shareddata;

import io.vertx.Lifecycle;
import io.vertx.LoggingTestWatcher;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.streams.ReadStream;
import io.vertx.ext.cluster.infinispan.InfinispanAsyncMap;
import io.vertx.ext.cluster.infinispan.InfinispanClusterManager;
import io.vertx.tests.shareddata.ClusteredAsyncMapTest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:io/vertx/core/shareddata/InfinispanClusteredAsyncMapTest.class */
public class InfinispanClusteredAsyncMapTest extends ClusteredAsyncMapTest {

    @Rule
    public LoggingTestWatcher watchman = new LoggingTestWatcher();

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    public void setUp() throws Exception {
        System.setProperty("jgroups.file.location", this.temporaryFolder.newFolder().getAbsolutePath());
        super.setUp();
    }

    protected Future<Vertx> clusteredVertx(VertxOptions vertxOptions, ClusterManager clusterManager) {
        Future<Vertx> clusteredVertx = super.clusteredVertx(vertxOptions, clusterManager);
        try {
            clusteredVertx.await(2L, TimeUnit.MINUTES);
        } catch (Exception e) {
            fail(e.getMessage());
        }
        return clusteredVertx;
    }

    protected ClusterManager getClusterManager() {
        return new InfinispanClusterManager();
    }

    @Test
    public void testKeyStream() {
        testReadStream((v0) -> {
            return v0.keyStream();
        }, (map, list) -> {
            assertEquals(map.size(), list.size());
            assertTrue(list.containsAll(map.keySet()));
        });
    }

    @Test
    public void testValueStream() {
        testReadStream((v0) -> {
            return v0.valueStream();
        }, (map, list) -> {
            assertEquals(map.size(), list.size());
            assertTrue(list.containsAll(map.values()));
            assertTrue(map.values().containsAll(list));
        });
    }

    @Test
    public void testEntryStream() {
        testReadStream((v0) -> {
            return v0.entryStream();
        }, (map, list) -> {
            assertEquals(map.size(), list.size());
            assertTrue(list.containsAll(map.entrySet()));
        });
    }

    private <T> void testReadStream(Function<InfinispanAsyncMap<JsonObject, Buffer>, ReadStream<T>> function, BiConsumer<Map<JsonObject, Buffer>, List<T>> biConsumer) {
        Map genJsonToBuffer = genJsonToBuffer(100);
        loadData(genJsonToBuffer, (vertx, asyncMap) -> {
            ArrayList arrayList = new ArrayList();
            ReadStream readStream = (ReadStream) function.apply(InfinispanAsyncMap.unwrap(asyncMap));
            AtomicInteger atomicInteger = new AtomicInteger();
            long j = 500;
            long nanoTime = System.nanoTime();
            readStream.endHandler(r16 -> {
                biConsumer.accept(genJsonToBuffer, arrayList);
                assertTrue(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime) >= 3 * j);
                testComplete();
            }).exceptionHandler(th -> {
                fail(th);
            }).handler(obj -> {
                arrayList.add(obj);
                int andIncrement = atomicInteger.getAndIncrement();
                if (andIncrement == 3 || andIncrement == 16 || andIncrement == 38) {
                    readStream.pause();
                    int size = arrayList.size();
                    vertx.setTimer(j, l -> {
                        assertTrue("Items emitted during pause", size == arrayList.size());
                        readStream.resume();
                    });
                }
            });
        });
        await();
    }

    @Test
    public void testClosedKeyStream() {
        loadData(genJsonToBuffer(100), (vertx, asyncMap) -> {
            ArrayList arrayList = new ArrayList();
            ReadStream keyStream = InfinispanAsyncMap.unwrap(asyncMap).keyStream();
            keyStream.exceptionHandler(th -> {
                fail(th);
            }).handler(jsonObject -> {
                arrayList.add(jsonObject);
                if (jsonObject.getInteger("key").intValue() == 38) {
                    keyStream.handler((Handler) null);
                    int size = arrayList.size();
                    vertx.setTimer(500L, l -> {
                        assertTrue("Items emitted after close", size == arrayList.size());
                        testComplete();
                    });
                }
            });
        });
        await();
    }

    protected void close(List<Vertx> list) throws Exception {
        Lifecycle.close(list);
    }
}
