package org.infinispan.server.resp.pubsub;

import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.infinispan.server.resp.SingleNodeRespBaseTest;
import org.infinispan.server.resp.test.RespTestingUtil;
import org.infinispan.test.TestingUtil;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Functional tests for the publish/subscribe commands, exercised through the Lettuce client:
 * {@code subscribe}, {@code unsubscribe}, {@code publish}, {@code pubsubChannels} and
 * {@code pubsubNumpat}.
 *
 * <p>Every pub/sub connection used by a test gets a listener (see
 * {@link #addPubSubListener(RedisPubSubCommands)}) that turns each notification into a string of
 * the form {@code "<event>-<channel>-<payload>"} and appends it to a queue, where
 * {@code <event>} is one of {@code message}, {@code subscribed} or {@code unsubscribed} and
 * {@code <payload>} is the message body or the count reported with the notification. Tests
 * assert on the content and order of that queue.
 */
@Test(groups = {"functional"}, testName = "dist.server.resp.PublishSubscribeTest")
public class PublishSubscribeTest extends SingleNodeRespBaseTest {

    /**
     * Supplies both boolean values, one per invocation, to parameterized tests.
     *
     * @return two single-element rows: {@code true} and {@code false}
     */
    @DataProvider(name = "booleans")
    protected Object[][] booleans() {
        return new Object[][]{{true}, {false}};
    }

    /**
     * Verifies that {@code pubsubChannels()} lists exactly the channels that currently have a
     * subscriber, shrinking as channels are unsubscribed one at a time.
     */
    public void testPubSubChannels() throws Exception {
        RedisCommands<String, String> redis = this.redisConnection.sync();
        Assertions.assertThat(redis.pubsubChannels()).isEmpty();

        RedisPubSubCommands<String, String> subscriber = createPubSubConnection();
        BlockingQueue<String> events = addPubSubListener(subscriber);
        subscriber.subscribe("channel-1", "channel-2");
        assertSubscription(events, "channel-1", "channel-2");
        Assertions.assertThat(redis.pubsubChannels())
                .hasSize(2)
                .containsExactlyInAnyOrder("channel-1", "channel-2");

        subscriber.unsubscribe("channel-1");
        Assertions.assertThat(redis.pubsubChannels()).hasSize(1).containsExactly("channel-2");

        subscriber.unsubscribe("channel-2");
        Assertions.assertThat(redis.pubsubChannels()).isEmpty();
    }

    /**
     * Verifies that {@code pubsubChannels(pattern)} only returns the subscribed channels whose
     * name matches the given glob pattern.
     */
    public void testPubSubChannelsFiltering() throws Exception {
        RedisCommands<String, String> redis = this.redisConnection.sync();
        Assertions.assertThat(redis.pubsubChannels()).isEmpty();

        RedisPubSubCommands<String, String> subscriber = createPubSubConnection();
        BlockingQueue<String> events = addPubSubListener(subscriber);
        subscriber.subscribe("tx-channel", "kv-channel-1", "kv-channel-2");
        assertSubscription(events, "tx-channel", "kv-channel-1", "kv-channel-2");

        Assertions.assertThat(redis.pubsubChannels("*"))
                .hasSize(3)
                .containsExactlyInAnyOrder("tx-channel", "kv-channel-1", "kv-channel-2");
        Assertions.assertThat(redis.pubsubChannels("tx-*"))
                .hasSize(1)
                .containsExactlyInAnyOrder("tx-channel");
        Assertions.assertThat(redis.pubsubChannels("kv-*"))
                .hasSize(2)
                .containsExactlyInAnyOrder("kv-channel-1", "kv-channel-2");
        Assertions.assertThat(redis.pubsubChannels("*-channel*"))
                .hasSize(3)
                .containsExactlyInAnyOrder("tx-channel", "kv-channel-1", "kv-channel-2");

        // Clean up so the subscriptions are not left registered after this test.
        subscriber.unsubscribe("tx-channel", "kv-channel-1", "kv-channel-2");
    }

    /**
     * Verifies that a channel with two subscribers is listed once and stays listed until the
     * last subscriber has unsubscribed.
     */
    public void testPubSubMultipleClientsSameChannel() throws Exception {
        RedisCommands<String, String> redis = this.redisConnection.sync();
        Assertions.assertThat(redis.pubsubChannels()).isEmpty();

        RedisPubSubCommands<String, String> firstSubscriber = createPubSubConnection();
        BlockingQueue<String> firstEvents = addPubSubListener(firstSubscriber);
        RedisPubSubCommands<String, String> secondSubscriber = createPubSubConnection();
        BlockingQueue<String> secondEvents = addPubSubListener(secondSubscriber);

        firstSubscriber.subscribe("default-channel");
        secondSubscriber.subscribe("default-channel");
        assertSubscription(firstEvents, "default-channel");
        assertSubscription(secondEvents, "default-channel");
        Assertions.assertThat(redis.pubsubChannels()).hasSize(1).containsExactly("default-channel");

        // One subscriber remains, so the channel must still be reported.
        firstSubscriber.unsubscribe("default-channel");
        assertUnsubscribe(firstEvents, "default-channel");
        Assertions.assertThat(redis.pubsubChannels()).hasSize(1).containsExactly("default-channel");

        secondSubscriber.unsubscribe("default-channel");
        assertUnsubscribe(secondEvents, "default-channel");
        Assertions.assertThat(redis.pubsubChannels()).isEmpty();
    }

    /**
     * Verifies that two command handles observe the same set of active channels while a
     * separate pub/sub connection subscribes and unsubscribes.
     */
    public void testDifferentConnectionsCounting() throws Exception {
        // NOTE(review): both handles are obtained from the same redisConnection, so this may not
        // exercise two distinct connections as the test name suggests - confirm the intent.
        RedisCommands<String, String> firstRedis = this.redisConnection.sync();
        RedisCommands<String, String> secondRedis = this.redisConnection.sync();
        Assertions.assertThat(firstRedis.pubsubChannels()).isEmpty();
        Assertions.assertThat(secondRedis.pubsubChannels()).isEmpty();

        RedisPubSubCommands<String, String> subscriber = createPubSubConnection();
        BlockingQueue<String> events = addPubSubListener(subscriber);
        subscriber.subscribe("global-channel");
        assertSubscription(events, "global-channel");
        Assertions.assertThat(firstRedis.pubsubChannels()).hasSize(1).containsExactly("global-channel");
        Assertions.assertThat(secondRedis.pubsubChannels()).hasSize(1).containsExactly("global-channel");

        subscriber.unsubscribe("global-channel");
        assertUnsubscribe(events, "global-channel");
        Assertions.assertThat(firstRedis.pubsubChannels()).isEmpty();
        Assertions.assertThat(secondRedis.pubsubChannels()).isEmpty();
    }

    /**
     * Verifies that the cache listeners added by subscribing are removed again, either when the
     * connection is closed or when the client unsubscribes from all channels.
     *
     * @param closeConnection {@code true} to drop the subscriptions by closing the connection,
     *                        {@code false} to issue an argument-less {@code unsubscribe}
     * @throws InterruptedException if interrupted while waiting for a notification
     */
    @Test(dataProvider = "booleans")
    public void testPubSubUnsubscribe(boolean closeConnection) throws InterruptedException {
        int initialListeners = TestingUtil.getListeners(this.cache).size();
        RedisPubSubCommands<String, String> subscriber = createPubSubConnection();
        BlockingQueue<String> events = addPubSubListener(subscriber);
        subscriber.subscribe("channel2", "test");
        assertSubscription(events, "channel2", "test");
        // Subscribing to two channels is expected to add two cache listeners.
        Assertions.assertThat(TestingUtil.getListeners(this.cache)).hasSize(initialListeners + 2);

        if (closeConnection) {
            subscriber.getStatefulConnection().close();
            // Listener removal is not synchronous with close(), so wait for it.
            eventually(() -> TestingUtil.getListeners(this.cache).size() == initialListeners);
            Assertions.assertThat(TestingUtil.getListeners(this.cache)).hasSize(initialListeners);
            // A closed connection must not deliver unsubscribe notifications.
            Assertions.assertThat(events).isEmpty();
            return;
        }

        // No arguments: unsubscribe from every channel of this connection.
        subscriber.unsubscribe();
        for (int received = 0; received < 2; received++) {
            String event = events.poll(10L, TimeUnit.SECONDS);
            Assertions.assertThat(event).withFailMessage("Didn't receive any notifications").isNotNull();
            // The order of the two notifications is not fixed, so accept either channel; the
            // trailing count must be one of the two values possible for two subscriptions.
            boolean knownChannel = event.startsWith("unsubscribed-channel2-")
                    || event.startsWith("unsubscribed-test-");
            boolean validCount = event.endsWith("-0") || event.endsWith("-1");
            if (!knownChannel || !validCount) {
                Assertions.fail("Notification doesn't match expected, was: " + event);
            }
        }
        Assertions.assertThat(TestingUtil.getListeners(this.cache)).hasSize(initialListeners);
        // With no subscriptions left the connection must accept regular commands again.
        Assertions.assertThat(subscriber.ping()).isEqualTo(RespTestingUtil.PONG);
    }

    /**
     * Walks through a complete session: subscribe, receive a published message, subscribe to an
     * additional channel, then unsubscribe from existing and non-existing channels, checking
     * every notification that arrives.
     *
     * @throws InterruptedException if interrupted while waiting for a notification
     */
    @Test
    public void testPubSub() throws InterruptedException {
        RedisPubSubCommands<String, String> subscriber = createPubSubConnection();
        BlockingQueue<String> events = addPubSubListener(subscriber);
        subscriber.subscribe("channel2", "test");
        assertSubscription(events, "channel2", "test");

        this.redisConnection.sync().publish("channel2", "boomshakayaka");
        Assertions.assertThat(events.poll(10L, TimeUnit.SECONDS)).isEqualTo("message-channel2-boomshakayaka");

        subscriber.subscribe("channel");
        Assertions.assertThat(events.poll(10L, TimeUnit.SECONDS)).isEqualTo("subscribed-channel-3");

        subscriber.unsubscribe("channel2");
        subscriber.unsubscribe("doesn't-exist");
        subscriber.unsubscribe("channel", "test");

        // Expected counts, in order, are 2, 1, 0, 0: one less per notification, floored at zero.
        int expectedCount = 3;
        for (String channel : new String[]{"channel2", "doesn't-exist", "channel", "test"}) {
            expectedCount--;
            Assertions.assertThat(events.poll(10L, TimeUnit.SECONDS))
                    .isEqualTo("unsubscribed-" + channel + "-" + Math.max(0, expectedCount));
        }
    }

    /**
     * Verifies that {@code pubsubNumpat()} stays at zero when only plain channel subscriptions
     * (no pattern subscriptions) exist.
     */
    @Test
    public void testCountOnlyPatterns() throws Exception {
        RedisCommands<String, String> redis = this.redisConnection.sync();
        Assertions.assertThat(redis.pubsubNumpat()).isZero();

        RedisPubSubCommands<String, String> subscriber = createPubSubConnection();
        BlockingQueue<String> events = addPubSubListener(subscriber);
        subscriber.subscribe("channel2", "test");
        assertSubscription(events, "channel2", "test");

        Assertions.assertThat(redis.pubsubChannels()).containsExactlyInAnyOrder("channel2", "test");
        Assertions.assertThat(redis.pubsubNumpat()).isZero();

        // Clean up so the subscriptions are not left registered after this test.
        subscriber.unsubscribe("channel2", "test");
    }

    /**
     * Opens a new pub/sub connection on the test client.
     *
     * @return the synchronous command API of the new connection
     */
    protected RedisPubSubCommands<String, String> createPubSubConnection() {
        return this.client.connectPubSub().sync();
    }

    /**
     * Asserts that the next notifications in {@code blockingQueue} are {@code subscribed}
     * events for the given channels, in order, with counts starting at 1 and increasing by one.
     * Waits up to 10 seconds for each notification.
     *
     * @param blockingQueue queue filled by {@link #addPubSubListener(RedisPubSubCommands)}
     * @param strArr        the channels, in the order they were subscribed
     * @throws InterruptedException if interrupted while waiting for a notification
     */
    private void assertSubscription(BlockingQueue<String> blockingQueue, String... strArr) throws InterruptedException {
        for (int index = 0; index < strArr.length; index++) {
            String expected = String.format("subscribed-%s-%d", strArr[index], index + 1);
            Assertions.assertThat(blockingQueue.poll(10L, TimeUnit.SECONDS)).isEqualTo(expected);
        }
    }

    /**
     * Asserts that the next notifications in {@code blockingQueue} are {@code unsubscribed}
     * events for the given channels, in order, with counts going from
     * {@code strArr.length - 1} down to 0. Waits up to 10 seconds for each notification.
     *
     * @param blockingQueue queue filled by {@link #addPubSubListener(RedisPubSubCommands)}
     * @param strArr        the channels, in the order they were unsubscribed
     * @throws InterruptedException if interrupted while waiting for a notification
     */
    private void assertUnsubscribe(BlockingQueue<String> blockingQueue, String... strArr) throws InterruptedException {
        int remaining = strArr.length;
        for (String channel : strArr) {
            remaining--;
            String expected = String.format("unsubscribed-%s-%d", channel, remaining);
            Assertions.assertThat(blockingQueue.poll(10L, TimeUnit.SECONDS)).isEqualTo(expected);
        }
    }

    /**
     * Registers a listener on the connection behind {@code redisPubSubCommands} that records
     * every message, subscribe and unsubscribe notification as a string in a queue.
     *
     * @param redisPubSubCommands the pub/sub command API whose connection should be observed
     * @return the queue receiving {@code "message-<channel>-<body>"},
     *         {@code "subscribed-<channel>-<count>"} and
     *         {@code "unsubscribed-<channel>-<count>"} entries, in arrival order
     */
    private BlockingQueue<String> addPubSubListener(RedisPubSubCommands<String, String> redisPubSubCommands) {
        final BlockingQueue<String> events = new LinkedBlockingQueue<>();
        redisPubSubCommands.getStatefulConnection().addListener(new RedisPubSubAdapter<String, String>() {
            @Override
            public void message(String channel, String message) {
                log.tracef("Received message on channel %s of %s", channel, message);
                events.add("message-" + channel + "-" + message);
            }

            @Override
            public void subscribed(String channel, long count) {
                log.tracef("Subscribed to %s with %s", channel, count);
                events.add("subscribed-" + channel + "-" + count);
            }

            @Override
            public void unsubscribed(String channel, long count) {
                log.tracef("Unsubscribed to %s with %s", channel, count);
                events.add("unsubscribed-" + channel + "-" + count);
            }
        });
        return events;
    }
}
