package io.vertx.rabbitmq.tests;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.BuiltinExchangeType;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.EncodeException;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQOptions;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/rabbitmq/tests/RabbitMQClientBuiltInReconnectTest.class */
public class RabbitMQClientBuiltInReconnectTest {
    private static final String TEST_EXCHANGE = "RabbitMQClientBuiltInReconnectExchange";
    private static final String TEST_QUEUE = "RabbitMQClientBuiltInReconnectQueue";
    private static final String TEST_MESSAGE = "My Message";
    private static final boolean DEFAULT_RABBITMQ_EXCHANGE_DURABLE = false;
    private static final boolean DEFAULT_RABBITMQ_EXCHANGE_AUTO_DELETE = true;
    private static final boolean DEFAULT_RABBITMQ_QUEUE_DURABLE = false;
    private static final boolean DEFAULT_RABBITMQ_QUEUE_EXCLUSIVE = true;
    private static final boolean DEFAULT_RABBITMQ_QUEUE_AUTO_DELETE = true;
    private final AtomicLong producerTimeReference = new AtomicLong();
    private final AtomicReference<RabbitMQMessageProducer> producerReference = new AtomicReference<>();
    private final AtomicReference<RabbitMQClient> consumerReference = new AtomicReference<>();
    private final AtomicReference<Handler<AsyncResult<String>>> handlerReference = new AtomicReference<>();
    private final ObjectMapper mapper;
    private final Network network;
    private final GenericContainer networkedRabbitmq;
    private final ToxiproxyContainer toxiproxy;
    private static final String DEFAULT_RABBITMQ_EXCHANGE_TYPE = BuiltinExchangeType.FANOUT.getType();
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQClientBuiltInReconnectTest.class);

    /* loaded from: input_file:io/vertx/rabbitmq/tests/RabbitMQClientBuiltInReconnectTest$RabbitMQMessageProducer.class */
    public class RabbitMQMessageProducer {
        private final RabbitMQClient rabbitMQClient;
        private final String topic;

        public RabbitMQMessageProducer(RabbitMQClient rabbitMQClient, String str) {
            this.rabbitMQClient = rabbitMQClient;
            this.topic = str;
        }

        public Future<Void> setUp() {
            return this.rabbitMQClient.start().compose(r7 -> {
                return this.rabbitMQClient.exchangeDeclare(this.topic, RabbitMQClientBuiltInReconnectTest.DEFAULT_RABBITMQ_EXCHANGE_TYPE, false, true);
            }).onSuccess(r6 -> {
                RabbitMQClientBuiltInReconnectTest.LOGGER.info("Created exchange '{}': {}", this.topic, r6);
            }).compose(r3 -> {
                return this.rabbitMQClient.confirmSelect();
            }).onSuccess(r5 -> {
                RabbitMQClientBuiltInReconnectTest.LOGGER.info("Confirmation enabled for topic {}", this.topic);
            }).onFailure(th -> {
                RabbitMQClientBuiltInReconnectTest.LOGGER.warn("Failed to create exchange '{}'", this.topic, th);
            });
        }

        public void send(String str) {
            RabbitMQClientBuiltInReconnectTest.LOGGER.info("Sending message {} to RabbitMQ topic {}", str, this.topic);
            this.rabbitMQClient.basicPublish(this.topic, "", Buffer.buffer(RabbitMQClientBuiltInReconnectTest.this.encode(str))).compose(r5 -> {
                return this.rabbitMQClient.waitForConfirms(60000L);
            }).onSuccess(r7 -> {
                RabbitMQClientBuiltInReconnectTest.LOGGER.debug("Published message {} to RabbitMQ topic {}", str, this.topic);
            }).onFailure(th -> {
                RabbitMQClientBuiltInReconnectTest.LOGGER.warn("Failed to publish message {} to RabbitMQ topic {}", new Object[]{str, this.topic, th});
            });
        }

        public Future<Void> close() {
            return this.rabbitMQClient.stop();
        }
    }

    /* loaded from: input_file:io/vertx/rabbitmq/tests/RabbitMQClientBuiltInReconnectTest$RabbitMqConsumer.class */
    public static class RabbitMqConsumer {
        private final RabbitMQClient client;

        public RabbitMqConsumer(Vertx vertx) {
            this.client = RabbitMQClient.create(vertx, new RabbitMQOptions());
        }

        public void listen() {
            this.client.start().onFailure(th -> {
                RabbitMQClientBuiltInReconnectTest.LOGGER.error("Fail to connect to RabbitMQ ", th);
            });
        }
    }

    public RabbitMQClientBuiltInReconnectTest() {
        LOGGER.info("Constructing");
        this.mapper = new ObjectMapper();
        this.network = Network.newNetwork();
        this.networkedRabbitmq = new GenericContainer(DockerImageName.parse("rabbitmq:3.8.6-alpine")).withExposedPorts(new Integer[]{5672}).withNetwork(this.network);
        this.toxiproxy = new ToxiproxyContainer(DockerImageName.parse("shopify/toxiproxy:2.1.4")).withNetwork(this.network);
    }

    @Before
    public void setup() throws InterruptedException {
        LOGGER.info("Starting");
        this.networkedRabbitmq.start();
        this.toxiproxy.start();
        Thread.sleep(5000L);
    }

    @After
    public void shutdown() {
        this.networkedRabbitmq.stop();
        this.toxiproxy.stop();
        LOGGER.info("Shutdown");
    }

    @Test(timeout = 90000)
    public void testRecoverConnectionOutage(TestContext testContext) throws Exception {
        Vertx vertx = Vertx.vertx();
        createAndStartProducer(vertx);
        Async async = testContext.async();
        Handler handler = asyncResult -> {
            LOGGER.info("Got another message. Connection recovery was successful.");
            vertx.cancelTimer(this.producerTimeReference.get());
            this.producerReference.get().close();
            this.consumerReference.get().stop();
            async.complete();
        };
        this.handlerReference.set(asyncResult2 -> {
            vertx.executeBlocking(() -> {
                LOGGER.info("Got a message, Shutdown rabbitmq.");
                this.networkedRabbitmq.stop();
                return null;
            }).compose(obj -> {
                return vertx.executeBlocking(() -> {
                    LOGGER.info("Restore RabbitMQ and wait for one more message.");
                    this.networkedRabbitmq.start();
                    this.handlerReference.set(handler);
                    return null;
                });
            });
        });
        createAndStartConsumer(vertx, this.consumerReference, str -> {
            Assert.assertEquals(TEST_MESSAGE, str);
            LOGGER.warn("Received message: {}", str);
            this.handlerReference.get().handle(Future.succeededFuture());
        });
        LOGGER.info("Await message from rabbitmq.");
    }

    private void createAndStartProducer(Vertx vertx) {
        RabbitMQMessageProducer rabbitMQMessageProducer = new RabbitMQMessageProducer(RabbitMQClient.create(vertx, getRabbitMQOptions()), TEST_EXCHANGE);
        rabbitMQMessageProducer.setUp().onFailure(th -> {
            LOGGER.error("Failed to start consumer: ", th);
        }).onSuccess(r8 -> {
            LOGGER.info("Setting up periodic send");
            vertx.setPeriodic(1000L, l -> {
                rabbitMQMessageProducer.send(TEST_MESSAGE);
            });
            this.producerReference.set(rabbitMQMessageProducer);
        });
    }

    private void createAndStartConsumer(Vertx vertx, AtomicReference<RabbitMQClient> atomicReference, Consumer<String> consumer) {
        Future<RabbitMQClient> createConsumer = createConsumer(vertx, consumer);
        Objects.requireNonNull(atomicReference);
        createConsumer.onSuccess((v1) -> {
            r1.set(v1);
        }).onFailure(th -> {
            LOGGER.error("Failed to start consumer: ", th);
        });
    }

    private <T> Future<RabbitMQClient> createConsumer(Vertx vertx, Consumer<String> consumer) {
        LOGGER.info("Registering RabbitMQ message consumer for exchange {}...", TEST_EXCHANGE);
        RabbitMQClient create = RabbitMQClient.create(vertx, getRabbitMQOptions());
        return create.start().compose(r7 -> {
            LOGGER.info("Consumer client started");
            return create.exchangeDeclare(TEST_EXCHANGE, DEFAULT_RABBITMQ_EXCHANGE_TYPE, false, true);
        }).compose(r72 -> {
            LOGGER.info("Exchange declared by consumer");
            return create.queueDeclare(TEST_QUEUE, false, true, true);
        }).compose(declareOk -> {
            LOGGER.info("Queue declared by consumer");
            return create.queueBind(TEST_QUEUE, TEST_EXCHANGE, "");
        }).compose(r4 -> {
            LOGGER.info("Queue bound to exchange");
            return create.basicConsumer(TEST_QUEUE);
        }).map(rabbitMQConsumer -> {
            return rabbitMQConsumer.handler(rabbitMQMessage -> {
                String str = (String) decode(rabbitMQMessage.body().getBytes(), String.class);
                LOGGER.debug("Received value {} of type {} on RabbitMQ message queue for exchange {}", new Object[]{str, String.class.getName(), TEST_EXCHANGE});
                consumer.accept(str);
            });
        }).map(rabbitMQConsumer2 -> {
            return rabbitMQConsumer2.exceptionHandler(th -> {
                LOGGER.warn("Exception in RabbitMQ consumer", th);
            });
        }).map(rabbitMQConsumer3 -> {
            return create;
        }).onSuccess(rabbitMQClient -> {
            LOGGER.debug("Registered RabbitMQ message consumer for exchange {}", TEST_EXCHANGE);
        });
    }

    private RabbitMQOptions getRabbitMQOptions() {
        RabbitMQOptions rabbitMQOptions = new RabbitMQOptions();
        ToxiproxyContainer.ContainerProxy proxy = this.toxiproxy.getProxy(this.networkedRabbitmq, 5672);
        rabbitMQOptions.setHost(proxy.getContainerIpAddress());
        rabbitMQOptions.setPort(proxy.getProxyPort());
        rabbitMQOptions.setConnectionTimeout(1000);
        rabbitMQOptions.setNetworkRecoveryInterval(1000L);
        rabbitMQOptions.setRequestedHeartbeat(1);
        rabbitMQOptions.setAutomaticRecoveryEnabled(true);
        rabbitMQOptions.setReconnectAttempts(0);
        return rabbitMQOptions;
    }

    public byte[] encode(Object obj) {
        try {
            return this.mapper.writeValueAsBytes(obj);
        } catch (Exception e) {
            throw new EncodeException(e.getMessage());
        }
    }

    public <T> T decode(byte[] bArr, Class<T> cls) {
        try {
            return (T) this.mapper.readValue(bArr, cls);
        } catch (Exception e) {
            throw new DecodeException("Failed to decode: " + e.getMessage());
        }
    }
}
