package io.vertx.tests.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Completable;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.ThreadingModel;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.impl.clustered.NodeSelector;
import io.vertx.core.impl.VertxBootstrapImpl;
import io.vertx.core.spi.cluster.ClusteredNode;
import io.vertx.test.core.VertxTestBase;
import io.vertx.test.fakecluster.FakeClusterManager;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

/* loaded from: input_file:io/vertx/tests/eventbus/MessageQueueOnWorkerThreadTest.class */
public class MessageQueueOnWorkerThreadTest extends VertxTestBase {
    private Vertx vertx;

    /* loaded from: input_file:io/vertx/tests/eventbus/MessageQueueOnWorkerThreadTest$CustomNodeSelector.class */
    private static class CustomNodeSelector implements NodeSelector {
        ClusteredNode clusterManager;
        String nodeId;

        private CustomNodeSelector() {
        }

        public void init(ClusteredNode clusteredNode) {
            this.clusterManager = clusteredNode;
        }

        public void eventBusStarted() {
            this.nodeId = this.clusterManager.getNodeId();
        }

        public void selectForSend(String str, Completable<String> completable) {
            try {
                TimeUnit.NANOSECONDS.sleep(150L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completable.succeed(this.nodeId);
        }

        public void selectForPublish(String str, Completable<Iterable<String>> completable) {
            throw new UnsupportedOperationException();
        }
    }

    /* loaded from: input_file:io/vertx/tests/eventbus/MessageQueueOnWorkerThreadTest$SenderVerticle.class */
    private class SenderVerticle extends AbstractVerticle {
        final boolean worker;
        int count;

        SenderVerticle(boolean z, int i) {
            this.worker = z;
            this.count = i;
        }

        public void start() {
            sendMessage();
        }

        void sendMessage() {
            if (this.worker) {
                this.vertx.executeBlocking(() -> {
                    if (this.count <= 0) {
                        return false;
                    }
                    this.vertx.eventBus().send("foo", "bar");
                    this.count--;
                    return true;
                }).onComplete(MessageQueueOnWorkerThreadTest.this.onSuccess(bool -> {
                    if (bool.booleanValue()) {
                        this.vertx.runOnContext(r3 -> {
                            sendMessage();
                        });
                    }
                }));
            } else if (this.count > 0) {
                this.vertx.eventBus().send("foo", "bar");
                this.count--;
                this.vertx.runOnContext(r3 -> {
                    sendMessage();
                });
            }
        }
    }

    @Override // io.vertx.test.core.VertxTestBase, io.vertx.test.core.AsyncTestBase
    public void setUp() throws Exception {
        super.setUp();
        this.vertx = (Vertx) new VertxBootstrapImpl().init().clusterManager(new FakeClusterManager()).clusterNodeSelector(new CustomNodeSelector()).clusteredVertx().await();
    }

    @Test
    public void testWorkerContext() throws Exception {
        test(true);
    }

    @Test
    public void testExecuteBlocking() throws Exception {
        test(false);
    }

    private void test(boolean z) throws Exception {
        int i = 20;
        int i2 = 100;
        waitFor(20 * 100);
        this.vertx.eventBus().consumer("foo", message -> {
            complete();
        }).completion().onComplete(onSuccess(r9 -> {
            this.vertx.deployVerticle(() -> {
                return new SenderVerticle(z, i2);
            }, new DeploymentOptions().setThreadingModel(ThreadingModel.WORKER).setInstances(i));
        }));
        await(5L, TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.vertx.test.core.VertxTestBase, io.vertx.test.core.AsyncTestBase
    public void tearDown() throws Exception {
        try {
            if (this.vertx != null) {
                close(Collections.singletonList(this.vertx));
            }
        } finally {
            super.tearDown();
        }
    }
}
