package io.vertx.rabbitmq.tests;

import com.rabbitmq.client.AMQP;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.ext.unit.TestContext;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQOptions;
import io.vertx.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;

/* loaded from: input_file:io/vertx/rabbitmq/tests/RabbitMQClientTransientQueueTest.class */
public class RabbitMQClientTransientQueueTest extends RabbitMQClientTestBase {
    // Name of the fanout exchange that both the consumer and the producer clients declare.
    private static final String EXCHANGE_NAME = "RabbitMQClientTransientQueueTest";
    // Name of the queue the consumer reads from; assigned from the broker's reply to queueDeclare("").
    private String queueName;
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQClientTransientQueueTest.class);

    // RabbitMQ 3.7 container whose AMQP port (5672) is mapped to a fixed host port chosen once by
    // getFreePort(). The container is stopped and started again by testPublishOverReconnect.
    // NOTE(review): getFreePort() returns -1 on failure and that value is passed straight to
    // withFixedExposedPort - confirm that is acceptable.
    @ClassRule
    public static final GenericContainer fixedRabbitmq = new FixedHostPortGenericContainer("rabbitmq:3.7").withCreateContainerCmdModifier(createContainerCmd -> {
        createContainerCmd.withHostName("bouncing-rabbit2");
    }).withFixedExposedPort(getFreePort(), 5672);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/rabbitmq/tests/RabbitMQClientTransientQueueTest$Gap.class */
    /**
     * A break in the sequence of received message numbers: {@code from} is the last number seen
     * before the break and {@code to} is the first number seen after it.
     */
    public static class Gap {
        public final int from;
        public final int to;

        /**
         * @param i  the number immediately before the break
         * @param i2 the number immediately after the break
         */
        public Gap(int i, int i2) {
            from = i;
            to = i2;
        }

        /** Renders the gap as {@code [from-to]}. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append('[').append(from).append('-').append(to).append(']');
            return text.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/vertx/rabbitmq/tests/RabbitMQClientTransientQueueTest$MessageDefinition.class */
    /**
     * Immutable description of one message to publish: its sequence number, the message id set on
     * the AMQP properties, and the body text.
     */
    public static class MessageDefinition {
        final int i;
        final String messageId;
        final String messageBody;

        /**
         * @param index sequence number of the message
         * @param id    value used as the AMQP message id
         * @param body  text sent as the message body
         */
        public MessageDefinition(int index, String id, String body) {
            i = index;
            messageId = id;
            messageBody = body;
        }
    }

    /**
     * Finds a TCP port that is currently free by binding a {@link ServerSocket} to port 0 and
     * reading back the port that was assigned to it.
     *
     * <p>The socket is closed again before this method returns, so nothing reserves the port
     * afterwards.
     *
     * @return the port the probe socket was bound to, or {@code -1} if the socket could not be
     *         opened or closed
     */
    private static int getFreePort() {
        // try-with-resources guarantees the probe socket is closed on every path, including when
        // an exception is thrown after the socket has been opened.
        try (ServerSocket probe = new ServerSocket(0)) {
            return probe.getLocalPort();
        } catch (IOException e) {
            // -1 is the existing "no port found" result; callers receive it unchanged.
            return -1;
        }
    }

    /**
     * Builds the client options for this test: the base-class options pointed at the
     * fixed-port container, with automatic recovery switched off and the client's own reconnect
     * settings set to retry every 500 ms up to {@link Integer#MAX_VALUE} times.
     *
     * @return the options to create a {@link RabbitMQClient} with
     * @throws Exception if the base-class configuration cannot be produced
     */
    @Override // io.vertx.rabbitmq.tests.RabbitMQClientTestBase
    public RabbitMQOptions config() throws Exception {
        RabbitMQOptions options = super.config();

        String brokerUri = "amqp://" + fixedRabbitmq.getContainerIpAddress() + ":" + fixedRabbitmq.getMappedPort(5672);
        options.setUri(brokerUri);

        options.setAutomaticRecoveryEnabled(false);
        options.setReconnectAttempts(Integer.MAX_VALUE);
        options.setReconnectInterval(500L);
        return options;
    }

    /**
     * Starts a publisher and stops it again without publishing anything, waiting for each
     * operation to finish. Both waits are released whether the operation succeeded or failed.
     *
     * @param testContext the vertx-unit context (not used by this test)
     */
    @Test
    public void testStopEmpty(TestContext testContext) throws Throwable {
        this.client = RabbitMQClient.create(this.vertx, config());
        RabbitMQPublisherOptions publisherOptions = new RabbitMQPublisherOptions()
                .setReconnectAttempts(Integer.MAX_VALUE)
                .setReconnectInterval(100L)
                .setMaxInternalQueueSize(Integer.MAX_VALUE);
        RabbitMQPublisher publisher = RabbitMQPublisher.create(this.vertx, this.client, publisherOptions);

        // Block the test thread until start() has completed, regardless of its outcome.
        CompletableFuture<Void> startFinished = new CompletableFuture<>();
        publisher.start().onComplete(startOutcome -> startFinished.complete(null));
        startFinished.get();

        // Likewise for stop().
        CompletableFuture<Void> stopFinished = new CompletableFuture<>();
        publisher.stop().onComplete(stopOutcome -> stopFinished.complete(null));
        stopFinished.get();
    }

    /**
     * Publishes 1000 messages through a {@link RabbitMQPublisher} while the broker container is
     * stopped and started again, then republishes whatever was not confirmed and logs an
     * analysis of the message ids the consumer actually received.
     *
     * <p>Outline:
     * <ol>
     *   <li>A consumer client declares the fanout exchange and a server-named queue, binds the
     *       queue and starts a manually-acknowledging consumer on it.</li>
     *   <li>A producer client declares the same exchange; 200 ms later the container is
     *       bounced on a worker thread.</li>
     *   <li>All messages are published (4 ms apart); confirmations remove ids from the
     *       unconfirmed map.</li>
     *   <li>The publisher is stopped and restarted, and the still-unconfirmed messages are
     *       published again.</li>
     *   <li>The test waits up to 20 s for the message with the highest id, logs the gaps in the
     *       received ids, and shuts the consumer and both clients down.</li>
     * </ol>
     *
     * <p>The test makes no assertions about the received messages; the outcome is only logged.
     *
     * @param testContext the vertx-unit context, used only in the final shutdown callback
     * @throws Throwable if setting up either client, or stopping the publisher, fails
     */
    @Test
    public void testPublishOverReconnect(TestContext testContext) throws Throwable {
        final int messageCount = 1000;
        final String lastMessageId = Integer.toString(messageCount - 1);

        // Messages that have not been confirmed yet, keyed by message id. This map is also the
        // monitor that guards both itself and the received map below.
        final HashMap<String, MessageDefinition> unconfirmed = new HashMap<>(messageCount);
        for (int n = 0; n < messageCount; n++) {
            String id = Integer.toString(n);
            unconfirmed.put(id, new MessageDefinition(n, id, "Message " + n));
        }
        // Messages seen by the consumer, keyed by message id; only the keys are read back.
        final HashMap<String, Object> received = new HashMap<>(messageCount);
        final CompletableFuture<Void> lastMessageReceived = new CompletableFuture<>();
        final CompletableFuture<Void> consumerStarted = new CompletableFuture<>();
        final AtomicReference<RabbitMQConsumer> consumerRef = new AtomicReference<>();
        final RabbitMQClient consumerClient = RabbitMQClient.create(this.vertx, config());
        final AtomicLong duplicateCount = new AtomicLong();

        prepareClient(consumerClient, connected -> {
            // Connection-established callback: (re)create the exchange, the queue and the binding.
            consumerClient.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false).onComplete(exchangeResult -> {
                if (!exchangeResult.succeeded()) {
                    connected.fail(exchangeResult.cause());
                    return;
                }
                // An empty queue name asks the broker to pick one; it is read back from the reply.
                consumerClient.queueDeclare("", false, true, true).onComplete(queueResult -> {
                    if (!queueResult.succeeded()) {
                        connected.fail(queueResult.cause());
                        return;
                    }
                    this.queueName = queueResult.result().getQueue();
                    // If a consumer already exists, point it at the newly declared queue.
                    RabbitMQConsumer existingConsumer = consumerRef.get();
                    if (existingConsumer != null) {
                        existingConsumer.setQueueName(this.queueName);
                    }
                    consumerClient.queueBind(this.queueName, EXCHANGE_NAME, "").onComplete(bindResult -> {
                        if (bindResult.succeeded()) {
                            connected.complete();
                        } else {
                            connected.fail(bindResult.cause());
                        }
                    });
                });
            });
        }, clientReady -> {
            consumerClient.basicConsumer(this.queueName, new QueueOptions().setAutoAck(false)).onComplete(consumerResult -> {
                if (!consumerResult.succeeded()) {
                    clientReady.completeExceptionally(consumerResult.cause());
                    return;
                }
                RabbitMQConsumer consumer = consumerResult.result();
                consumerRef.set(consumer);
                consumerStarted.complete(null);
                consumer.handler(message -> {
                    synchronized (unconfirmed) {
                        String receivedId = message.properties().getMessageId();
                        if (received.put(receivedId, message) != null) {
                            duplicateCount.incrementAndGet();
                        }
                        consumerClient.basicAck(message.envelope().getDeliveryTag(), false);
                        if (lastMessageId.equals(receivedId)) {
                            logger.info("Have received \"" + message.body().toString() + "\" the last message, with " + duplicateCount.get() + " duplicates");
                            lastMessageReceived.complete(null);
                        }
                    }
                });
                clientReady.complete(null);
            });
        });
        consumerStarted.get(2L, TimeUnit.MINUTES);
        logger.info("Consumer started, preparing producer");

        this.client = RabbitMQClient.create(this.vertx, config());
        prepareClient(this.client, producerConnected -> {
            this.client.exchangeDeclare(EXCHANGE_NAME, "fanout", true, false).onComplete(declareResult -> {
                if (declareResult.succeeded()) {
                    producerConnected.complete();
                } else {
                    producerConnected.fail(declareResult.cause());
                }
            });
        }, null);

        // Bounce the broker shortly after publishing begins; stop()/start() block, so they run
        // through executeBlocking rather than directly in the timer callback.
        this.vertx.setTimer(200L, timerId -> {
            this.vertx.executeBlocking(() -> {
                logger.info("Stopping rabbitmq container");
                fixedRabbitmq.stop();
                logger.info("Starting rabbitmq container");
                fixedRabbitmq.start();
                logger.info("Started rabbitmq container");
                return null;
            });
        });

        RabbitMQPublisher publisher = RabbitMQPublisher.create(this.vertx, this.client, new RabbitMQPublisherOptions().setReconnectAttempts(Integer.MAX_VALUE).setReconnectInterval(100L).setMaxInternalQueueSize(Integer.MAX_VALUE));
        publisher.start().onComplete(startResult -> {
            // Every confirmation removes its message from the unconfirmed set.
            publisher.getConfirmationStream().handler(confirmation -> {
                synchronized (unconfirmed) {
                    unconfirmed.remove(confirmation.getMessageId());
                }
            });
        });

        publishAll(publisher, snapshotSortedById(unconfirmed));
        logger.info("Still got " + publisher.queueSize() + " messages in the send queue, waiting for that to clear");

        CompletableFuture<Void> publisherStopped = new CompletableFuture<>();
        publisher.stop().onComplete(stopResult -> {
            if (stopResult.succeeded()) {
                publisherStopped.complete(null);
            } else {
                publisherStopped.completeExceptionally(stopResult.cause());
            }
        });
        publisherStopped.get();
        // NOTE(review): presumably restart() makes the stopped publisher usable for the resend
        // below - confirm against the RabbitMQPublisher documentation.
        publisher.restart();

        List<MessageDefinition> toResend;
        synchronized (unconfirmed) {
            logger.info("After the publisher has sent everything there remain " + unconfirmed.size() + " messages unconfirmed (" + unconfirmed.keySet() + ")");
            toResend = snapshotSortedById(unconfirmed);
        }
        publishAll(publisher, toResend);

        logger.info("Waiting up to 20s for the last message to be received (noting that it may not be)");
        try {
            lastMessageReceived.get(20L, TimeUnit.SECONDS);
            logger.info("Latched, shutting down");
        } catch (TimeoutException e) {
            logger.info("Shutting down after waiting for last message that isn't going to arrive");
        }

        // The consumer handler may still be adding entries (notably after a timeout), so the
        // key set is copied under the same monitor the handler uses.
        List<String> receivedIds;
        synchronized (unconfirmed) {
            receivedIds = new ArrayList<>(received.keySet());
        }
        analyzeReceivedMessages(receivedIds);

        logger.info("Shutting down");
        consumerRef.get().cancel().onComplete(cancelResult -> {
            logger.info("Consumer cancelled");
            consumerClient.stop().onComplete(consumerStopResult -> {
                logger.info("Consumer client stopped");
                this.client.stop().onComplete(producerStopResult -> {
                    logger.info("Producer client stopped");
                    // NOTE(review): this Async is only created once the callback runs, which may
                    // be after the test method has returned - confirm whether it should be
                    // obtained before the shutdown chain starts.
                    testContext.async().complete();
                });
            });
        });
    }

    /**
     * Copies the current values of {@code pending} under its monitor and sorts the copy by
     * message id.
     *
     * <p>The ids are compared as strings, so the order is lexicographic ("10" sorts before "2").
     * NOTE(review): confirm that numeric order is not what was intended.
     */
    private static List<MessageDefinition> snapshotSortedById(HashMap<String, MessageDefinition> pending) {
        List<MessageDefinition> snapshot;
        synchronized (pending) {
            snapshot = new ArrayList<>(pending.values());
        }
        Collections.sort(snapshot, (left, right) -> left.messageId.compareTo(right.messageId));
        return snapshot;
    }

    /** Publishes each message to the test exchange in list order, pausing 4 ms before each one. */
    private static void publishAll(RabbitMQPublisher publisher, List<MessageDefinition> batch) throws InterruptedException {
        for (MessageDefinition definition : batch) {
            Thread.sleep(4L);
            publisher.publish(EXCHANGE_NAME, "", new AMQP.BasicProperties.Builder().messageId(definition.messageId).build(), Buffer.buffer(definition.messageBody));
        }
    }

    /**
     * Starts {@code rabbitMQClient} and blocks the calling thread until it is ready, for at most
     * 100 seconds.
     *
     * @param rabbitMQClient the client to start
     * @param handler        optional callback registered via
     *                       {@code addConnectionEstablishedCallback} before the client is started;
     *                       skipped when {@code null}
     * @param handler2       optional step run after a successful start; it receives the future
     *                       this method waits on and must complete it. When {@code null} the
     *                       future is completed as soon as the start succeeds
     * @throws ExecutionException if the start, or {@code handler2}, completed the future exceptionally
     * @throws TimeoutException   if the future is not completed within 100 seconds
     */
    static void prepareClient(RabbitMQClient rabbitMQClient, Handler<Promise<Void>> handler, Handler<CompletableFuture<Void>> handler2) throws InterruptedException, Exception, TimeoutException, ExecutionException {
        if (handler != null) {
            rabbitMQClient.addConnectionEstablishedCallback(handler);
        }

        final CompletableFuture<Void> ready = new CompletableFuture<>();
        rabbitMQClient.start().onComplete(startResult -> {
            if (startResult.succeeded()) {
                if (handler2 == null) {
                    ready.complete(null);
                } else {
                    // Hand the future over; the extra step decides when the client counts as ready.
                    handler2.handle(ready);
                }
            } else {
                ready.completeExceptionally(startResult.cause());
            }
        });
        ready.get(100L, TimeUnit.SECONDS);
    }

    /**
     * Logs how many breaks there are in the sequence of received message ids.
     *
     * <p>The ids are parsed as integers and sorted; every pair of neighbours that are not
     * consecutive numbers is reported as a {@link Gap}.
     *
     * @param list the received message ids, each a decimal integer
     */
    private void analyzeReceivedMessages(List<String> list) {
        int[] sortedIds = list.stream().mapToInt(Integer::parseInt).sorted().toArray();

        List<Gap> gaps = new ArrayList<>();
        for (int next = 1; next < sortedIds.length; next++) {
            int earlier = sortedIds[next - 1];
            int later = sortedIds[next];
            if (earlier != later - 1) {
                gaps.add(new Gap(earlier, later));
            }
        }
        logger.info("Received messages had " + gaps.size() + " gaps (expected one): " + gaps);
    }
}
