package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupDispatchLimiter;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.schema.proto.Test;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"quarantine"})
/* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorRateLimiterTest.class */
public class ReplicatorRateLimiterTest extends ReplicatorTestBase {
    protected String methodName;
    private static final Logger log = LoggerFactory.getLogger(ReplicatorRateLimiterTest.class);

    /* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorRateLimiterTest$DispatchRateType.class */
    enum DispatchRateType {
        messageRate,
        byteRate
    }

    @BeforeMethod
    public void beforeMethod(Method method) throws Exception {
        this.methodName = method.getName();
    }

    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @BeforeClass(timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
    }

    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "dispatchRateType")
    public Object[][] dispatchRateProvider() {
        return new Object[]{new Object[]{DispatchRateType.messageRate}, new Object[]{DispatchRateType.byteRate}};
    }

    @Test
    public void testReplicatorRateLimiterWithOnlyTopicLevel() throws Exception {
        cleanup();
        this.config1.setDispatchThrottlingRatePerReplicatorInMsg(0);
        this.config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
        setup();
        String str = "pulsar/replicatorchange-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/testReplicatorRateLimiterWithOnlyTopicLevel";
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"r1", "r2"}));
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build.newProducer().topic(str2).create().close();
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(str2).get();
            AssertJUnit.assertFalse(((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent());
            AssertJUnit.assertFalse(((Replicator) persistentTopic.getReplicators().values().get(0)).getResourceGroupDispatchRateLimiter().isPresent());
            DispatchRate build2 = DispatchRate.builder().dispatchThrottlingRateInMsg(10).dispatchThrottlingRateInByte(20L).ratePeriodInSecond(30).build();
            this.admin1.topics().setReplicatorDispatchRate(str2, build2);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.admin1.topics().getReplicatorDispatchRate(str2), build2);
            });
            Assert.assertTrue(((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent());
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 10L);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 20L);
            this.admin1.topics().removeReplicatorDispatchRate(str2);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertNull(this.admin1.topics().getReplicatorDispatchRate(str2));
            });
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), -1L);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), -1L);
            String uuid = UUID.randomUUID().toString();
            ResourceGroup resourceGroup = new ResourceGroup();
            resourceGroup.setReplicationDispatchRateInBytes(10L);
            resourceGroup.setReplicationDispatchRateInMsgs(20L);
            this.admin1.resourcegroups().createResourceGroup(uuid, resourceGroup);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertNotNull(this.admin1.resourcegroups().getResourceGroup(uuid));
            });
            this.admin1.topicPolicies().setResourceGroup(str2, uuid);
            Replicator replicator = (Replicator) persistentTopic.getReplicators().values().get(0);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(replicator.getResourceGroupDispatchRateLimiter().isPresent());
                ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = (ResourceGroupDispatchLimiter) replicator.getResourceGroupDispatchRateLimiter().get();
                Assert.assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), resourceGroup.getReplicationDispatchRateInBytes().longValue());
                Assert.assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), resourceGroup.getReplicationDispatchRateInMsgs().longValue());
            });
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test
    public void testReplicatorRateLimiterWithOnlyNamespaceLevel() throws Exception {
        cleanup();
        this.config1.setDispatchThrottlingRatePerReplicatorInMsg(0);
        this.config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
        setup();
        String str = "pulsar/replicatorchange-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/testReplicatorRateLimiterWithOnlyNamespaceLevel";
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"r1", "r2"}));
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build.newProducer().topic(str2).create().close();
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(str2).get();
            AssertJUnit.assertFalse(((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent());
            AssertJUnit.assertFalse(((Replicator) persistentTopic.getReplicators().values().get(0)).getResourceGroupDispatchRateLimiter().isPresent());
            DispatchRate build2 = DispatchRate.builder().dispatchThrottlingRateInMsg(10).dispatchThrottlingRateInByte(20L).ratePeriodInSecond(30).build();
            this.admin1.namespaces().setReplicatorDispatchRate(str, build2);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.admin1.namespaces().getReplicatorDispatchRate(str), build2);
            });
            Assert.assertTrue(((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent());
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 10L);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 20L);
            this.admin1.namespaces().removeReplicatorDispatchRate(str);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertNull(this.admin1.namespaces().getReplicatorDispatchRate(str));
            });
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), -1L);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), -1L);
            String uuid = UUID.randomUUID().toString();
            ResourceGroup resourceGroup = new ResourceGroup();
            resourceGroup.setReplicationDispatchRateInBytes(10L);
            resourceGroup.setReplicationDispatchRateInMsgs(20L);
            this.admin1.resourcegroups().createResourceGroup(uuid, resourceGroup);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertNotNull(this.admin1.resourcegroups().getResourceGroup(uuid));
            });
            this.admin1.namespaces().setNamespaceResourceGroup(str, uuid);
            Replicator replicator = (Replicator) persistentTopic.getReplicators().values().get(0);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(replicator.getResourceGroupDispatchRateLimiter().isPresent());
                ResourceGroupDispatchLimiter resourceGroupDispatchLimiter = (ResourceGroupDispatchLimiter) replicator.getResourceGroupDispatchRateLimiter().get();
                Assert.assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnByte(), resourceGroup.getReplicationDispatchRateInBytes().longValue());
                Assert.assertEquals(resourceGroupDispatchLimiter.getDispatchRateOnMsg(), resourceGroup.getReplicationDispatchRateInMsgs().longValue());
            });
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test
    public void testReplicatorRateLimiterWithOnlyBrokerLevel() throws Exception {
        cleanup();
        this.config1.setDispatchThrottlingRatePerReplicatorInMsg(0);
        this.config1.setDispatchThrottlingRatePerReplicatorInByte(0L);
        setup();
        String str = "pulsar/replicatorchange-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/testReplicatorRateLimiterWithOnlyBrokerLevel";
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"r1", "r2"}));
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build.newProducer().topic(str2).create().close();
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(str2).get();
            AssertJUnit.assertFalse(((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent());
            this.admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInMsg", "10");
            this.admin1.brokers().updateDynamicConfiguration("dispatchThrottlingRatePerReplicatorInByte", "20");
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(this.admin1.brokers().getAllDynamicConfigurations().containsKey("dispatchThrottlingRatePerReplicatorInByte"));
                Assert.assertEquals((String) this.admin1.brokers().getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInMsg"), "10");
                Assert.assertEquals((String) this.admin1.brokers().getAllDynamicConfigurations().get("dispatchThrottlingRatePerReplicatorInByte"), "20");
            });
            Assert.assertTrue(((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent());
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 10L);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 20L);
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test
    public void testReplicatorRatePriority() throws Exception {
        cleanup();
        this.config1.setDispatchThrottlingRatePerReplicatorInMsg(100);
        this.config1.setDispatchThrottlingRatePerReplicatorInByte(200L);
        setup();
        String str = "pulsar/replicatorchange-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/ratechange";
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"r1", "r2"}));
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build.newProducer().topic(str2).create().close();
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(str2).get();
            Assert.assertTrue(((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent());
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 100L);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 200L);
            DispatchRate build2 = DispatchRate.builder().dispatchThrottlingRateInMsg(50).dispatchThrottlingRateInByte(60L).ratePeriodInSecond(60).build();
            this.admin1.namespaces().setReplicatorDispatchRate(str, build2);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.admin1.namespaces().getReplicatorDispatchRate(str), build2);
            });
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 50L);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 60L);
            DispatchRate build3 = DispatchRate.builder().dispatchThrottlingRateInMsg(10).dispatchThrottlingRateInByte(20L).ratePeriodInSecond(30).build();
            this.admin1.topics().setReplicatorDispatchRate(str2, build3);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.admin1.topics().getReplicatorDispatchRate(str2), build3);
            });
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 10L);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 20L);
            DispatchRate build4 = DispatchRate.builder().dispatchThrottlingRateInMsg(500).dispatchThrottlingRateInByte(600L).ratePeriodInSecond(700).build();
            this.admin1.namespaces().setReplicatorDispatchRate(str, build4);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(this.admin1.namespaces().getReplicatorDispatchRate(str), build4);
            });
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 20L);
            this.admin1.topics().removeReplicatorDispatchRate(str2);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertNull(this.admin1.topics().getReplicatorDispatchRate(str2));
            });
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 500L);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 600L);
            this.admin1.namespaces().setReplicatorDispatchRate(str, (DispatchRate) null);
            Awaitility.await().untilAsserted(() -> {
                Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 100L);
            });
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 200L);
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test
    public void testReplicatorRateLimiterDynamicallyChange() throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", this.methodName);
        String str = "pulsar/replicatorchange-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/ratechange";
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"r1", "r2"}));
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            build.newProducer().topic(str2).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create().close();
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(str2).get();
            Assert.assertFalse(((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent());
            this.admin1.namespaces().setReplicatorDispatchRate(str, DispatchRate.builder().dispatchThrottlingRateInMsg(100).dispatchThrottlingRateInByte(-1L).ratePeriodInSecond(360).build());
            boolean z = false;
            int i = 0;
            while (true) {
                if (i >= 5) {
                    break;
                }
                if (((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent()) {
                    z = true;
                    break;
                } else {
                    if (i != 5 - 1) {
                        Thread.sleep(100L);
                    }
                    i++;
                }
            }
            Assert.assertTrue(z);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 100);
            DispatchRate build2 = DispatchRate.builder().dispatchThrottlingRateInMsg(-1).dispatchThrottlingRateInByte(500).ratePeriodInSecond(360).build();
            this.admin1.namespaces().setReplicatorDispatchRate(str, build2);
            boolean z2 = false;
            int i2 = 0;
            while (true) {
                if (i2 >= 5) {
                    break;
                }
                if (((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte() == 500) {
                    z2 = true;
                    break;
                } else {
                    if (i2 != 5 - 1) {
                        Thread.sleep(100L);
                    }
                    i2++;
                }
            }
            Assert.assertTrue(z2);
            Assert.assertEquals(this.admin1.namespaces().getReplicatorDispatchRate(str), build2);
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test(dataProvider = "dispatchRateType")
    public void testReplicatorRateLimiterMessageNotReceivedAllMessages(DispatchRateType dispatchRateType) throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", this.methodName);
        String str = "pulsar/replicatorbyteandmsg-" + dispatchRateType.toString() + "-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/notReceivedAll";
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"r1", "r2"}));
        this.admin1.namespaces().setReplicatorDispatchRate(str, DispatchRateType.messageRate.equals(dispatchRateType) ? DispatchRate.builder().dispatchThrottlingRateInMsg(100).dispatchThrottlingRateInByte(-1L).ratePeriodInSecond(360).build() : DispatchRate.builder().dispatchThrottlingRateInMsg(-1).dispatchThrottlingRateInByte(100L).ratePeriodInSecond(360).build());
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            Producer create = build.newProducer().topic(str2).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(str2).get();
            boolean z = false;
            int i = 0;
            while (true) {
                if (i >= 5) {
                    break;
                }
                if (((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent()) {
                    z = true;
                    break;
                } else {
                    if (i != 5 - 1) {
                        Thread.sleep(100L);
                    }
                    i++;
                }
            }
            Assert.assertTrue(z);
            if (DispatchRateType.messageRate.equals(dispatchRateType)) {
                Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 100L);
            } else {
                Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 100L);
            }
            build = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                AtomicInteger atomicInteger = new AtomicInteger(0);
                Consumer subscribe = build.newConsumer().topic(new String[]{str2}).subscriptionName("sub2-in-cluster2").messageListener((consumer, message) -> {
                    Assert.assertNotNull(message, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(message.getData()));
                    atomicInteger.incrementAndGet();
                }).subscribe();
                for (int i2 = 0; i2 < 500; i2++) {
                    create.send(new byte[80]);
                }
                log.info("Received message number: [{}]", Integer.valueOf(atomicInteger.get()));
                Assert.assertTrue(atomicInteger.get() < 200);
                subscribe.close();
                create.close();
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            } finally {
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    @Test
    public void testReplicatorRateLimiterMessageReceivedAllMessages() throws Exception {
        log.info("--- Starting ReplicatorTest::{} --- ", this.methodName);
        String str = "pulsar/replicatormsg-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/notReceivedAll";
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"r1", "r2"}));
        this.admin1.namespaces().setReplicatorDispatchRate(str, DispatchRate.builder().dispatchThrottlingRateInMsg(100).dispatchThrottlingRateInByte(-1L).ratePeriodInSecond(360).build());
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            Producer create = build.newProducer().topic(str2).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(str2).get();
            boolean z = false;
            int i = 0;
            while (true) {
                if (i >= 5) {
                    break;
                }
                if (((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent()) {
                    z = true;
                    break;
                } else {
                    if (i != 5 - 1) {
                        Thread.sleep(100L);
                    }
                    i++;
                }
            }
            Assert.assertTrue(z);
            Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnMsg(), 100L);
            build = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                AtomicInteger atomicInteger = new AtomicInteger(0);
                Consumer subscribe = build.newConsumer().topic(new String[]{str2}).subscriptionName("sub2-in-cluster2").messageListener((consumer, message) -> {
                    Assert.assertNotNull(message, "Message cannot be null");
                    log.debug("Received message [{}] in the listener", new String(message.getData()));
                    atomicInteger.incrementAndGet();
                }).subscribe();
                for (int i2 = 0; i2 < 50; i2++) {
                    create.send(new byte[80]);
                }
                Thread.sleep(1000L);
                log.info("Received message number: [{}]", Integer.valueOf(atomicInteger.get()));
                Assert.assertEquals(atomicInteger.get(), 50);
                for (int i3 = 0; i3 < 200; i3++) {
                    create.send(new byte[80]);
                }
                Thread.sleep(1000L);
                log.info("Received message number: [{}]", Integer.valueOf(atomicInteger.get()));
                Assert.assertEquals(atomicInteger.get(), 100);
                subscribe.close();
                create.close();
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            } finally {
                if (Collections.singletonList(build).get(0) != null) {
                    build.close();
                }
            }
        } catch (Throwable th) {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
            throw th;
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testResourceGroupReplicatorRateLimiter() throws Exception {
        String str = "pulsar/replicatormsg-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/" + String.valueOf(UUID.randomUUID());
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"r1", "r2"}));
        String uuid = UUID.randomUUID().toString();
        ResourceGroup resourceGroup = new ResourceGroup();
        resourceGroup.setReplicationDispatchRateInMsgs(Long.valueOf(100));
        this.admin1.resourcegroups().createResourceGroup(uuid, resourceGroup);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin1.resourcegroups().getResourceGroup(uuid));
        });
        this.admin1.namespaces().setNamespaceResourceGroup(str, uuid);
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
        try {
            Producer create = build.newProducer().topic(str2).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            try {
                build = PulsarClient.builder().serviceUrl(this.url2.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
                try {
                    AtomicInteger atomicInteger = new AtomicInteger(0);
                    Consumer subscribe = build.newConsumer().topic(new String[]{str2}).subscriptionName("sub2-in-cluster2").messageListener((consumer, message) -> {
                        Assert.assertNotNull(message, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message.getData()));
                        atomicInteger.incrementAndGet();
                    }).subscribe();
                    for (int i = 0; i < 500; i++) {
                        try {
                            create.send(new byte[80]);
                        } catch (Throwable th) {
                            if (Collections.singletonList(subscribe).get(0) != null) {
                                subscribe.close();
                            }
                            throw th;
                        }
                    }
                    Assert.assertTrue(atomicInteger.get() < 100 * 2);
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                    if (Collections.singletonList(build).get(0) != null) {
                        build.close();
                    }
                    if (Collections.singletonList(create).get(0) != null) {
                        create.close();
                    }
                } finally {
                    if (Collections.singletonList(build).get(0) != null) {
                        build.close();
                    }
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testReplicatorRateLimiterByBytes() throws Exception {
        String str = "pulsar/replicatormsg-" + System.currentTimeMillis();
        String str2 = "persistent://" + str + "/RateLimiterByBytes";
        this.admin1.namespaces().createNamespace(str);
        this.admin1.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"r1", "r2"}));
        this.admin1.namespaces().setReplicatorDispatchRate(str, DispatchRate.builder().dispatchThrottlingRateInMsg(-1).dispatchThrottlingRateInByte(400L).ratePeriodInSecond(360).build());
        PulsarClient build = PulsarClient.builder().serviceUrl(this.url1.toString()).build();
        try {
            Producer create = build.newProducer().topic(str2).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            try {
                PersistentTopic persistentTopic = (PersistentTopic) this.pulsar1.getBrokerService().getOrCreateTopic(str2).get();
                Awaitility.await().untilAsserted(() -> {
                    Assert.assertTrue(((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().isPresent());
                });
                Assert.assertEquals(((DispatchRateLimiter) ((Replicator) persistentTopic.getReplicators().values().get(0)).getRateLimiter().get()).getDispatchRateOnByte(), 400L);
                build = PulsarClient.builder().serviceUrl(this.url2.toString()).build();
                try {
                    AtomicInteger atomicInteger = new AtomicInteger(0);
                    Consumer subscribe = build.newConsumer().topic(new String[]{str2}).subscriptionName("sub2-in-cluster2").messageListener((consumer, message) -> {
                        Assert.assertNotNull(message, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message.getData()));
                        atomicInteger.incrementAndGet();
                    }).subscribe();
                    for (int i = 0; i < 20 * 100; i++) {
                        try {
                            create.send(new byte[100]);
                        } catch (Throwable th) {
                            if (Collections.singletonList(subscribe).get(0) != null) {
                                subscribe.close();
                            }
                            throw th;
                        }
                    }
                    Awaitility.await().pollDelay(5L, TimeUnit.SECONDS).untilAsserted(() -> {
                        Assertions.assertThat(atomicInteger.get()).isLessThan(6);
                    });
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                    if (Collections.singletonList(build).get(0) != null) {
                        build.close();
                    }
                    if (Collections.singletonList(create).get(0) != null) {
                        create.close();
                    }
                } finally {
                    if (Collections.singletonList(build).get(0) != null) {
                        build.close();
                    }
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(build).get(0) != null) {
                build.close();
            }
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 789143561:
                if (implMethodName.equals("lambda$testReplicatorRateLimiterByBytes$9cf5480e$1")) {
                    z = false;
                    break;
                }
                break;
            case 800424074:
                if (implMethodName.equals("lambda$testResourceGroupReplicatorRateLimiter$7c7b6d72$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1071577314:
                if (implMethodName.equals("lambda$testReplicatorRateLimiterMessageNotReceivedAllMessages$26ef232c$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1747212594:
                if (implMethodName.equals("lambda$testReplicatorRateLimiterMessageReceivedAllMessages$7c7b6d72$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/ReplicatorRateLimiterTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        Assert.assertNotNull(message, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message.getData()));
                        atomicInteger.incrementAndGet();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/ReplicatorRateLimiterTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger2 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer2, message2) -> {
                        Assert.assertNotNull(message2, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message2.getData()));
                        atomicInteger2.incrementAndGet();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/ReplicatorRateLimiterTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger3 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer3, message3) -> {
                        Assert.assertNotNull(message3, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message3.getData()));
                        atomicInteger3.incrementAndGet();
                    };
                }
                break;
            case Test.TestMessage.INTFIELD_FIELD_NUMBER /* 3 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/ReplicatorRateLimiterTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicInteger;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    AtomicInteger atomicInteger4 = (AtomicInteger) serializedLambda.getCapturedArg(0);
                    return (consumer4, message4) -> {
                        Assert.assertNotNull(message4, "Message cannot be null");
                        log.debug("Received message [{}] in the listener", new String(message4.getData()));
                        atomicInteger4.incrementAndGet();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
