package io.vertx.rabbitmq;

import com.rabbitmq.client.AMQP;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.ext.unit.TestContext;
import io.vertx.rabbitmq.RabbitMQClientPublisherTest;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;

/* loaded from: input_file:io/vertx/rabbitmq/RabbitMQClientConsumerCancelTest.class */
public class RabbitMQClientConsumerCancelTest extends RabbitMQClientTestBase {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQClientConsumerCancelTest.class);
    private static final String EXCHANGE_NAME = "RabbitMQClientConsumerCancelTest";
    private static final String QUEUE_NAME = "RabbitMQClientConsumerCancelTestQueue";

    @Test
    public void testConsumerShutdown(TestContext testContext) throws Throwable {
        ArrayList<RabbitMQClientPublisherTest.MessageDefinition> arrayList;
        int i = 1000;
        HashMap hashMap = new HashMap(1000);
        for (int i2 = 0; i2 < 1000; i2++) {
            String str = "ID-" + i2;
            hashMap.put(str, new RabbitMQClientPublisherTest.MessageDefinition(i2, str, "Message " + i2));
        }
        HashMap hashMap2 = new HashMap(1000);
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        AtomicReference atomicReference = new AtomicReference();
        RabbitMQClient create = RabbitMQClient.create(this.vertx, config());
        RabbitMQClientPublisherTest.prepareClient(create, promise -> {
            create.queueDeclare(QUEUE_NAME, true, false, false).onComplete(asyncResult -> {
                if (asyncResult.succeeded()) {
                    create.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false).onComplete(asyncResult -> {
                        if (asyncResult.succeeded()) {
                            create.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", asyncResult -> {
                                if (asyncResult.succeeded()) {
                                    promise.complete();
                                } else {
                                    promise.fail(asyncResult.cause());
                                }
                            });
                        } else {
                            promise.fail(asyncResult.cause());
                        }
                    });
                } else {
                    promise.fail(asyncResult.cause());
                }
            });
        }, completableFuture3 -> {
            create.basicConsumer(QUEUE_NAME, new QueueOptions().setAutoAck(false), asyncResult -> {
                if (!asyncResult.succeeded()) {
                    completableFuture3.completeExceptionally(asyncResult.cause());
                    return;
                }
                completableFuture2.complete(null);
                atomicReference.set((RabbitMQConsumer) asyncResult.result());
                ((RabbitMQConsumer) asyncResult.result()).handler(rabbitMQMessage -> {
                    synchronized (hashMap) {
                        hashMap2.put(rabbitMQMessage.properties().getMessageId(), rabbitMQMessage);
                        create.basicAck(rabbitMQMessage.envelope().getDeliveryTag(), false);
                        if (hashMap2.size() > i / 2) {
                            ((RabbitMQConsumer) atomicReference.get()).cancel();
                            completableFuture.complete(null);
                        }
                    }
                });
                completableFuture3.complete(null);
            });
        });
        completableFuture2.get(2L, TimeUnit.MINUTES);
        this.client = RabbitMQClient.create(this.vertx, config());
        RabbitMQClientPublisherTest.prepareClient(this.client, promise2 -> {
            this.client.queueDeclare(QUEUE_NAME, true, false, false).onComplete(asyncResult -> {
                if (asyncResult.succeeded()) {
                    this.client.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false).onComplete(asyncResult -> {
                        if (asyncResult.succeeded()) {
                            this.client.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", asyncResult -> {
                                if (asyncResult.succeeded()) {
                                    promise2.complete();
                                } else {
                                    promise2.fail(asyncResult.cause());
                                }
                            });
                        } else {
                            promise2.fail(asyncResult.cause());
                        }
                    });
                } else {
                    promise2.fail(asyncResult.cause());
                }
            });
        }, null);
        CompletableFuture completableFuture4 = new CompletableFuture();
        RabbitMQPublisher create2 = RabbitMQPublisher.create(this.vertx, this.client, new RabbitMQPublisherOptions().setReconnectAttempts(Integer.MAX_VALUE).setReconnectInterval(100L).setMaxInternalQueueSize(Integer.MAX_VALUE));
        create2.start(asyncResult -> {
            create2.getConfirmationStream().handler(rabbitMQPublisherConfirmation -> {
                synchronized (hashMap) {
                    hashMap.remove(rabbitMQPublisherConfirmation.getMessageId());
                    if (hashMap.isEmpty()) {
                        completableFuture4.complete(null);
                    }
                }
            });
        });
        synchronized (hashMap) {
            arrayList = new ArrayList(hashMap.values());
            arrayList.sort(Comparator.comparing(messageDefinition -> {
                return messageDefinition.messageId;
            }));
        }
        for (RabbitMQClientPublisherTest.MessageDefinition messageDefinition2 : arrayList) {
            Thread.sleep(1L);
            create2.publish(EXCHANGE_NAME, "", new AMQP.BasicProperties.Builder().messageId(messageDefinition2.messageId).build(), Buffer.buffer(messageDefinition2.messageBody), (Handler) null);
        }
        completableFuture4.get(1L, TimeUnit.MINUTES);
        logger.info("Publisher has sent everything and got confirmations for all of them");
        completableFuture.get(1L, TimeUnit.MINUTES);
        testContext.assertTrue(hashMap2.size() >= 1000 / 2, "Have received " + hashMap2.size() + " messages, which is fewer than the expected " + (1000 / 2));
        testContext.assertTrue(hashMap2.size() <= (1000 / 2) + 10, "Have received " + hashMap2.size() + " messages, which is too many more than the expected " + (1000 / 2));
        create.stop(asyncResult2 -> {
            logger.info("Consumer client stopped");
            this.client.stop(asyncResult2 -> {
                logger.info("Producer client stopped");
                testContext.async().complete();
            });
        });
    }
}
