package io.vertx.rabbitmq.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.rabbitmq.RabbitMQBrokerProvider;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQConnection;
import io.vertx.rabbitmq.RabbitMQConsumer;
import io.vertx.rabbitmq.RabbitMQConsumerOptions;
import io.vertx.rabbitmq.RabbitMQMessage;
import io.vertx.rabbitmq.RabbitMQOptions;
import io.vertx.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import io.vertx.rabbitmq.impl.codecs.RabbitMQLongMessageCodec;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/rabbitmq/impl/RabbitMQFuturePublisherImplTest.class */
public class RabbitMQFuturePublisherImplTest {
    private static final boolean DEFAULT_RABBITMQ_EXCHANGE_DURABLE = true;
    private static final boolean DEFAULT_RABBITMQ_EXCHANGE_AUTO_DELETE = false;
    private static final boolean DEFAULT_RABBITMQ_QUEUE_DURABLE = true;
    private static final boolean DEFAULT_RABBITMQ_QUEUE_EXCLUSIVE = false;
    private static final boolean DEFAULT_RABBITMQ_QUEUE_AUTO_DELETE = false;
    private final Vertx vertx;
    private RabbitMQConnection connection;
    private RabbitMQPublisher<Long> publisher;
    private RabbitMQConsumer consumer;
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQFuturePublisherImplTest.class);
    private static final BuiltinExchangeType DEFAULT_RABBITMQ_EXCHANGE_TYPE = BuiltinExchangeType.FANOUT;
    private static final GenericContainer CONTAINER = RabbitMQBrokerProvider.getRabbitMqContainer();
    private final String TEST_EXCHANGE = getClass().getName() + "Exchange";
    private final String TEST_QUEUE = getClass().getName() + "Queue";
    private final Set<Long> receivedMessages = new HashSet();
    private final Promise<Long> allMessagesSent = Promise.promise();
    private final Promise<Long> allMessagesReceived = Promise.promise();

    public RabbitMQFuturePublisherImplTest() throws IOException {
        logger.info("Constructing");
        this.vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(6));
    }

    @BeforeClass
    public static void startup() {
        CONTAINER.start();
    }

    @AfterClass
    public static void shutdown() {
        CONTAINER.stop();
    }

    private RabbitMQOptions getRabbitMQOptions() {
        RabbitMQOptions rabbitMQOptions = new RabbitMQOptions();
        rabbitMQOptions.setHost("localhost");
        rabbitMQOptions.setPort(CONTAINER.getMappedPort(5672).intValue());
        rabbitMQOptions.setConnectionTimeout(500);
        rabbitMQOptions.setNetworkRecoveryInterval(500L);
        rabbitMQOptions.setRequestedHeartbeat(60);
        rabbitMQOptions.setConnectionName(getClass().getSimpleName());
        return rabbitMQOptions;
    }

    @Before
    public void setup(TestContext testContext) throws Exception {
        RabbitMQClient.connect(this.vertx, getRabbitMQOptions()).onSuccess(rabbitMQConnection -> {
            this.connection = rabbitMQConnection;
        }).onComplete(testContext.asyncAssertSuccess());
    }

    @Test(timeout = 300000)
    public void testRecoverConnectionOutage(TestContext testContext) throws Exception {
        this.allMessagesSent.future().onSuccess(l -> {
            synchronized (this.receivedMessages) {
                if (this.receivedMessages.size() == l.longValue()) {
                    this.allMessagesReceived.tryComplete();
                }
            }
        });
        createConsumer().compose(r3 -> {
            return createPublisher();
        }).compose(r32 -> {
            return sendMessages();
        }).compose(r33 -> {
            return this.allMessagesSent.future();
        }).compose(l2 -> {
            return this.allMessagesReceived.future();
        }).compose(l3 -> {
            return this.publisher.cancel();
        }).compose(r34 -> {
            return this.consumer.cancel();
        }).compose(r35 -> {
            return this.connection.close();
        }).onComplete(testContext.asyncAssertSuccess());
    }

    private Future<Void> createPublisher() {
        return this.connection.createChannelBuilder().withChannelOpenHandler(channel -> {
            channel.exchangeDeclare(this.TEST_EXCHANGE, DEFAULT_RABBITMQ_EXCHANGE_TYPE, true, false, (Map) null);
        }).createPublisher(this.TEST_EXCHANGE, new RabbitMQLongMessageCodec(), new RabbitMQPublisherOptions()).onSuccess(rabbitMQPublisher -> {
            this.publisher = rabbitMQPublisher;
        }).mapEmpty();
    }

    private Future<Void> sendMessages() {
        AtomicLong atomicLong = new AtomicLong();
        AtomicLong atomicLong2 = new AtomicLong(20L);
        AtomicLong atomicLong3 = new AtomicLong();
        atomicLong3.set(this.vertx.setPeriodic(100L, l -> {
            long incrementAndGet = atomicLong.incrementAndGet();
            logger.info("Publishing message {}", Long.valueOf(incrementAndGet));
            this.publisher.publish("", new AMQP.BasicProperties(), Long.valueOf(incrementAndGet)).onComplete(asyncResult -> {
                if (asyncResult.succeeded()) {
                    logger.info("Published message {}", Long.valueOf(incrementAndGet));
                } else {
                    logger.warn("Failed to publish message {}: ", Long.valueOf(incrementAndGet), asyncResult.cause());
                }
            });
            if (atomicLong2.decrementAndGet() == 0) {
                this.vertx.cancelTimer(atomicLong3.get());
                logger.info("All messages sent: {}", Long.valueOf(atomicLong.get()));
                this.allMessagesSent.complete(Long.valueOf(atomicLong.get()));
            }
        }));
        return Future.succeededFuture();
    }

    private Future<Void> messageHandler(RabbitMQConsumer rabbitMQConsumer, RabbitMQMessage<Long> rabbitMQMessage) {
        Long l = (Long) rabbitMQMessage.body();
        synchronized (this.receivedMessages) {
            this.receivedMessages.add(l);
            logger.info("Received message: {} (have {})", l, Integer.valueOf(this.receivedMessages.size()));
            Future future = this.allMessagesSent.future();
            if (future.isComplete() && this.receivedMessages.size() == ((Long) future.result()).longValue()) {
                logger.info("All messages sents: {}", Integer.valueOf(this.receivedMessages.size()));
                this.allMessagesReceived.tryComplete();
            }
        }
        return rabbitMQMessage.basicAck();
    }

    private Future<Void> createConsumer() {
        return this.connection.createChannelBuilder().withChannelOpenHandler(channel -> {
            channel.exchangeDeclare(this.TEST_EXCHANGE, DEFAULT_RABBITMQ_EXCHANGE_TYPE, true, false, (Map) null);
            channel.queueDeclare(this.TEST_QUEUE, true, false, false, (Map) null);
            channel.queueBind(this.TEST_QUEUE, this.TEST_EXCHANGE, "", (Map) null);
        }).createConsumer(new RabbitMQLongMessageCodec(), this.TEST_QUEUE, (Supplier) null, new RabbitMQConsumerOptions(), this::messageHandler).onSuccess(rabbitMQConsumer -> {
            this.consumer = rabbitMQConsumer;
        }).mapEmpty();
    }
}
