package io.vertx.tests.rabbitmq;

import com.rabbitmq.client.AMQP;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.test.core.TestUtils;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Tests for the streaming consumer obtained from {@code client.basicConsumer(...)}:
 * consumer-tag propagation, pause/resume, and cancel/end-of-stream notification.
 *
 * <p>Every test starts from a connected client (see {@link #setUp()}).
 */
@RunWith(VertxUnitRunner.class)
public class RabbitMQConsumptionStreamingTest extends RabbitMQClientTestBase {

    /**
     * Runs the base-class set-up and then connects, so each test begins with a connected client.
     *
     * @throws Exception if the base set-up or the connect step fails
     */
    @Override
    public void setUp() throws Exception {
        super.setUp();
        connect();
    }

    /**
     * Publishes a single message and checks that the consumer tag carried by the delivered
     * message equals the tag reported by the consumer, and that the body is one of the
     * messages that were created for the queue.
     *
     * @param testContext vertx-unit context used for assertions and async completion
     * @throws Exception if creating the messages or setting up the queue fails
     */
    @Test
    public void consumerTagShouldBeTheSameAsInAMessage(TestContext testContext) throws Exception {
        Set<String> expectedBodies = createMessages(1);
        String queueName = setupQueue(testContext, expectedBodies);
        Async received = testContext.async(1);
        client.basicConsumer(queueName).onComplete(testContext.asyncAssertSuccess(consumer -> {
            consumer.handler(message -> {
                testContext.assertNotNull(message);
                // assertEquals reports both tags on mismatch, unlike assertTrue(a.equals(b)).
                testContext.assertEquals(consumer.consumerTag(), message.consumerTag());
                String body = message.body().toString();
                testContext.assertNotNull(body);
                testContext.assertTrue(expectedBodies.contains(body));
                received.countDown();
            });
        }));
    }

    /**
     * Checks that a paused consumer delivers nothing until it is resumed.
     *
     * <p>Coordination between the test thread and the consumer callback:
     * <ol>
     *   <li>the callback pauses the consumer, installs the handler and completes {@code paused};</li>
     *   <li>the test thread waits for {@code paused}, sleeps one second, then completes
     *       {@code resumeRequested};</li>
     *   <li>the callback, which was blocked on {@code resumeRequested}, resumes the consumer;</li>
     *   <li>a delivery seen while {@code resumeRequested} is still pending fails the test;
     *       a delivery seen afterwards completes {@code delivered}.</li>
     * </ol>
     *
     * @param testContext vertx-unit context used for assertions and async completion
     * @throws Exception if queue set-up fails or the test thread is interrupted while sleeping
     */
    @Test
    public void pauseAndResumeShouldWork(TestContext testContext) throws Exception {
        String queueName = setupQueue(testContext, createMessages(1));
        Async paused = testContext.async();
        Async resumeRequested = testContext.async();
        Async delivered = testContext.async();
        client.basicConsumer(queueName, new QueueOptions()).onComplete(testContext.asyncAssertSuccess(consumer -> {
            consumer.pause();
            consumer.handler(message -> {
                testContext.assertNotNull(message);
                if (resumeRequested.count() == 1) {
                    // Resume has not been requested yet, so nothing may be delivered.
                    testContext.fail();
                } else {
                    delivered.complete();
                }
            });
            paused.complete();
            // Deliberately blocks this callback until the test thread requests the resume.
            resumeRequested.await();
            consumer.resume();
        }));
        paused.awaitSuccess(15000L);
        // Give a paused consumer time to (wrongly) deliver before asking it to resume.
        Thread.sleep(1000L);
        resumeRequested.complete();
    }

    /**
     * Checks that cancelling a consumer succeeds, that the end handler is invoked, and that a
     * message published after the cancel does not reach the message handler.
     *
     * @param testContext vertx-unit context used for assertions and async completion
     * @throws Exception if declaring the queue or publishing the message fails
     */
    @Test
    public void endHandlerAndCancelShouldWork(TestContext testContext) throws Exception {
        String queueName = TestUtils.randomAlphaString(10);
        // Plain null selects the same overload as the raw-typed (Map) cast without the unchecked warning.
        channel.queueDeclare(queueName, false, false, true, null);
        Async cancelled = testContext.async();
        Async streamEnded = testContext.async();
        client.basicConsumer(queueName).onComplete(testContext.asyncAssertSuccess(consumer -> {
            consumer.endHandler(end -> streamEnded.complete());
            // Any delivery at all is a failure in this test.
            consumer.handler(message -> testContext.fail());
            // cancel() is issued from a worker via executeBlocking rather than from this callback.
            vertx.executeBlocking(() -> {
                consumer.cancel().onComplete(testContext.asyncAssertSuccess(done -> cancelled.complete()));
                return null;
            });
        }));
        cancelled.awaitSuccess(15000L);
        streamEnded.awaitSuccess(15000L);
        // Publish after the cancel: the failing handler above must not be invoked.
        channel.basicPublish("", queueName, new AMQP.BasicProperties.Builder().build(),
                "whatever".getBytes(StandardCharsets.UTF_8));
        Async quietPeriod = testContext.async();
        // Keep the test open for one second so a stray delivery would still fail it.
        vertx.setTimer(1000L, timerId -> quietPeriod.complete());
    }
}
