package io.vertx.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Supplier;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/rabbitmq/RabbitMQPublishCodecTest.class */
public class RabbitMQPublishCodecTest {
    private final String TEST_EXCHANGE = getClass().getName() + "Exchange";
    private final String TEST_QUEUE = getClass().getName() + "Queue";
    private static final boolean DEFAULT_RABBITMQ_EXCHANGE_DURABLE = true;
    private static final boolean DEFAULT_RABBITMQ_EXCHANGE_AUTO_DELETE = false;
    private static final boolean DEFAULT_RABBITMQ_QUEUE_DURABLE = true;
    private static final boolean DEFAULT_RABBITMQ_QUEUE_EXCLUSIVE = false;
    private static final boolean DEFAULT_RABBITMQ_QUEUE_AUTO_DELETE = false;
    private final Vertx vertx;
    private RabbitMQConnection connection;
    private volatile Promise<byte[]> lastMessage;
    private RabbitMQPublisher<Object> publisher;
    private RabbitMQConsumer consumer;
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQPublishCodecTest.class);
    private static final BuiltinExchangeType DEFAULT_RABBITMQ_EXCHANGE_TYPE = BuiltinExchangeType.FANOUT;
    private static final GenericContainer CONTAINER = RabbitMQBrokerProvider.getRabbitMqContainer();
    private static final byte[] HEX_ARRAY = "0123456789ABCDEF".getBytes(StandardCharsets.US_ASCII);

    public RabbitMQPublishCodecTest() throws IOException {
        logger.info("Constructing");
        this.vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(6));
    }

    @BeforeClass
    public static void startup() {
        CONTAINER.start();
    }

    @AfterClass
    public static void shutdown() {
        CONTAINER.stop();
    }

    private RabbitMQOptions getRabbitMQOptions() {
        RabbitMQOptions rabbitMQOptions = new RabbitMQOptions();
        rabbitMQOptions.setHost(CONTAINER.getHost());
        rabbitMQOptions.setPort(CONTAINER.getMappedPort(5672).intValue());
        rabbitMQOptions.setConnectionName(getClass().getSimpleName());
        return rabbitMQOptions;
    }

    @Before
    public void setup(TestContext testContext) throws Exception {
        RabbitMQClient.connect(this.vertx, getRabbitMQOptions()).onSuccess(rabbitMQConnection -> {
            this.connection = rabbitMQConnection;
        }).onComplete(testContext.asyncAssertSuccess());
    }

    public static byte[] longToBytes(long j) {
        ByteBuffer allocate = ByteBuffer.allocate(8);
        allocate.putLong(j);
        return allocate.array();
    }

    public static byte[] intToBytes(int i) {
        ByteBuffer allocate = ByteBuffer.allocate(4);
        allocate.putInt(i);
        return allocate.array();
    }

    public static byte[] shortToBytes(short s) {
        ByteBuffer allocate = ByteBuffer.allocate(2);
        allocate.putShort(s);
        return allocate.array();
    }

    public static byte[] doubleToBytes(double d) {
        ByteBuffer allocate = ByteBuffer.allocate(8);
        allocate.putDouble(d);
        return allocate.array();
    }

    public static byte[] floatToBytes(float f) {
        ByteBuffer allocate = ByteBuffer.allocate(4);
        allocate.putFloat(f);
        return allocate.array();
    }

    public static byte[] charToBytes(char c) {
        ByteBuffer allocate = ByteBuffer.allocate(2);
        allocate.putChar(c);
        return allocate.array();
    }

    public static String bytesToHex(byte[] bArr) {
        byte[] bArr2 = new byte[bArr.length * 2];
        for (int i = 0; i < bArr.length; i++) {
            int i2 = bArr[i] & 255;
            bArr2[i * 2] = HEX_ARRAY[i2 >>> 4];
            bArr2[(i * 2) + 1] = HEX_ARRAY[i2 & 15];
        }
        return new String(bArr2, StandardCharsets.UTF_8);
    }

    private Future<Void> testTransfer(String str, Object obj, byte[] bArr) {
        logger.debug("Testing transfer of {}", obj);
        this.lastMessage = Promise.promise();
        return this.publisher.publish("", new AMQP.BasicProperties(), obj).compose(r6 -> {
            logger.debug("Published: {}", obj);
            return this.lastMessage.future();
        }).compose(bArr2 -> {
            logger.debug("Got message: {}", bArr2);
            if (Arrays.equals(bArr, bArr2)) {
                return Future.succeededFuture();
            }
            logger.info("{}: {} != {}", new Object[]{str, bArr2, bArr});
            return Future.failedFuture(str + ": " + bytesToHex(bArr2) + " != " + bytesToHex(bArr));
        });
    }

    @Test(timeout = 1200000)
    public void testRecoverConnectionOutage(TestContext testContext) throws Exception {
        Async async = testContext.async();
        createConsumer().compose(r3 -> {
            return createPublisher();
        }).compose(r7 -> {
            return testTransfer("String", "Hello", "Hello".getBytes(StandardCharsets.UTF_8));
        }).compose(r6 -> {
            return testTransfer("Null", null, new byte[0]);
        }).compose(r62 -> {
            Buffer buffer = Buffer.buffer("This is my buffer");
            return testTransfer("Buffer", buffer, buffer.getBytes());
        }).compose(r72 -> {
            JsonObject put = new JsonObject().put("key", Double.valueOf(1234.5678d));
            return testTransfer("JsonObject", put, put.toString().getBytes(StandardCharsets.UTF_8));
        }).compose(r73 -> {
            JsonArray add = new JsonArray().add(123).add("Bob");
            return testTransfer("JsonArray", add, add.toString().getBytes(StandardCharsets.UTF_8));
        }).compose(r74 -> {
            JsonArray jsonArray = new JsonArray();
            jsonArray.add(123);
            jsonArray.add("Bob");
            return testTransfer("JsonArray", jsonArray, jsonArray.toString().getBytes(StandardCharsets.UTF_8));
        }).compose(r32 -> {
            return this.consumer.cancel();
        }).compose(r33 -> {
            return this.publisher.cancel();
        }).compose(r34 -> {
            return this.connection.close();
        }).onFailure(th -> {
            logger.error("Failed: ", th);
            testContext.fail(th);
        }).onSuccess(r35 -> {
            async.complete();
        });
    }

    private Future<Void> createPublisher() {
        return this.connection.createChannelBuilder().withChannelOpenHandler(channel -> {
            channel.exchangeDeclare(this.TEST_EXCHANGE, DEFAULT_RABBITMQ_EXCHANGE_TYPE, true, false, (Map) null);
        }).createPublisher(this.TEST_EXCHANGE, (RabbitMQMessageCodec) null, new RabbitMQPublisherOptions()).onSuccess(rabbitMQPublisher -> {
            this.publisher = rabbitMQPublisher;
        }).mapEmpty();
    }

    private Future<Void> messageHandler(RabbitMQConsumer rabbitMQConsumer, RabbitMQMessage<byte[]> rabbitMQMessage) {
        logger.debug("Got message: {} ({})", rabbitMQMessage, this.lastMessage);
        try {
            this.lastMessage.complete((byte[]) rabbitMQMessage.body());
        } catch (Throwable th) {
            logger.error("Failed: ", th);
        }
        return rabbitMQMessage.basicAck();
    }

    private Future<Void> createConsumer() {
        return this.connection.createChannelBuilder().withChannelOpenHandler(channel -> {
            channel.exchangeDeclare(this.TEST_EXCHANGE, DEFAULT_RABBITMQ_EXCHANGE_TYPE, true, false, (Map) null);
            channel.queueDeclare(this.TEST_QUEUE, true, false, false, (Map) null);
            channel.queueBind(this.TEST_QUEUE, this.TEST_EXCHANGE, "", (Map) null);
        }).createConsumer(RabbitMQChannelBuilder.BYTE_ARRAY_MESSAGE_CODEC, this.TEST_QUEUE, (Supplier) null, new RabbitMQConsumerOptions(), this::messageHandler).onSuccess(rabbitMQConsumer -> {
            this.consumer = rabbitMQConsumer;
        }).mapEmpty();
    }
}
