package io.vertx.rabbitmq.performance;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.rabbitmq.RabbitMQChannel;
import io.vertx.rabbitmq.RabbitMQConnection;
import io.vertx.rabbitmq.RabbitMQPublishOptions;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Publisher stresser that sends messages strictly one at a time: each message is
 * published with {@code waitForConfirm} enabled, and the next message is only
 * published once the previous publish future has completed successfully.
 */
public class WaitOnEachMessage implements RabbitMQPublisherStresser {
    /** Connection passed to {@link #init}; stored here but not otherwise read by this class. */
    private RabbitMQConnection connection;
    /** Channel opened in {@link #init}; {@code null} until the channel has been opened. */
    private RabbitMQChannel channel;
    /** Number of messages still to be published in the current {@link #runTest} run. */
    private final AtomicLong counter = new AtomicLong();
    /** Name of the exchange that messages are published to. */
    private String exchange;

    /**
     * Returns the display name of this stresser.
     *
     * @return the fixed string {@code "Wait on each message"}
     */
    @Override // io.vertx.rabbitmq.performance.RabbitMQPublisherStresser
    public String getName() {
        return "Wait on each message";
    }

    /**
     * Opens a channel on the given connection, declares a fanout exchange with the
     * given name and then calls {@code confirmSelect()} on the channel.
     *
     * @param rabbitMQConnection connection used to open the channel
     * @param str name of the exchange to declare and later publish to
     * @return a future that completes once the channel is open, the exchange is
     *         declared and {@code confirmSelect()} has completed
     */
    @Override // io.vertx.rabbitmq.performance.RabbitMQPublisherStresser
    public Future<Void> init(RabbitMQConnection rabbitMQConnection, String str) {
        this.connection = rabbitMQConnection;
        this.exchange = str;
        return rabbitMQConnection.createChannelBuilder().openChannel().compose(rabbitMQChannel -> {
            this.channel = rabbitMQChannel;
            // NOTE(review): the two booleans are presumably durable=true, autoDelete=false — confirm
            // against the management channel API. The cast on null is kept from the original so that
            // overload resolution is unchanged.
            return this.channel.getManagementChannel().exchangeDeclare(str, BuiltinExchangeType.FANOUT, true, false, (Map) null);
        }).compose(ignored -> this.channel.getManagementChannel().confirmSelect());
    }

    /**
     * Closes the channel opened by {@link #init}.
     *
     * @return the future returned by closing the channel, or an already succeeded
     *         future when no channel was ever opened
     */
    @Override // io.vertx.rabbitmq.performance.RabbitMQPublisherStresser
    public Future<Void> shutdown() {
        // init may never have run, or may have failed before the channel was assigned.
        if (this.channel == null) {
            return Future.succeededFuture();
        }
        return this.channel.close();
    }

    /**
     * Publishes {@code j} messages sequentially, waiting for each publish to complete
     * before sending the next one.
     *
     * @param j number of messages to publish; values {@code <= 0} publish nothing
     * @return a future that succeeds after the last publish completes, or fails with
     *         the cause of the first failed publish
     */
    @Override // io.vertx.rabbitmq.performance.RabbitMQPublisherStresser
    public Future<Void> runTest(long j) {
        Promise<Void> promise = Promise.promise();
        this.counter.set(j);
        runIteration(promise);
        return promise.future();
    }

    /**
     * Publishes one message if any remain, and schedules the next iteration from the
     * completion handler of that publish; completes the promise when none remain.
     *
     * @param promise promise to complete when all messages are sent, or to fail on
     *                the first publish error
     */
    private void runIteration(Promise<Void> promise) {
        // Read the remaining count BEFORE decrementing so that exactly the requested
        // number of messages is published (bodies count down from the initial value to 1).
        long remaining = this.counter.getAndDecrement();
        if (remaining <= 0) {
            promise.complete();
            return;
        }
        // Explicit charset so the payload bytes do not depend on the platform default.
        byte[] body = Long.toString(remaining).getBytes(StandardCharsets.UTF_8);
        RabbitMQPublishOptions options = new RabbitMQPublishOptions().setWaitForConfirm(true);
        // Empty routing key; the boolean is presumably the AMQP "mandatory" flag — TODO confirm.
        this.channel.basicPublish(options, this.exchange, "", true, new AMQP.BasicProperties(), body).onComplete(asyncResult -> {
            if (asyncResult.succeeded()) {
                runIteration(promise);
            } else {
                promise.fail(asyncResult.cause());
            }
        });
    }
}
