package org.apache.pulsar.broker.resourcegroup;

import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ResourceGroup;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Ignore;
import org.testng.annotations.Test;

@Ignore
/* loaded from: input_file:org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationOnTopicLevelTest.class */
public class ResourceGroupUsageAggregationOnTopicLevelTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(ResourceGroupUsageAggregationOnTopicLevelTest.class);
    private final String TenantName = "pulsar-test";
    private final String NsName = "test";
    private final String TenantAndNsName = "pulsar-test/test";
    private final String TestProduceConsumeTopicName = "/test/prod-cons-topic";
    private final String PRODUCE_CONSUME_PERSISTENT_TOPIC = "persistent://pulsar-test/test/test/prod-cons-topic";
    private final String PRODUCE_CONSUME_NON_PERSISTENT_TOPIC = "non-persistent://pulsar-test/test/test/prod-cons-topic";

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        this.conf.setAllowAutoTopicCreation(true);
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.brokerUrl.toString()).build());
        this.admin.tenants().createTenant("pulsar-test", new TenantInfoImpl(Sets.newHashSet(new String[]{"fakeAdminRole"}), Sets.newHashSet(new String[]{"test"})));
        this.admin.namespaces().createNamespace("pulsar-test/test");
        this.admin.namespaces().setNamespaceReplicationClusters("pulsar-test/test", Sets.newHashSet(new String[]{"test"}));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(enabled = false)
    public void testPersistentTopicProduceConsumeUsageOnRG() throws Exception {
        testProduceConsumeUsageOnRG("persistent://pulsar-test/test/test/prod-cons-topic");
    }

    @Test(enabled = false)
    public void testNonPersistentTopicProduceConsumeUsageOnRG() throws Exception {
        testProduceConsumeUsageOnRG("non-persistent://pulsar-test/test/test/prod-cons-topic");
    }

    /* JADX WARN: Finally extract failed */
    private void testProduceConsumeUsageOnRG(String str) throws Exception {
        ResourceQuotaCalculator resourceQuotaCalculator = new ResourceQuotaCalculator() { // from class: org.apache.pulsar.broker.resourcegroup.ResourceGroupUsageAggregationOnTopicLevelTest.1
            public boolean needToReportLocalUsage(long j, long j2, long j3, long j4, long j5) {
                return false;
            }

            public long computeLocalQuota(long j, long j2, long[] jArr) {
                return 0L;
            }
        };
        ResourceUsageTopicTransportManager resourceUsageTopicTransportManager = new ResourceUsageTopicTransportManager(this.pulsar);
        try {
            final ResourceGroupService resourceGroupService = new ResourceGroupService(this.pulsar, TimeUnit.MILLISECONDS, resourceUsageTopicTransportManager, resourceQuotaCalculator);
            try {
                final String str2 = "runProduceConsume";
                ResourceUsagePublisher resourceUsagePublisher = new ResourceUsagePublisher() { // from class: org.apache.pulsar.broker.resourcegroup.ResourceGroupUsageAggregationOnTopicLevelTest.2
                    public String getID() {
                        return resourceGroupService.resourceGroupGet(str2).resourceGroupName;
                    }

                    public void fillResourceUsage(ResourceUsage resourceUsage) {
                        resourceGroupService.resourceGroupGet(str2).rgFillResourceUsage(resourceUsage);
                    }
                };
                ResourceUsageConsumer resourceUsageConsumer = new ResourceUsageConsumer() { // from class: org.apache.pulsar.broker.resourcegroup.ResourceGroupUsageAggregationOnTopicLevelTest.3
                    public String getID() {
                        return resourceGroupService.resourceGroupGet(str2).resourceGroupName;
                    }

                    public void acceptResourceUsage(String str3, ResourceUsage resourceUsage) {
                        resourceGroupService.resourceGroupGet(str2).rgResourceUsageListener(str3, resourceUsage);
                    }
                };
                ResourceGroup resourceGroup = new ResourceGroup();
                resourceGroup.setPublishRateInBytes(1500L);
                resourceGroup.setPublishRateInMsgs(100);
                resourceGroup.setDispatchRateInBytes(4000L);
                resourceGroup.setPublishRateInMsgs(500);
                resourceGroupService.resourceGroupCreate("runProduceConsume", resourceGroup, resourceUsagePublisher, resourceUsageConsumer);
                Assert.assertNotNull(resourceGroupService.resourceGroupGet("runProduceConsume"));
                Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-subscription").subscriptionType(SubscriptionType.Shared).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
                try {
                    Producer create = this.pulsarClient.newProducer().topic(str).create();
                    try {
                        TopicName topicName = TopicName.get(str);
                        resourceGroupService.unRegisterTopic(topicName);
                        resourceGroupService.registerTopic("runProduceConsume", topicName);
                        int i = 0;
                        int i2 = 0;
                        for (int i3 = 0; i3 < 10; i3++) {
                            byte[] bytes = String.format("Hi, ix=%s", Integer.valueOf(i3)).getBytes();
                            create.send(bytes);
                            i += bytes.length;
                            i2++;
                        }
                        verifyStats(resourceGroupService, str, "runProduceConsume", i, i2, 0, 0, true, false);
                        int i4 = 0;
                        int i5 = 0;
                        while (i5 < i2) {
                            i4 += ((byte[]) subscribe.receive().getValue()).length;
                            i5++;
                        }
                        verifyStats(resourceGroupService, str, "runProduceConsume", i, i2, i4, i5, true, true);
                        if (Collections.singletonList(create).get(0) != null) {
                            create.close();
                        }
                        if (Collections.singletonList(subscribe).get(0) != null) {
                            subscribe.close();
                        }
                        if (Collections.singletonList(resourceGroupService).get(0) != null) {
                            resourceGroupService.close();
                        }
                    } catch (Throwable th) {
                        if (Collections.singletonList(create).get(0) != null) {
                            create.close();
                        }
                        throw th;
                    }
                } catch (Throwable th2) {
                    if (Collections.singletonList(subscribe).get(0) != null) {
                        subscribe.close();
                    }
                    throw th2;
                }
            } catch (Throwable th3) {
                if (Collections.singletonList(resourceGroupService).get(0) != null) {
                    resourceGroupService.close();
                }
                throw th3;
            }
        } finally {
            if (Collections.singletonList(resourceUsageTopicTransportManager).get(0) != null) {
                resourceUsageTopicTransportManager.close();
            }
        }
    }

    private void verifyStats(ResourceGroupService resourceGroupService, String str, String str2, int i, int i2, int i3, int i4, boolean z, boolean z2) throws PulsarAdminException {
        BrokerService brokerService = this.pulsar.getBrokerService();
        Awaitility.await().untilAsserted(() -> {
            TopicStatsImpl topicStatsImpl = (TopicStatsImpl) brokerService.getTopicStats().get(str);
            Assert.assertNotNull(topicStatsImpl);
            if (z) {
                Assert.assertTrue(topicStatsImpl.bytesInCounter >= ((long) i));
                Assert.assertEquals(i2, topicStatsImpl.msgInCounter);
            }
            if (z2) {
                Assert.assertTrue(topicStatsImpl.bytesOutCounter >= ((long) i3));
                Assert.assertEquals(i4, topicStatsImpl.msgOutCounter);
            }
        });
        if (i2 > 0 || i4 > 0) {
            resourceGroupService.aggregateResourceGroupLocalUsages();
            ResourceGroup.BytesAndMessagesCount rGUsage = resourceGroupService.getRGUsage(str2, ResourceGroup.ResourceGroupMonitoringClass.Publish, ResourceGroupService.ResourceGroupUsageStatsType.Cumulative);
            ResourceGroup.BytesAndMessagesCount rGUsage2 = resourceGroupService.getRGUsage(str2, ResourceGroup.ResourceGroupMonitoringClass.Dispatch, ResourceGroupService.ResourceGroupUsageStatsType.Cumulative);
            if (z) {
                Assert.assertTrue(rGUsage.bytes >= ((long) i));
                Assert.assertEquals(i2, rGUsage.messages);
            }
            if (z2) {
                Assert.assertTrue(rGUsage2.bytes >= ((long) i3));
                Assert.assertEquals(i4, rGUsage2.messages);
            }
        }
    }
}
