package org.infinispan.server.functional.resp;

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.junit5.VertxTestContext;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.Request;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.infinispan.commons.test.Eventually;
import org.junit.jupiter.api.Test;

/**
 * Exercises the publish/subscribe commands (SUBSCRIBE, PUBLISH, UNSUBSCRIBE) through the Vert.x Redis
 * client, using one connection as publisher and a second one as subscriber.
 */
public class RespPubSubTest extends AbstractRespTest {
    private static final String CHANNEL = "channel-test";

    /** Number of messages published by the test. */
    private static final int MESSAGE_COUNT = 10;

    /** Maximum time, in seconds, to wait for each expected pub/sub event. */
    private static final long EVENT_TIMEOUT_SECONDS = 10L;

    /**
     * Subscribes to {@link #CHANNEL}, publishes {@link #MESSAGE_COUNT} messages one at a time while checking
     * that each is delivered to the subscriber, and finally unsubscribes.
     *
     * @param vertx            Vert.x instance used to open the client connections
     * @param vertxTestContext test context that is failed on any command error and completed once every
     *                         issued command has succeeded
     * @throws Exception if the thread is interrupted while waiting for an event
     */
    @Test
    public void testBasicPubSubOperations(Vertx vertx, VertxTestContext vertxTestContext) throws Exception {
        RedisConnection publisher = blockingGetConnection(vertx);
        RedisConnection subscriber = blockingGetConnection(vertx);

        BlockingQueue<Map.Entry<String, String>> events = subscribe(subscriber, vertxTestContext);
        assertNextEvent(events, "subscribe", "1");

        // Every issued command is tracked so the test only completes after all of them have succeeded.
        List<Future<?>> pending = new ArrayList<>();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String content = "content-" + i;
            pending.add(publisher.send(Request.cmd(Command.PUBLISH).arg(CHANNEL).arg(content))
                    .onFailure(vertxTestContext::failNow));
            // Wait for the delivery before publishing the next message.
            assertNextEvent(events, "message", content);
        }

        pending.add(subscriber.send(Request.cmd(Command.UNSUBSCRIBE).arg(CHANNEL))
                .onFailure(vertxTestContext::failNow));
        assertNextEvent(events, "unsubscribe", "0");

        // Complete only on success; a failure has already been reported through failNow.
        Future.all(pending)
                .onFailure(vertxTestContext::failNow)
                .onSuccess(ignored -> vertxTestContext.completeNow());
    }

    /**
     * Waits up to {@link #EVENT_TIMEOUT_SECONDS} seconds for the next queued event and asserts that it
     * carries the expected kind and payload. A timeout yields {@code null}, which fails the assertion.
     *
     * @param events  queue fed by the subscriber connection
     * @param kind    expected first element of the event, e.g. {@code "message"}
     * @param payload expected third element of the event
     * @throws InterruptedException if interrupted while waiting for the event
     */
    private static void assertNextEvent(BlockingQueue<Map.Entry<String, String>> events, String kind, String payload)
            throws InterruptedException {
        Assertions.assertThat(events.poll(EVENT_TIMEOUT_SECONDS, TimeUnit.SECONDS))
                .isEqualTo(Map.entry(kind, payload));
    }

    /**
     * Sends SUBSCRIBE for {@link #CHANNEL} on the given connection and collects every response pushed to
     * it afterwards.
     *
     * @param redisConnection  connection that becomes the subscriber
     * @param vertxTestContext test context used to verify that the SUBSCRIBE command itself succeeds
     * @return queue receiving one entry per pushed response, keyed by the response's first element with
     *         its third element as the value
     */
    private BlockingQueue<Map.Entry<String, String>> subscribe(RedisConnection redisConnection, VertxTestContext vertxTestContext) {
        BlockingQueue<Map.Entry<String, String>> events = new LinkedBlockingQueue<>();
        redisConnection
                .send(Request.cmd(Command.SUBSCRIBE).arg(CHANNEL), vertxTestContext.succeeding())
                // The element at index 1 is skipped; presumably the channel name - confirm against the RESP spec.
                .handler(response -> events.add(Map.entry(response.get(0).toString(), response.get(2).toString())));
        return events;
    }

    /**
     * Opens a client connection and blocks until the connect future reports success.
     *
     * @param vertx Vert.x instance used to create the client
     * @return the established connection
     */
    private RedisConnection blockingGetConnection(Vertx vertx) {
        Future<RedisConnection> connect = createBaseClient(vertx).connect();
        Eventually.eventually(connect::succeeded);
        return connect.result();
    }
}
