package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/client/impl/MessageParserTest.class */
public class MessageParserTest extends MockedPulsarServiceBaseTest {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("my-tenant", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1", "appid2"}), Sets.newHashSet(new String[]{"test"})));
        this.admin.namespaces().createNamespace("my-tenant/my-ns", Sets.newHashSet(new String[]{"test"}));
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "batchingAndCompression")
    public static Object[][] batchingAndCompression() {
        return new Object[]{new Object[]{true, CompressionType.ZLIB}, new Object[]{true, CompressionType.ZSTD}, new Object[]{true, CompressionType.SNAPPY}, new Object[]{true, CompressionType.LZ4}, new Object[]{true, CompressionType.NONE}, new Object[]{false, CompressionType.ZLIB}, new Object[]{false, CompressionType.ZSTD}, new Object[]{false, CompressionType.SNAPPY}, new Object[]{false, CompressionType.LZ4}, new Object[]{false, CompressionType.NONE}};
    }

    @Test(dataProvider = "batchingAndCompression")
    public void testParseMessages(boolean z, CompressionType compressionType) throws Exception {
        String str = "persistent://my-tenant/my-ns/message-parse-test-" + z + "-" + String.valueOf(compressionType);
        TopicName topicName = TopicName.get(str);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).compressionType(compressionType).enableBatching(z).batchingMaxPublishDelay(10L, TimeUnit.SECONDS).topic(str).create();
        try {
            ManagedCursor newNonDurableCursor = ((PersistentTopic) this.pulsar.getBrokerService().getTopicReference(str).get()).getManagedLedger().newNonDurableCursor(PositionImpl.EARLIEST);
            if (z) {
                for (int i = 0; i < 9; i++) {
                    create.sendAsync("hello-" + i);
                }
                create.send("hello-9");
            } else {
                for (int i2 = 0; i2 < 10; i2++) {
                    create.send("Pulsar-" + i2);
                }
            }
            if (z) {
                Entry entry = (Entry) newNonDurableCursor.readEntriesOrWait(1).get(0);
                ArrayList arrayList = new ArrayList();
                ByteBuf dataBuffer = entry.getDataBuffer();
                try {
                    long ledgerId = entry.getLedgerId();
                    long entryId = entry.getEntryId();
                    Objects.requireNonNull(arrayList);
                    MessageParser.parseMessage(topicName, ledgerId, entryId, dataBuffer, (v1) -> {
                        r4.add(v1);
                    }, 5242880);
                    entry.release();
                    Assert.assertEquals(arrayList.size(), 10);
                    for (int i3 = 0; i3 < 10; i3++) {
                        Assert.assertEquals(((RawMessage) arrayList.get(i3)).getData(), Unpooled.wrappedBuffer(("hello-" + i3).getBytes()));
                    }
                    arrayList.forEach(rawMessage -> {
                        rawMessage.getSchemaVersion();
                        rawMessage.release();
                    });
                    Awaitility.await().atMost(3L, TimeUnit.SECONDS).untilAsserted(() -> {
                        Assert.assertEquals(dataBuffer.refCnt(), 0);
                    });
                } catch (Throwable th) {
                    entry.release();
                    throw th;
                }
            } else {
                Assert.assertEquals(newNonDurableCursor.getNumberOfEntriesInBacklog(false), 10L);
                List<Entry> readEntriesOrWait = newNonDurableCursor.readEntriesOrWait(10);
                Assert.assertEquals(readEntriesOrWait.size(), 10);
                ArrayList<ByteBuf> arrayList2 = new ArrayList();
                ArrayList arrayList3 = new ArrayList();
                for (Entry entry2 : readEntriesOrWait) {
                    arrayList2.add(entry2.getDataBuffer());
                    long ledgerId2 = entry2.getLedgerId();
                    long entryId2 = entry2.getEntryId();
                    ByteBuf dataBuffer2 = entry2.getDataBuffer();
                    Objects.requireNonNull(arrayList3);
                    MessageParser.parseMessage(topicName, ledgerId2, entryId2, dataBuffer2, (v1) -> {
                        r4.add(v1);
                    }, 5242880);
                    entry2.release();
                }
                Assert.assertEquals(arrayList3.size(), 10);
                arrayList3.forEach(rawMessage2 -> {
                    rawMessage2.getSchemaVersion();
                    rawMessage2.release();
                });
                for (ByteBuf byteBuf : arrayList2) {
                    Awaitility.await().atMost(3L, TimeUnit.SECONDS).untilAsserted(() -> {
                        Assert.assertEquals(byteBuf.refCnt(), 0);
                    });
                }
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }
}
