package io.vertx.tests.eventbus;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.spi.cluster.ClusterManager;
import io.vertx.core.spi.cluster.RegistrationUpdateEvent;
import io.vertx.test.core.TestUtils;
import io.vertx.test.fakecluster.FakeClusterManager;
import io.vertx.tests.eventbus.EventBusTestBase;
import io.vertx.tests.shareddata.AsyncMapTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.CoreMatchers;
import org.junit.Test;

/* loaded from: input_file:io/vertx/tests/eventbus/ClusteredEventBusTestBase.class */
public class ClusteredEventBusTestBase extends EventBusTestBase {
    protected static final String ADDRESS1 = "some-address1";

    /* renamed from: io.vertx.tests.eventbus.ClusteredEventBusTestBase$1MyVerticle, reason: invalid class name */
    /* loaded from: input_file:io/vertx/tests/eventbus/ClusteredEventBusTestBase$1MyVerticle.class */
    class C1MyVerticle extends AbstractVerticle {
        final String pingServerAddress;
        final String pingClientAddress;
        final /* synthetic */ AtomicInteger val$count;

        C1MyVerticle(String str, String str2, AtomicInteger atomicInteger) {
            this.val$count = atomicInteger;
            this.pingServerAddress = str;
            this.pingClientAddress = str2;
        }

        public void start(Promise<Void> promise) throws Exception {
            this.vertx.eventBus().consumer(this.pingServerAddress, message -> {
                message.reply("pong");
            }).completion().onComplete(promise);
        }

        public void stop(Promise<Void> promise) throws Exception {
            Future request = this.vertx.eventBus().request(this.pingClientAddress, "ping");
            ClusteredEventBusTestBase clusteredEventBusTestBase = ClusteredEventBusTestBase.this;
            AtomicInteger atomicInteger = this.val$count;
            request.onComplete(clusteredEventBusTestBase.onSuccess(message -> {
                ClusteredEventBusTestBase.this.assertEquals("pong", message.body());
                atomicInteger.incrementAndGet();
                this.vertx.setPeriodic(10L, l -> {
                    if (atomicInteger.get() == 2) {
                        promise.complete();
                    }
                });
            }));
        }
    }

    /**
     * Returns a new {@link FakeClusterManager} on every call.
     *
     * <p>Note: the decompiler reports that this method's access modifier was changed from
     * {@code protected} to {@code public}.
     *
     * @return a fresh {@link FakeClusterManager} instance
     */
    @Override // io.vertx.test.core.VertxTestBase
    public ClusterManager getClusterManager() {
        return new FakeClusterManager();
    }

    /**
     * Starts a clustered {@link Vertx} through the superclass and, once it is available,
     * prepares its event bus for the tests in this hierarchy:
     * <ul>
     *   <li>registers an {@code ImmutableObjectCodec} and selects it for every
     *       {@code ImmutableObject} body (no codec name is returned for other bodies);</li>
     *   <li>accepts as cluster-serializable any class whose name starts with the name of
     *       {@link AsyncMapTest};</li>
     *   <li>accepts as serializable anything the default checker accepts, plus those same
     *       {@link AsyncMapTest}-prefixed class names.</li>
     * </ul>
     *
     * <p>Note: the decompiler reports that this method's access modifier was changed from
     * {@code protected} to {@code public}.
     *
     * @param vertxOptions options forwarded to the superclass
     * @param clusterManager cluster manager forwarded to the superclass
     * @return the future returned by the superclass, with the configuration step attached
     */
    @Override // io.vertx.test.core.VertxTestBase
    public Future<Vertx> clusteredVertx(VertxOptions vertxOptions, ClusterManager clusterManager) {
        Future<Vertx> startup = super.clusteredVertx(vertxOptions, clusterManager);
        return startup.onSuccess(node -> {
            EventBus bus = node.eventBus();
            EventBusTestBase.ImmutableObjectCodec codec = new EventBusTestBase.ImmutableObjectCodec();
            bus.registerCodec(codec);
            bus.codecSelector(body -> body instanceof EventBusTestBase.ImmutableObject ? codec.name() : null);
            bus.clusterSerializableChecker(className -> className.startsWith(AsyncMapTest.class.getName()));
            bus.serializableChecker(className ->
                EventBus.DEFAULT_SERIALIZABLE_CHECKER.apply(className)
                    || className.startsWith(AsyncMapTest.class.getName()));
        });
    }

    /**
     * Always answers {@code true}.
     *
     * <p>NOTE(review): presumably this tells the inherited tests that an immutable object sent
     * over the clustered event bus arrives as a copy rather than the same instance; confirm
     * against {@code EventBusTestBase}.
     *
     * @return {@code true}
     */
    @Override // io.vertx.tests.eventbus.EventBusTestBase
    protected boolean shouldImmutableObjectBeCopied() {
        return true;
    }

    /**
     * Returns the first {@code i} started nodes in a freshly allocated array, starting
     * {@code i} nodes first when none have been started yet.
     *
     * <p>If nodes were already started and there are fewer than {@code i} of them, the copy
     * fails with an index-out-of-bounds exception.
     *
     * @param i number of nodes wanted
     * @return a new array holding the first {@code i} entries of the node array
     */
    @Override // io.vertx.tests.eventbus.EventBusTestBase
    protected Vertx[] vertices(int i) {
        if (this.vertices == null) {
            startNodes(i);
        }
        Vertx[] selected = new Vertx[i];
        System.arraycopy(this.vertices, 0, selected, 0, i);
        return selected;
    }

    /**
     * Registers a consumer on node 0 (handler attached after creation), waits for the
     * registration to complete, then sends a random string from node 1 and expects node 0
     * to receive exactly that string.
     */
    @Test
    public void testRegisterRemote1() {
        startNodes(2);
        String payload = TestUtils.randomUnicodeString(100);

        MessageConsumer<Object> consumer = this.vertices[0].eventBus().consumer(ADDRESS1);
        consumer.handler(received -> {
            assertEquals(payload, received.body());
            testComplete();
        });

        // Only send once the registration has completed.
        Future<Void> registered = consumer.completion();
        registered.onComplete(outcome -> {
            assertTrue(outcome.succeeded());
            this.vertices[1].eventBus().send(ADDRESS1, payload);
        });
        await();
    }

    /**
     * Same scenario as {@code testRegisterRemote1}, but the handler is supplied directly to
     * {@code consumer(address, handler)}: node 0 listens, node 1 sends a random string once
     * the registration has completed, and node 0 must receive that exact string.
     */
    @Test
    public void testRegisterRemote2() {
        startNodes(2);
        String payload = TestUtils.randomUnicodeString(100);

        EventBus receivingBus = this.vertices[0].eventBus();
        MessageConsumer<Object> consumer = receivingBus.consumer(ADDRESS1, received -> {
            assertEquals(payload, received.body());
            testComplete();
        });

        // Only send once the registration has completed.
        Future<Void> registered = consumer.completion();
        registered.onComplete(outcome -> {
            assertTrue(outcome.succeeded());
            this.vertices[1].eventBus().send(ADDRESS1, payload);
        });
        await();
    }

    /**
     * Checks what an outbound interceptor and a remote consumer each see when a string is
     * sent with the {@code StringLengthCodec}: the interceptor on the sending node must see
     * the original string as the body, while the consumer on the receiving node must see the
     * string's length as an {@link Integer}.
     *
     * <p>The test completes after both checks have run ({@code waitFor(2)}).
     *
     * @throws Exception if waiting for the consumer registration is interrupted or fails
     */
    @Test
    public void testMessageBodyInterceptor() throws Exception {
        String content = TestUtils.randomUnicodeString(13);
        startNodes(2);
        waitFor(2);

        CountDownLatch registered = new CountDownLatch(1);
        this.vertices[0].eventBus().registerCodec(new EventBusTestBase.StringLengthCodec()).consumer("whatever", message -> {
            assertEquals(content.length(), ((Integer) message.body()).intValue());
            complete();
        }).completion().onComplete(onSuccess(v -> {
            // Release the latch only on a successful registration, so that a failed
            // registration is reported as such instead of being silently ignored.
            registered.countDown();
        }));
        awaitLatch(registered);

        EventBusTestBase.StringLengthCodec senderCodec = new EventBusTestBase.StringLengthCodec();
        this.vertices[1].eventBus().registerCodec(senderCodec).addOutboundInterceptor(deliveryContext -> {
            if ("whatever".equals(deliveryContext.message().address())) {
                assertEquals(content, deliveryContext.body());
                complete();
            }
            // Always hand the message on to the next step, whatever its address.
            deliveryContext.next();
        }).send("whatever", content, new DeliveryOptions().setCodecName(senderCodec.name()));
        await();
    }

    /**
     * Verifies that once a consumer is unregistered, requests to its address fail with
     * {@link ReplyFailure#NO_HANDLERS} from both nodes.
     *
     * <p>Flow: an echo consumer is registered on node 0 and exercised from node 0 and node 1,
     * then unregistered. The test waits on a latch of 3 that is counted down when the
     * unregistration completes and whenever a wrapped cluster manager is told that address
     * {@code "foo"} has no registrations left (presumably once per node; confirm against
     * {@code WrappedClusterManager}). Only then are the failing requests issued.
     *
     * @throws Exception if waiting on the latch is interrupted or fails
     */
    @Test
    public void testClusteredUnregistration() throws Exception {
        CountDownLatch updateLatch = new CountDownLatch(3);
        startNodes(2, () -> new WrappedClusterManager(getClusterManager()) {
            @Override // io.vertx.tests.eventbus.WrappedClusterManager
            public void registrationsUpdated(RegistrationUpdateEvent event) {
                super.registrationsUpdated(event);
                if (event.address().equals("foo") && event.registrations().isEmpty()) {
                    updateLatch.countDown();
                }
            }
        });

        MessageConsumer<Object> consumer = this.vertices[0].eventBus().consumer("foo", message -> {
            message.reply(message.body());
        });
        // Each nested lambda parameter has its own name: Java does not allow a lambda
        // parameter to redeclare a parameter of an enclosing lambda.
        consumer.completion().onComplete(onSuccess(registered -> {
            this.vertices[0].eventBus().request("foo", "echo").onComplete(onSuccess(localReply -> {
                assertEquals("echo", localReply.body());
                this.vertices[1].eventBus().request("foo", "echo").onComplete(onSuccess(remoteReply -> {
                    assertEquals("echo", remoteReply.body());
                    consumer.unregister().onComplete(onSuccess(unregistered -> {
                        updateLatch.countDown();
                    }));
                }));
            }));
        }));
        awaitLatch(updateLatch);

        this.vertices[1].eventBus().request("foo", "echo").onComplete(onFailure(remoteFailure -> {
            assertThat(remoteFailure, CoreMatchers.is(CoreMatchers.instanceOf(ReplyException.class)));
            assertEquals(ReplyFailure.NO_HANDLERS, ((ReplyException) remoteFailure).failureType());
            this.vertices[0].eventBus().request("foo", "echo").onComplete(onFailure(localFailure -> {
                assertThat(localFailure, CoreMatchers.is(CoreMatchers.instanceOf(ReplyException.class)));
                assertEquals(ReplyFailure.NO_HANDLERS, ((ReplyException) localFailure).failureType());
                testComplete();
            }));
        }));
        await();
    }

    /**
     * Verifies that verticles can still exchange event-bus messages from their {@code stop}
     * method while their nodes are closing.
     *
     * <p>One {@code C1MyVerticle} is deployed on each node, each serving the address the other
     * one pings ({@code "foo"} / {@code "bar"}), sharing one counter. Both nodes are then
     * closed, and the test completes once both {@code close()} calls have succeeded
     * ({@code waitFor(2)}).
     *
     * @throws Exception declared by the original test signature
     */
    @Test
    public void testMessagingInStopMethod() throws Exception {
        startNodes(2);
        AtomicInteger pingCount = new AtomicInteger();
        waitFor(2);
        // The two deployment-id parameters need distinct names: a nested lambda may not
        // redeclare a parameter of its enclosing lambda.
        this.vertices[0].deployVerticle(new C1MyVerticle("foo", "bar", pingCount)).onComplete(onSuccess(firstDeploymentId -> {
            this.vertices[1].deployVerticle(new C1MyVerticle("bar", "foo", pingCount)).onComplete(onSuccess(secondDeploymentId -> {
                this.vertices[0].close().onComplete(onSuccess(closed -> {
                    complete();
                }));
                this.vertices[1].close().onComplete(onSuccess(closed -> {
                    complete();
                }));
            }));
        }));
        await();
    }
}
