package io.vertx.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Envelope;
import io.vertx.core.Promise;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.RunTestOnContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.rabbitmq.impl.RabbitMQConnectionImpl;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.FixedHostPortGenericContainer;
import org.testcontainers.containers.GenericContainer;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/rabbitmq/RabbitMQClientTest.class */
public class RabbitMQClientTest {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQClientTest.class);
    private static final GenericContainer CONTAINER = RabbitMQBrokerProvider.getRabbitMqContainer();

    @Rule
    public RunTestOnContext testRunContext = new RunTestOnContext();

    /* loaded from: input_file:io/vertx/rabbitmq/RabbitMQClientTest$TestConsumer.class */
    private static final class TestConsumer extends DefaultConsumer {
        private final TestContext testContext;
        private final Promise promise;

        public TestConsumer(RabbitMQChannel rabbitMQChannel, TestContext testContext, Promise promise) {
            super(rabbitMQChannel);
            this.testContext = testContext;
            this.promise = promise;
        }

        public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
            RabbitMQClientTest.logger.info("Message received");
            this.testContext.assertEquals("text/plain", basicProperties.getContentType());
            this.testContext.assertEquals((Object) null, basicProperties.getContentEncoding());
            this.testContext.assertEquals("Hello", new String(bArr, StandardCharsets.UTF_8));
            this.promise.complete();
        }
    }

    @BeforeClass
    public static void startup() {
        CONTAINER.start();
    }

    @AfterClass
    public static void shutdown() {
        CONTAINER.stop();
    }

    @Test
    public void testCreateWithWorkingServer(TestContext testContext) {
        RabbitMQOptions config = config();
        RabbitMQConnection[] rabbitMQConnectionArr = new RabbitMQConnection[1];
        Async async = testContext.async();
        RabbitMQClient.connect(this.testRunContext.vertx(), config).compose(rabbitMQConnection -> {
            rabbitMQConnectionArr[0] = rabbitMQConnection;
            return rabbitMQConnectionArr[0].createChannelBuilder().openChannel();
        }).onComplete(asyncResult -> {
            if (asyncResult.succeeded()) {
                logger.info("Completing test");
                rabbitMQConnectionArr[0].close().onComplete(asyncResult -> {
                    async.complete();
                });
            } else {
                logger.info("Failing test");
                testContext.fail(asyncResult.cause());
            }
        });
        logger.info("Ending test");
    }

    @Test
    public void testSendMessageWithWorkingServer(TestContext testContext) {
        logger.debug("testSendMessageWithWorkingServer");
        RabbitMQOptions config = config();
        RabbitMQConnection[] rabbitMQConnectionArr = new RabbitMQConnection[1];
        Async async = testContext.async();
        RabbitMQChannel[] rabbitMQChannelArr = new RabbitMQChannel[1];
        RabbitMQChannel[] rabbitMQChannelArr2 = new RabbitMQChannel[1];
        String str = "testSendMessageWithWorkingServer";
        String str2 = "testSendMessageWithWorkingServerQueue";
        Promise promise = Promise.promise();
        RabbitMQClient.connect(this.testRunContext.vertx(), config).compose(rabbitMQConnection -> {
            rabbitMQConnectionArr[0] = rabbitMQConnection;
            return rabbitMQConnectionArr[0].createChannelBuilder().openChannel();
        }).compose(rabbitMQChannel -> {
            rabbitMQChannelArr[0] = rabbitMQChannel;
            return rabbitMQConnectionArr[0].createChannelBuilder().openChannel();
        }).compose(rabbitMQChannel2 -> {
            rabbitMQChannelArr2[0] = rabbitMQChannel2;
            return rabbitMQChannelArr[0].getManagementChannel().exchangeDeclare(str, BuiltinExchangeType.FANOUT, true, false, (Map) null);
        }).compose(r9 -> {
            return rabbitMQChannelArr[0].getManagementChannel().queueDeclare(str2, true, false, true, (Map) null);
        }).compose(str3 -> {
            return rabbitMQChannelArr[0].getManagementChannel().queueBind(str2, str, "", (Map) null);
        }).compose(r18 -> {
            return rabbitMQChannelArr[0].basicConsume(str2, true, getClass().getSimpleName(), false, false, (Map) null, new TestConsumer(rabbitMQChannelArr[0], testContext, promise));
        }).compose(str4 -> {
            return rabbitMQChannelArr2[0].getManagementChannel().exchangeDeclare(str, BuiltinExchangeType.FANOUT, true, false, (Map) null);
        }).compose(r4 -> {
            return rabbitMQChannelArr2[0].getManagementChannel().confirmSelect();
        }).compose(r10 -> {
            return rabbitMQChannelArr2[0].basicPublish(new RabbitMQPublishOptions(), str, "", true, new AMQP.BasicProperties(), "Hello");
        }).compose(r5 -> {
            return rabbitMQChannelArr2[0].waitForConfirms(1000L);
        }).compose(r3 -> {
            return promise.future();
        }).onComplete(asyncResult -> {
            if (asyncResult.succeeded()) {
                rabbitMQConnectionArr[0].close().onComplete(asyncResult -> {
                    logger.info("Failing testSendMessageWithWorkingServer: ", asyncResult.cause());
                    async.complete();
                });
            } else {
                logger.info("Failing testSendMessageWithWorkingServer: ", asyncResult.cause());
                testContext.fail(asyncResult.cause());
            }
        });
        logger.info("Ending testSendMessageWithWorkingServer");
    }

    private int findOpenPort() throws IOException {
        ServerSocket serverSocket = new ServerSocket(0);
        try {
            int localPort = serverSocket.getLocalPort();
            serverSocket.close();
            return localPort;
        } catch (Throwable th) {
            try {
                serverSocket.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCreateWithServerThatArrivesLate(TestContext testContext) throws IOException {
        logger.debug("testCreateWithServerThatArrivesLate");
        RabbitMQOptions rabbitMQOptions = new RabbitMQOptions();
        rabbitMQOptions.setReconnectInterval(1000);
        rabbitMQOptions.setInitialConnectAttempts(50);
        int findOpenPort = findOpenPort();
        FixedHostPortGenericContainer withFixedExposedPort = new FixedHostPortGenericContainer(RabbitMQBrokerProvider.IMAGE_NAME).withFixedExposedPort(findOpenPort, 5672);
        logger.info("Exposed port: {}", Integer.valueOf(findOpenPort));
        rabbitMQOptions.setUri("amqp://" + withFixedExposedPort.getHost() + ":" + findOpenPort);
        RabbitMQConnectionImpl[] rabbitMQConnectionImplArr = new RabbitMQConnectionImpl[1];
        long currentTimeMillis = System.currentTimeMillis();
        Async async = testContext.async();
        this.testRunContext.vertx().setTimer(1000L, l -> {
            withFixedExposedPort.start();
            logger.debug("Newly exposed port: {}", withFixedExposedPort.getMappedPort(5672));
        });
        RabbitMQClient.connect(this.testRunContext.vertx(), rabbitMQOptions).compose(rabbitMQConnection -> {
            rabbitMQConnectionImplArr[0] = (RabbitMQConnectionImpl) rabbitMQConnection;
            return rabbitMQConnectionImplArr[0].createChannelBuilder().openChannel();
        }).onComplete(asyncResult -> {
            if (!asyncResult.succeeded()) {
                logger.info("Failing testCreateWithServerThatArrivesLate");
                testContext.fail(asyncResult.cause());
                return;
            }
            long currentTimeMillis2 = System.currentTimeMillis();
            logger.info("Completing testCreateWithServerThatArrivesLate with reconnect count = {} (expected 1 < {} < {})", new Object[]{Integer.valueOf(rabbitMQConnectionImplArr[0].getReconnectCount()), Integer.valueOf(rabbitMQConnectionImplArr[0].getReconnectCount()), Long.valueOf(((2000 + currentTimeMillis2) - currentTimeMillis) / rabbitMQOptions.getReconnectInterval())});
            testContext.assertTrue(rabbitMQConnectionImplArr[0].getReconnectCount() > 1);
            testContext.assertTrue(((long) rabbitMQConnectionImplArr[0].getReconnectCount()) < ((2000 + currentTimeMillis2) - currentTimeMillis) / ((long) rabbitMQOptions.getReconnectInterval()));
            rabbitMQConnectionImplArr[0].close().onComplete(asyncResult -> {
                async.complete();
                withFixedExposedPort.stop();
            });
        });
        logger.info("Ending testCreateWithServerThatArrivesLate");
    }

    public RabbitMQOptions config() {
        RabbitMQOptions rabbitMQOptions = new RabbitMQOptions();
        rabbitMQOptions.setUri("amqp://" + CONTAINER.getHost() + ":" + CONTAINER.getMappedPort(5672));
        rabbitMQOptions.setConnectionName(getClass().getSimpleName());
        return rabbitMQOptions;
    }
}
