package io.vertx.rabbitmq.tests;

import com.rabbitmq.client.AMQP;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.rabbitmq.QueueOptions;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQOptions;
import io.vertx.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/rabbitmq/tests/RabbitMQClientPublisherTest.class */
public class RabbitMQClientPublisherTest {
    private String exchangeName;
    private String queueName;
    protected RabbitMQClient client;
    Vertx vertx;
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQClientPublisherTest.class);

    @ClassRule
    public static final GenericContainer fixedRabbitmq = new FixedHostPortGenericContainer("rabbitmq:3.7-management").withCreateContainerCmdModifier(createContainerCmd -> {
        createContainerCmd.withHostName("bouncing-rabbit");
    }).withFixedExposedPort(getFreePort(), 5672);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/vertx/rabbitmq/tests/RabbitMQClientPublisherTest$MessageDefinition.class */
    public static class MessageDefinition {
        final int i;
        final String messageId;
        final String messageBody;

        public MessageDefinition(int i, String str, String str2) {
            this.i = i;
            this.messageId = str;
            this.messageBody = str2;
        }
    }

    private static int getFreePort() {
        try {
            ServerSocket serverSocket = new ServerSocket(0);
            try {
                int localPort = serverSocket.getLocalPort();
                serverSocket.close();
                return localPort;
            } finally {
            }
        } catch (IOException e) {
            return -1;
        }
    }

    @Before
    public void setUp() {
        this.vertx = Vertx.vertx();
        this.exchangeName = getClass().getSimpleName() + "Exchange-" + UUID.randomUUID();
        this.queueName = getClass().getSimpleName() + "Queue-" + UUID.randomUUID();
    }

    public RabbitMQOptions config() throws Exception {
        RabbitMQOptions rabbitMQOptions = new RabbitMQOptions();
        rabbitMQOptions.setUri("amqp://" + fixedRabbitmq.getContainerIpAddress() + ":" + fixedRabbitmq.getMappedPort(5672));
        rabbitMQOptions.setAutomaticRecoveryEnabled(true);
        rabbitMQOptions.setReconnectAttempts(Integer.MAX_VALUE);
        rabbitMQOptions.setReconnectInterval(500L);
        return rabbitMQOptions;
    }

    @Test
    public void testStopEmpty(TestContext testContext) throws Throwable {
        this.client = RabbitMQClient.create(this.vertx, config());
        RabbitMQPublisher create = RabbitMQPublisher.create(this.vertx, this.client, new RabbitMQPublisherOptions().setReconnectAttempts(Integer.MAX_VALUE).setReconnectInterval(100L).setMaxInternalQueueSize(Integer.MAX_VALUE));
        this.client.start().toCompletionStage().toCompletableFuture().get();
        create.start().toCompletionStage().toCompletableFuture().get();
        create.stop().toCompletionStage().toCompletableFuture().get();
    }

    @Test
    public void testPublishConfirm(TestContext testContext) throws Throwable {
        int i = 1000;
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(1000);
        for (int i2 = 0; i2 < 1000; i2++) {
            String format = String.format("NewID-%05d", Integer.valueOf(i2));
            concurrentHashMap.put(format, new MessageDefinition(i2, format, "Message " + i2));
        }
        HashMap hashMap = new HashMap(1000);
        Async async = testContext.async(1000);
        AtomicReference atomicReference = new AtomicReference();
        RabbitMQClient create = RabbitMQClient.create(this.vertx, config());
        AtomicLong atomicLong = new AtomicLong();
        prepareClient(create, promise -> {
            Future compose = create.queueDeclare(this.queueName, true, false, false).compose(declareOk -> {
                return create.exchangeDeclare(this.exchangeName, "fanout", true, false);
            }).compose(r7 -> {
                return create.queueBind(this.queueName, this.exchangeName, "");
            });
            Objects.requireNonNull(promise);
            Future onFailure = compose.onFailure(promise::fail);
            Objects.requireNonNull(promise);
            onFailure.onSuccess((v1) -> {
                r1.complete(v1);
            });
        }, completableFuture -> {
            create.basicConsumer(this.queueName, new QueueOptions().setAutoAck(false)).onComplete(asyncResult -> {
                if (!asyncResult.succeeded()) {
                    completableFuture.completeExceptionally(asyncResult.cause());
                    return;
                }
                atomicReference.set((RabbitMQConsumer) asyncResult.result());
                ((RabbitMQConsumer) asyncResult.result()).handler(rabbitMQMessage -> {
                    synchronized (concurrentHashMap) {
                        if (hashMap.put(rabbitMQMessage.properties().getMessageId(), rabbitMQMessage) != null) {
                            atomicLong.incrementAndGet();
                        }
                        create.basicAck(rabbitMQMessage.envelope().getDeliveryTag(), false);
                        if (hashMap.size() == i) {
                            logger.info("Have received " + hashMap.size() + " messages with " + atomicLong.get() + " duplicates");
                            async.complete();
                        }
                    }
                });
                completableFuture.complete(null);
            });
        });
        this.client = RabbitMQClient.create(this.vertx, config());
        prepareClient(this.client, promise2 -> {
            Future compose = this.client.queueDeclare(this.queueName, true, false, false).compose(declareOk -> {
                return this.client.exchangeDeclare(this.exchangeName, "fanout", true, false);
            }).compose(r6 -> {
                return this.client.queueBind(this.queueName, this.exchangeName, "");
            });
            Objects.requireNonNull(promise2);
            Future onFailure = compose.onFailure(promise2::fail);
            Objects.requireNonNull(promise2);
            onFailure.onSuccess((v1) -> {
                r1.complete(v1);
            });
        }, null);
        Async async2 = testContext.async(concurrentHashMap.size());
        RabbitMQPublisher create2 = RabbitMQPublisher.create(this.vertx, this.client, new RabbitMQPublisherOptions().setReconnectAttempts(Integer.MAX_VALUE).setReconnectInterval(100L).setMaxInternalQueueSize(Integer.MAX_VALUE));
        create2.start().toCompletionStage().toCompletableFuture().get();
        for (MessageDefinition messageDefinition : concurrentHashMap.values()) {
            Thread.sleep(1L);
            create2.publishConfirm(this.exchangeName, "", new AMQP.BasicProperties.Builder().messageId(messageDefinition.messageId).build(), Buffer.buffer(messageDefinition.messageBody)).onComplete(asyncResult -> {
                async2.countDown();
            });
        }
        logger.info("Still got " + create2.queueSize() + " messages in the send queue, waiting for that to clear");
        logger.info("Waiting up to 20s for the latch");
        async.await(20000L);
        async2.await(20000L);
        logger.info("Latched, shutting down");
        logger.info("Shutting down");
        ((RabbitMQConsumer) atomicReference.get()).cancel().compose(r5 -> {
            return create.exchangeDelete(this.exchangeName);
        }).compose(r52 -> {
            return create.queueDelete(this.queueName);
        }).compose(deleteOk -> {
            return create.stop();
        }).onComplete(asyncResult2 -> {
            testContext.async().complete();
        });
    }

    @Test
    public void testPublishOverReconnect(TestContext testContext) throws Throwable {
        ArrayList<MessageDefinition> arrayList;
        ArrayList<MessageDefinition> arrayList2;
        int i = 1000;
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap(1000);
        for (int i2 = 0; i2 < 1000; i2++) {
            String format = String.format("NewID-%05d", Integer.valueOf(i2));
            concurrentHashMap.put(format, new MessageDefinition(i2, format, "Message " + i2));
        }
        ConcurrentHashMap concurrentHashMap2 = new ConcurrentHashMap(1000);
        Async async = testContext.async(1000);
        AtomicReference atomicReference = new AtomicReference();
        RabbitMQClient create = RabbitMQClient.create(this.vertx, config());
        AtomicLong atomicLong = new AtomicLong();
        prepareClient(create, promise -> {
            Future compose = create.queueDeclare(this.queueName, true, false, false).compose(declareOk -> {
                return create.exchangeDeclare(this.exchangeName, "fanout", true, false);
            }).compose(r7 -> {
                return create.queueBind(this.queueName, this.exchangeName, "");
            });
            Objects.requireNonNull(promise);
            Future onFailure = compose.onFailure(promise::fail);
            Objects.requireNonNull(promise);
            onFailure.onSuccess((v1) -> {
                r1.complete(v1);
            });
        }, completableFuture -> {
            create.basicConsumer(this.queueName, new QueueOptions().setAutoAck(false)).onComplete(asyncResult -> {
                if (!asyncResult.succeeded()) {
                    completableFuture.completeExceptionally(asyncResult.cause());
                    return;
                }
                atomicReference.set((RabbitMQConsumer) asyncResult.result());
                ((RabbitMQConsumer) asyncResult.result()).handler(rabbitMQMessage -> {
                    if (concurrentHashMap2.put(rabbitMQMessage.properties().getMessageId(), rabbitMQMessage) != null) {
                        atomicLong.incrementAndGet();
                    }
                    create.basicAck(rabbitMQMessage.envelope().getDeliveryTag(), false);
                    if (concurrentHashMap2.size() == i) {
                        logger.info("Have received " + concurrentHashMap2.size() + " messages with " + atomicLong.get() + " duplicates");
                        async.complete();
                    }
                });
                completableFuture.complete(null);
            });
        });
        this.client = RabbitMQClient.create(this.vertx, config());
        prepareClient(this.client, promise2 -> {
            Future compose = this.client.queueDeclare(this.queueName, true, false, false).compose(declareOk -> {
                return this.client.exchangeDeclare(this.exchangeName, "fanout", true, false);
            }).compose(r6 -> {
                return this.client.queueBind(this.queueName, this.exchangeName, "");
            });
            Objects.requireNonNull(promise2);
            Future onFailure = compose.onFailure(promise2::fail);
            Objects.requireNonNull(promise2);
            onFailure.onSuccess((v1) -> {
                r1.complete(v1);
            });
        }, null);
        this.vertx.setTimer(100L, l -> {
            this.vertx.executeBlocking(() -> {
                logger.info("Stopping rabbitmq container");
                fixedRabbitmq.stop();
                logger.info("Starting rabbitmq container");
                fixedRabbitmq.start();
                logger.info("Started rabbitmq container");
                return null;
            });
        });
        RabbitMQPublisher create2 = RabbitMQPublisher.create(this.vertx, this.client, new RabbitMQPublisherOptions().setReconnectAttempts(Integer.MAX_VALUE).setReconnectInterval(100L).setMaxInternalQueueSize(Integer.MAX_VALUE));
        create2.start().toCompletionStage().toCompletableFuture().get();
        Async async2 = testContext.async(concurrentHashMap.size());
        synchronized (concurrentHashMap) {
            arrayList = new ArrayList(concurrentHashMap.values());
            arrayList.sort(Comparator.comparing(messageDefinition -> {
                return messageDefinition.messageId;
            }));
        }
        for (MessageDefinition messageDefinition2 : arrayList) {
            Thread.sleep(1L);
            create2.publishConfirm(this.exchangeName, "", new AMQP.BasicProperties.Builder().messageId(messageDefinition2.messageId).build(), Buffer.buffer(messageDefinition2.messageBody)).onSuccess(l2 -> {
                concurrentHashMap.remove(messageDefinition2.messageId);
                async2.countDown();
            });
        }
        logger.info("Still got " + create2.queueSize() + " messages in the send queue, waiting for that to clear");
        create2.stop().toCompletionStage().toCompletableFuture().get();
        create2.restart();
        synchronized (concurrentHashMap) {
            logger.info("After the publisher has sent everything there remain " + concurrentHashMap.size() + " messages unconfirmed");
            arrayList2 = new ArrayList(concurrentHashMap.values());
            arrayList2.sort(Comparator.comparing(messageDefinition3 -> {
                return messageDefinition3.messageId;
            }));
        }
        for (MessageDefinition messageDefinition4 : arrayList2) {
            Thread.sleep(1L);
            create2.publishConfirm(this.exchangeName, "", new AMQP.BasicProperties.Builder().messageId(messageDefinition4.messageId).build(), Buffer.buffer(messageDefinition4.messageBody)).onSuccess(l3 -> {
                concurrentHashMap.remove(messageDefinition4.messageId);
                async2.countDown();
            });
        }
        logger.info("Waiting up to 20s for the latch");
        async2.await(20000L);
        async.await(20000L);
        logger.info("Latched, shutting down");
        logger.info("Shutting down");
        ((RabbitMQConsumer) atomicReference.get()).cancel().onComplete(asyncResult -> {
            logger.info("Consumer cancelled");
            create.stop().onComplete(asyncResult -> {
                logger.info("Consumer client stopped");
                this.client.stop().onComplete(asyncResult -> {
                    logger.info("Producer client stopped");
                    testContext.async().complete();
                });
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void prepareClient(RabbitMQClient rabbitMQClient, Handler<Promise<Void>> handler, Handler<CompletableFuture<Void>> handler2) throws InterruptedException, Exception, TimeoutException, ExecutionException {
        if (handler != null) {
            rabbitMQClient.addConnectionEstablishedCallback(handler);
        }
        CompletableFuture completableFuture = new CompletableFuture();
        rabbitMQClient.start().onComplete(asyncResult -> {
            if (!asyncResult.succeeded()) {
                completableFuture.completeExceptionally(asyncResult.cause());
            } else if (handler2 != null) {
                handler2.handle(completableFuture);
            } else {
                completableFuture.complete(null);
            }
        });
        completableFuture.get(100L, TimeUnit.SECONDS);
    }
}
