package io.vertx.micrometer;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.micrometer.MicrometerMetricsTestBase;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/micrometer/VertxEventBusMetricsTest.class */
public class VertxEventBusMetricsTest extends MicrometerMetricsTestBase {
    @Test
    public void shouldReportEventbusMetrics(TestContext testContext) {
        this.metricsOptions.addLabels(new Label[]{Label.EB_ADDRESS, Label.EB_FAILURE, Label.CLASS_NAME});
        this.vertx = vertx(testContext).exceptionHandler(th -> {
            if (th.getMessage() == null || !th.getMessage().contains("expected failure")) {
                testContext.exceptionHandler().handle(th);
            }
        });
        int i = 2;
        Async async = testContext.async(2);
        Async async2 = testContext.async(2);
        this.vertx.deployVerticle(() -> {
            return new AbstractVerticle() { // from class: io.vertx.micrometer.VertxEventBusMetricsTest.1
                public void start(Promise<Void> promise) {
                    EventBus eventBus = this.vertx.eventBus();
                    Async async3 = async2;
                    eventBus.consumer("testSubject", message -> {
                        JsonObject jsonObject = (JsonObject) message.body();
                        try {
                            Thread.sleep(jsonObject.getLong("sleep").longValue());
                            if (jsonObject.containsKey("last")) {
                                async3.countDown();
                            }
                            if (jsonObject.getBoolean("fail").booleanValue()) {
                                throw new RuntimeException("It's ok! [expected failure]");
                            }
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    });
                    async.countDown();
                }
            };
        }, new DeploymentOptions().setInstances(2));
        async.awaitSuccess();
        this.vertx.eventBus().publish("testSubject", new JsonObject("{\"fail\": false, \"sleep\": 30}"));
        this.vertx.eventBus().publish("testSubject", new JsonObject("{\"fail\": false, \"sleep\": 30}"));
        this.vertx.eventBus().publish("testSubject", new JsonObject("{\"fail\": false, \"sleep\": 10}"));
        this.vertx.eventBus().publish("testSubject", new JsonObject("{\"fail\": false, \"sleep\": 30}"));
        this.vertx.eventBus().publish("testSubject", new JsonObject("{\"fail\": true, \"sleep\": 10}"));
        this.vertx.eventBus().publish("testSubject", new JsonObject("{\"fail\": false, \"sleep\": 30}"));
        this.vertx.eventBus().publish("testSubject", new JsonObject("{\"fail\": true, \"sleep\": 10}"));
        this.vertx.eventBus().request("no handler", new JsonObject("{\"fail\": false, \"sleep\": 30}"), asyncResult -> {
        });
        this.vertx.eventBus().request("no handler", new JsonObject("{\"fail\": false, \"sleep\": 30}"), asyncResult2 -> {
        });
        this.vertx.eventBus().publish("testSubject", new JsonObject("{\"fail\": false, \"sleep\": 30, \"last\": true}"));
        async2.awaitSuccess();
        waitForValue(testContext, "vertx.eventbus.processed[address=testSubject,side=local]$COUNT", d -> {
            return d.intValue() == 8 * i;
        });
        Assertions.assertThat(listDatapoints(startsWith("vertx.eventbus"))).hasSize(10).contains(new MicrometerMetricsTestBase.Datapoint[]{dp("vertx.eventbus.handlers[address=testSubject]$VALUE", 2), dp("vertx.eventbus.pending[address=no handler,side=local]$VALUE", 0), dp("vertx.eventbus.pending[address=testSubject,side=local]$VALUE", 0), dp("vertx.eventbus.sent[address=no handler,side=local]$COUNT", 2), dp("vertx.eventbus.published[address=testSubject,side=local]$COUNT", 8), dp("vertx.eventbus.received[address=no handler,side=local]$COUNT", 2), dp("vertx.eventbus.received[address=testSubject,side=local]$COUNT", 8), dp("vertx.eventbus.delivered[address=testSubject,side=local]$COUNT", 8), dp("vertx.eventbus.reply.failures[address=no handler,failure=NO_HANDLERS]$COUNT", 2), dp("vertx.eventbus.processed[address=testSubject,side=local]$COUNT", 8.0d * 2)});
    }

    @Test
    public void shouldDiscardMessages(TestContext testContext) {
        this.vertx = vertx(testContext);
        EventBus eventBus = this.vertx.eventBus();
        MessageConsumer consumer = eventBus.consumer("foo");
        consumer.setMaxBufferedMessages(10);
        consumer.pause();
        consumer.handler(message -> {
            Fail.fail("should not be called");
        });
        for (int i = 0; i < 10; i++) {
            eventBus.send("foo", "the_message-" + i);
        }
        eventBus.send("foo", "last");
        waitForValue(testContext, "vertx.eventbus.discarded[side=local]$COUNT", d -> {
            return d.intValue() == 1;
        });
        Assertions.assertThat(listDatapoints(startsWith("vertx.eventbus"))).contains(new MicrometerMetricsTestBase.Datapoint[]{dp("vertx.eventbus.pending[side=local]$VALUE", 10)});
        consumer.unregister();
        waitForValue(testContext, "vertx.eventbus.discarded[side=local]$COUNT", d2 -> {
            return d2.intValue() == 11;
        });
        Assertions.assertThat(listDatapoints(startsWith("vertx.eventbus"))).contains(new MicrometerMetricsTestBase.Datapoint[]{dp("vertx.eventbus.pending[side=local]$VALUE", 0)});
    }
}
