package org.apache.pulsar.compaction;

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Sets;
import io.netty.buffer.Unpooled;
import io.netty.channel.EventLoopGroup;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.RawMessageImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-compaction"})
/* loaded from: input_file:org/apache/pulsar/compaction/CompactedTopicTest.class */
public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
    private final Random r = new Random(0);

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "batchEnabledProvider")
    public Object[][] batchEnabledProvider() {
        return new Object[]{new Object[]{Boolean.FALSE}, new Object[]{Boolean.TRUE}};
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("use", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("my-property", new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1", "appid2"}), Sets.newHashSet(new String[]{"use"})));
        this.admin.namespaces().createNamespace("my-property/use/my-ns");
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    private Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> buildCompactedLedger(BookKeeper bookKeeper, int i) throws Exception {
        LedgerHandle createLedger = bookKeeper.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        AtomicLong atomicLong = new AtomicLong(10L);
        AtomicLong atomicLong2 = new AtomicLong(0L);
        CompletableFuture.allOf((CompletableFuture[]) IntStream.range(0, i).mapToObj(i2 -> {
            ArrayList arrayList3 = new ArrayList();
            if (this.r.nextInt(10) == 1) {
                long nextInt = this.r.nextInt(10) + 1;
                arrayList3.add(new MessageIdData().setLedgerId(atomicLong.get()).setEntryId(atomicLong2.get() + 1));
                atomicLong.addAndGet(nextInt);
                atomicLong2.set(0L);
            }
            long nextInt2 = this.r.nextInt(5);
            if (nextInt2 != 0) {
                arrayList3.add(new MessageIdData().setLedgerId(atomicLong.get()).setEntryId(atomicLong2.get() + 1));
            }
            MessageIdData entryId = new MessageIdData().setLedgerId(atomicLong.get()).setEntryId(atomicLong2.addAndGet(nextInt2 + 1));
            RawMessageImpl rawMessageImpl = new RawMessageImpl(entryId, Unpooled.EMPTY_BUFFER);
            try {
                CompletableFuture completableFuture = new CompletableFuture();
                createLedger.asyncAddEntry(rawMessageImpl.serialize(), (i2, ledgerHandle, j, obj) -> {
                    if (i2 != 0) {
                        completableFuture.completeExceptionally(BKException.create(i2));
                        return;
                    }
                    arrayList.add(Pair.of(entryId, Long.valueOf(j)));
                    arrayList3.forEach(messageIdData -> {
                        arrayList2.add(Pair.of(messageIdData, Long.valueOf(j)));
                    });
                    completableFuture.complete(null);
                }, (Object) null);
                if (Collections.singletonList(rawMessageImpl).get(0) != null) {
                    rawMessageImpl.close();
                }
                return completableFuture;
            } catch (Throwable th) {
                if (Collections.singletonList(rawMessageImpl).get(0) != null) {
                    rawMessageImpl.close();
                }
                throw th;
            }
        }).toArray(i3 -> {
            return new CompletableFuture[i3];
        })).get();
        createLedger.close();
        return Triple.of(Long.valueOf(createLedger.getId()), arrayList, arrayList2);
    }

    @Test
    public void testEntryLookup() throws Exception {
        BookKeeper bookKeeper = (BookKeeper) this.pulsar.getBookKeeperClientFactory().create(this.conf, (MetadataStoreExtended) null, (EventLoopGroup) null, Optional.empty(), (Map) null).get();
        try {
            Triple<Long, List<Pair<MessageIdData, Long>>, List<Pair<MessageIdData, Long>>> buildCompactedLedger = buildCompactedLedger(bookKeeper, 500);
            List<Pair> list = (List) buildCompactedLedger.getMiddle();
            List<Pair> list2 = (List) buildCompactedLedger.getRight();
            LedgerHandle openLedger = bookKeeper.openLedger(((Long) buildCompactedLedger.getLeft()).longValue(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
            long lastAddConfirmed = openLedger.getLastAddConfirmed();
            AsyncLoadingCache createCache = CompactedTopicImpl.createCache(openLedger, 50L);
            MessageIdData messageIdData = (MessageIdData) ((Pair) list.get(0)).getLeft();
            Pair pair = (Pair) list.get(list.size() - 1);
            Assert.assertEquals((Long) CompactedTopicImpl.findStartPoint(new PositionImpl(0L, 0L), lastAddConfirmed, createCache).get(), 0L);
            Assert.assertEquals((Long) CompactedTopicImpl.findStartPoint(new PositionImpl(Long.MAX_VALUE, 0L), lastAddConfirmed, createCache).get(), -4276948922L);
            Assert.assertEquals((Long) CompactedTopicImpl.findStartPoint(new PositionImpl(messageIdData.getLedgerId(), 0L), lastAddConfirmed, createCache).get(), 0L);
            Assert.assertEquals((Long) CompactedTopicImpl.findStartPoint(new PositionImpl(((MessageIdData) pair.getLeft()).getLedgerId(), ((MessageIdData) pair.getLeft()).getEntryId() + 1), lastAddConfirmed, createCache).get(), -4276948922L);
            Collections.shuffle(list, this.r);
            Collections.shuffle(list2, this.r);
            for (Pair pair2 : list) {
                Assert.assertEquals((Long) CompactedTopicImpl.findStartPoint(new PositionImpl(((MessageIdData) pair2.getLeft()).getLedgerId(), ((MessageIdData) pair2.getLeft()).getEntryId()), lastAddConfirmed, createCache).get(), (Long) pair2.getRight());
            }
            for (Pair pair3 : list2) {
                Assert.assertEquals((Long) CompactedTopicImpl.findStartPoint(new PositionImpl(((MessageIdData) pair3.getLeft()).getLedgerId(), ((MessageIdData) pair3.getLeft()).getEntryId()), lastAddConfirmed, createCache).get(), (Long) pair3.getRight());
            }
        } finally {
            if (Collections.singletonList(bookKeeper).get(0) != null) {
                bookKeeper.close();
            }
        }
    }

    @Test
    public void testCleanupOldCompactedTopicLedger() throws Exception {
        BookKeeper bookKeeper = (BookKeeper) this.pulsar.getBookKeeperClientFactory().create(this.conf, (MetadataStoreExtended) null, (EventLoopGroup) null, Optional.empty(), (Map) null).get();
        try {
            LedgerHandle createLedger = bookKeeper.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
            createLedger.close();
            LedgerHandle createLedger2 = bookKeeper.createLedger(1, 1, Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD);
            createLedger2.close();
            CompactedTopicImpl compactedTopicImpl = new CompactedTopicImpl(bookKeeper);
            compactedTopicImpl.newCompactedLedger(new PositionImpl(1L, 2L), createLedger.getId()).get();
            bookKeeper.openLedger(createLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
            bookKeeper.openLedger(createLedger2.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
            PositionImpl positionImpl = new PositionImpl(1L, 3L);
            compactedTopicImpl.newCompactedLedger(positionImpl, createLedger2.getId()).get();
            bookKeeper.openLedger(createLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
            Assert.assertTrue(compactedTopicImpl.getCompactedTopicContext().isPresent());
            Assert.assertEquals(((CompactedTopicContext) compactedTopicImpl.getCompactedTopicContext().get()).getLedger().getId(), createLedger2.getId());
            Assert.assertTrue(compactedTopicImpl.getCompactionHorizon().isPresent());
            Assert.assertEquals(compactedTopicImpl.getCompactionHorizon().get(), positionImpl);
            compactedTopicImpl.deleteCompactedLedger(createLedger.getId()).join();
            try {
                bookKeeper.openLedger(createLedger.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
                Assert.fail("Should have failed to open old ledger");
            } catch (BKException.BKNoSuchLedgerExistsException | BKException.BKNoSuchLedgerExistsOnMetadataServerException e) {
            }
            bookKeeper.openLedger(createLedger2.getId(), Compactor.COMPACTED_TOPIC_LEDGER_DIGEST_TYPE, Compactor.COMPACTED_TOPIC_LEDGER_PASSWORD).close();
            if (Collections.singletonList(bookKeeper).get(0) != null) {
                bookKeeper.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(bookKeeper).get(0) != null) {
                bookKeeper.close();
            }
            throw th;
        }
    }

    @Test(dataProvider = "batchEnabledProvider")
    public void testCompactWithEmptyMessage(boolean z) throws Exception {
        byte[] bytes = "".getBytes();
        String str = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + String.valueOf(UUID.randomUUID());
        this.admin.topics().createPartitionedTopic(str, 1);
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic(str);
        if (z) {
            producerBuilder.batchingMaxMessages(5);
        } else {
            producerBuilder.enableBatching(false);
        }
        Producer create = producerBuilder.create();
        ArrayList arrayList = new ArrayList(10);
        for (int i = 0; i < 10; i++) {
            arrayList.add(create.newMessage().keyBytes("1".getBytes(Charset.defaultCharset())).value(bytes).sendAsync());
        }
        FutureUtil.waitForAll(arrayList).get();
        this.admin.topics().triggerCompaction(str);
        Assert.assertTrue(retryStrategically(r6 -> {
            try {
                return LongRunningProcessStatus.Status.SUCCESS.equals(this.admin.topics().compactionStatus(str).status);
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 200L));
        arrayList.clear();
        for (int i2 = 0; i2 < 10; i2++) {
            arrayList.add(create.newMessage().key("1").value(bytes).sendAsync());
        }
        FutureUtil.waitForAll(arrayList).get();
        this.admin.topics().triggerCompaction(str);
        Assert.assertTrue(retryStrategically(r62 -> {
            try {
                return LongRunningProcessStatus.Status.SUCCESS.equals(this.admin.topics().compactionStatus(str).status);
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 200L));
        create.close();
    }

    @Test(timeOut = 30000)
    public void testReadMessageFromCompactedLedger() throws Exception {
        String str = "persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + String.valueOf(UUID.randomUUID());
        this.admin.topics().createPartitionedTopic(str, 1);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(false).create();
        for (int i = 0; i < 10; i++) {
            create.newMessage().key("1").value("test compaction msg").send();
        }
        this.admin.topics().triggerCompaction(str);
        Assert.assertTrue(retryStrategically(r6 -> {
            try {
                return LongRunningProcessStatus.Status.SUCCESS.equals(this.admin.topics().compactionStatus(str).status);
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 200L));
        for (int i2 = 0; i2 < 10; i2++) {
            create.newMessage().key("2").value("test compaction msg v2").send();
        }
        Reader create2 = this.pulsarClient.newReader(Schema.STRING).topic(str).subscriptionName("test").readCompacted(true).startMessageId(MessageId.earliest).create();
        int i3 = 0;
        int i4 = 0;
        while (create2.hasMessageAvailable()) {
            Message readNext = create2.readNext();
            if ("1".equals(readNext.getKey()) && "test compaction msg".equals(readNext.getValue())) {
                i3++;
            } else if ("2".equals(readNext.getKey()) && "test compaction msg v2".equals(readNext.getValue())) {
                i4++;
            }
        }
        Assert.assertEquals(i3, 1);
        Assert.assertEquals(i4, 10);
    }

    @Test
    public void testLastMessageIdForCompactedLedger() throws Exception {
        String str = "persistent://my-property/use/my-ns/testLastMessageIdForCompactedLedger-" + String.valueOf(UUID.randomUUID());
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(false).create();
        for (int i = 0; i < 10; i++) {
            create.newMessage().key("1").value("test compaction msg").send();
        }
        this.admin.topics().triggerCompaction(str);
        Assert.assertTrue(retryStrategically(r6 -> {
            try {
                return LongRunningProcessStatus.Status.SUCCESS.equals(this.admin.topics().compactionStatus(str).status);
            } catch (PulsarAdminException e) {
                return false;
            }
        }, 10, 200L));
        PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(str);
        this.admin.topics().unload(str);
        Assert.assertTrue(internalStats.currentLedgerSize != this.admin.topics().getInternalStats(str).currentLedgerSize);
        Optional optional = (Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get();
        Assert.assertTrue(optional.isPresent());
        ManagedLedgerImpl managedLedger = ((PersistentTopic) optional.get()).getManagedLedger();
        managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(managedLedger.getCurrentLedgerEntries(), 0L);
            Assert.assertTrue(managedLedger.getLastConfirmedEntry().getEntryId() != -1);
            Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
        });
        ReaderImpl create2 = this.pulsarClient.newReader(Schema.STRING).topic(str).subscriptionName("test").readCompacted(true).startMessageId(MessageId.earliest).create();
        Assert.assertTrue(create2.hasMessageAvailable());
        Message readNext = create2.readNext();
        Assert.assertEquals("test compaction msg", (String) readNext.getValue());
        Assert.assertEquals(create2.getConsumer().getLastMessageId(), readNext.getMessageId());
        Assert.assertFalse(create2.hasMessageAvailable());
        create2.close();
        this.admin.topics().unload(str);
        PersistentTopicInternalStats internalStats2 = this.admin.topics().getInternalStats(str);
        Assert.assertTrue(internalStats2.lastConfirmedEntry.endsWith(":-1"));
        Assert.assertTrue(internalStats2.compactedLedger.ledgerId > 0);
        Reader create3 = this.pulsarClient.newReader(Schema.STRING).topic(str).subscriptionName("test").readCompacted(true).startMessageId(MessageId.earliest).create();
        Assert.assertTrue(create3.hasMessageAvailable());
        create3.readNext();
        Assert.assertFalse(create3.hasMessageAvailable());
    }

    @Test
    public void testDoNotLossTheLastCompactedLedgerData() throws Exception {
        String str = "persistent://my-property/use/my-ns/testDoNotLossTheLastCompactedLedgerData-" + String.valueOf(UUID.randomUUID());
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).blockIfQueueFull(true).maxPendingMessages(2000).enableBatching(false).create();
        CompletableFuture completableFuture = null;
        for (int i = 0; i < 2000; i++) {
            completableFuture = create.newMessage().key((i % 200)).value("Test").sendAsync();
        }
        create.flush();
        completableFuture.join();
        this.admin.topics().triggerCompaction(str);
        Awaitility.await().untilAsserted(() -> {
            PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(str);
            Assert.assertNotEquals(Long.valueOf(internalStats.compactedLedger.ledgerId), -1);
            Assert.assertEquals(internalStats.compactedLedger.entries, 200L);
            Assert.assertEquals(((SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get("__compaction")).getConsumers().size(), 0);
        });
        this.admin.topics().unload(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.topics().getInternalStats(str).ledgers.size(), 1);
            Assert.assertEquals(((SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get("__compaction")).getConsumers().size(), 0);
        });
        this.admin.topics().unload(str);
        create.newMessage().key("200").value("Test").send();
        this.admin.topics().triggerCompaction(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.topics().getInternalStats(str).compactedLedger.entries, 201L);
        });
        Reader create2 = this.pulsarClient.newReader(Schema.STRING).topic(str).startMessageId(MessageId.earliest).readCompacted(true).create();
        int i2 = 0;
        while (create2.hasMessageAvailable()) {
            create2.readNext();
            i2++;
        }
        Assert.assertEquals(i2, 201);
        create2.close();
        create.close();
    }

    @Test
    public void testReadCompactedDataWhenLedgerRolloverKickIn() throws Exception {
        String str = "persistent://my-property/use/my-ns/testReadCompactedDataWhenLedgerRolloverKickIn-" + String.valueOf(UUID.randomUUID());
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).blockIfQueueFull(true).maxPendingMessages(2000).enableBatching(false).create();
        CompletableFuture completableFuture = null;
        for (int i = 0; i < 2000; i++) {
            completableFuture = create.newMessage().key((i % 200)).value("Test").sendAsync();
        }
        create.flush();
        completableFuture.join();
        this.admin.topics().triggerCompaction(str);
        Awaitility.await().untilAsserted(() -> {
            PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(str);
            Assert.assertNotEquals(Long.valueOf(internalStats.compactedLedger.ledgerId), -1);
            Assert.assertEquals(internalStats.compactedLedger.entries, 200L);
            Assert.assertEquals(((SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get("__compaction")).getConsumers().size(), 0);
        });
        for (int i2 = 0; i2 < 2000; i2++) {
            completableFuture = create.newMessage().key(((i2 % 200) + 200)).value("Test").sendAsync();
        }
        create.flush();
        completableFuture.join();
        this.admin.topics().unload(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.topics().getInternalStats(str).ledgers.size(), 2);
        });
        Reader create2 = this.pulsarClient.newReader(Schema.STRING).topic(str).startMessageId(MessageId.earliest).readCompacted(true).receiverQueueSize(10).create();
        for (int i3 = 0; i3 < 2000; i3++) {
            completableFuture = create.newMessage().key(((i3 % 200) + 400)).value("Test").sendAsync();
        }
        create.flush();
        completableFuture.join();
        this.admin.topics().triggerCompaction(str);
        Awaitility.await().untilAsserted(() -> {
            PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(str);
            Assert.assertNotEquals(Long.valueOf(internalStats.compactedLedger.ledgerId), -1);
            Assert.assertEquals(internalStats.compactedLedger.entries, 600L);
            Assert.assertEquals(((SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get("__compaction")).getConsumers().size(), 0);
        });
        int i4 = 0;
        while (create2.hasMessageAvailable()) {
            create2.readNext();
            i4++;
        }
        Assert.assertEquals(i4, 600);
        create2.close();
        create.close();
    }

    @Test(timeOut = 120000)
    public void testCompactionWithTopicUnloading() throws Exception {
        String str = "persistent://my-property/use/my-ns/testCompactionWithTopicUnloading-" + String.valueOf(UUID.randomUUID());
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).blockIfQueueFull(true).maxPendingMessages(2000).enableBatching(false).create();
        CompletableFuture completableFuture = null;
        for (int i = 0; i < 2000; i++) {
            completableFuture = create.newMessage().key((i % 500)).value("Test").sendAsync();
        }
        create.flush();
        completableFuture.join();
        this.admin.topics().triggerCompaction(str);
        Awaitility.await().pollInterval(5L, TimeUnit.SECONDS).untilAsserted(() -> {
            PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(str);
            Assert.assertNotEquals(Long.valueOf(internalStats.compactedLedger.ledgerId), -1);
            Assert.assertEquals(internalStats.compactedLedger.entries, 500L);
            Assert.assertEquals(((SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get("__compaction")).getConsumers().size(), 0);
        });
        this.admin.topics().unload(str);
        for (int i2 = 0; i2 < 2000; i2++) {
            completableFuture = create.newMessage().key(((i2 % 500) + 500)).value("Test").sendAsync();
        }
        create.flush();
        completableFuture.join();
        this.admin.topics().triggerCompaction(str);
        Thread.sleep(100L);
        this.admin.topics().unload(str);
        this.admin.topics().triggerCompaction(str);
        Awaitility.await().pollInterval(3L, TimeUnit.SECONDS).atMost(30L, TimeUnit.SECONDS).untilAsserted(() -> {
            PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(str);
            Assert.assertNotEquals(Long.valueOf(internalStats.compactedLedger.ledgerId), -1);
            Assert.assertEquals(internalStats.compactedLedger.entries, 1000L);
            Assert.assertEquals(((SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get("__compaction")).getConsumers().size(), 0);
        });
        Reader create2 = this.pulsarClient.newReader(Schema.STRING).topic(str).startMessageId(MessageId.earliest).readCompacted(true).receiverQueueSize(10).create();
        int i3 = 0;
        while (create2.hasMessageAvailable()) {
            create2.readNext();
            i3++;
        }
        Assert.assertEquals(i3, 1000);
        create2.close();
        create.close();
    }

    @Test(timeOut = 30000)
    public void testReader() throws Exception {
        String str = "persistent://my-property/use/my-ns/t1";
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/t1").create();
        try {
            create.newMessage().key("k").value("value".getBytes()).send();
            create.newMessage().key("k").value((Object) null).send();
            this.pulsar.getCompactor().compact("persistent://my-property/use/my-ns/t1").get();
            Awaitility.await().pollInterval(3L, TimeUnit.SECONDS).atMost(30L, TimeUnit.SECONDS).untilAsserted(() -> {
                this.admin.topics().unload(str);
                Thread.sleep(100L);
                Assert.assertTrue(this.admin.topics().getInternalStats(str).lastConfirmedEntry.endsWith("-1"));
            });
            PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats("persistent://my-property/use/my-ns/t1");
            Assert.assertTrue(internalStats.lastConfirmedEntry.endsWith("-1"));
            Assert.assertEquals(internalStats.compactedLedger.size, 0L);
            Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/use/my-ns/t1").startMessageIdInclusive().startMessageId(MessageId.earliest).readCompacted(true).create();
            try {
                Assert.assertFalse(create2.hasMessageAvailable());
                if (Collections.singletonList(create2).get(0) != null) {
                    create2.close();
                }
            } catch (Throwable th) {
                if (Collections.singletonList(create2).get(0) != null) {
                    create2.close();
                }
                throw th;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testHasMessageAvailableWithNullValueMessage() throws Exception {
        String str = "persistent://my-property/use/my-ns/testHasMessageAvailable-" + String.valueOf(UUID.randomUUID());
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).blockIfQueueFull(true).enableBatching(false).create();
        CompletableFuture completableFuture = null;
        for (int i = 0; i < 10; i++) {
            try {
                completableFuture = create.newMessage().key(i).value(String.format("msg [%d]", Integer.valueOf(i))).sendAsync();
            } finally {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        }
        for (int i2 = 5; i2 < 10; i2++) {
            completableFuture = create.newMessage().key(i2).value((Object) null).sendAsync();
        }
        create.flush();
        completableFuture.join();
        this.admin.topics().triggerCompaction(str);
        Awaitility.await().untilAsserted(() -> {
            PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(str);
            Assert.assertNotEquals(Long.valueOf(internalStats.compactedLedger.ledgerId), -1);
            Assert.assertEquals(internalStats.compactedLedger.entries, 5L);
            Assert.assertEquals(((SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get("__compaction")).getConsumers().size(), 0);
            Assert.assertEquals(internalStats.lastConfirmedEntry, ((ManagedLedgerInternalStats.CursorStats) internalStats.cursors.get("__compaction")).markDeletePosition);
        });
        Reader create2 = this.pulsarClient.newReader().topic(str).startMessageIdInclusive().startMessageId(MessageId.earliest).readCompacted(true).create();
        for (int i3 = 5; i3 < 10; i3++) {
            try {
                create2.readNext();
            } catch (Throwable th) {
                if (Collections.singletonList(create2).get(0) != null) {
                    create2.close();
                }
                throw th;
            }
        }
        Assert.assertFalse(create2.hasMessageAvailable());
        Assert.assertNull(create2.readNext(3, TimeUnit.SECONDS));
        if (Collections.singletonList(create2).get(0) != null) {
            create2.close();
        }
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testReadCompleteMessagesDuringTopicUnloading() throws Exception {
        String str = "persistent://my-property/use/my-ns/testReadCompleteMessagesDuringTopicUnloading-" + String.valueOf(UUID.randomUUID());
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).blockIfQueueFull(true).enableBatching(false).create();
        try {
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).subscriptionName("sub_" + String.valueOf(UUID.randomUUID())).topic(new String[]{str}).subscribe();
            try {
                Reader create2 = this.pulsarClient.newReader(Schema.STRING).topic(str).startMessageIdInclusive().startMessageId(MessageId.earliest).readCompacted(true).create();
                CompletableFuture completableFuture = null;
                for (int i = 0; i < 1000; i++) {
                    try {
                        completableFuture = create.newMessage().key(i).value(String.format("msg [%d]", Integer.valueOf(i))).sendAsync();
                    } catch (Throwable th) {
                        if (Collections.singletonList(create2).get(0) != null) {
                            create2.close();
                        }
                        throw th;
                    }
                }
                create.flush();
                completableFuture.join();
                this.admin.topics().triggerCompaction(str);
                Awaitility.await().untilAsserted(() -> {
                    PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(str);
                    Assert.assertNotEquals(Long.valueOf(internalStats.compactedLedger.ledgerId), -1);
                    Assert.assertEquals(internalStats.compactedLedger.entries, 1000L);
                    Assert.assertEquals(((SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get("__compaction")).getConsumers().size(), 0);
                    Assert.assertEquals(internalStats.lastConfirmedEntry, ((ManagedLedgerInternalStats.CursorStats) internalStats.cursors.get("__compaction")).markDeletePosition);
                });
                this.admin.topics().unload(str);
                for (int i2 = 0; i2 < 1000; i2++) {
                    completableFuture = create.newMessage().key((i2 + 1000)).value(String.format("msg [%d]", Integer.valueOf(i2 + 1000))).sendAsync();
                }
                create.flush();
                completableFuture.join();
                for (int i3 = 0; i3 < 500; i3++) {
                    Assert.assertEquals((String) create2.readNext().getValue(), String.format("msg [%d]", Integer.valueOf(i3)));
                }
                this.admin.topics().unload(str);
                for (int i4 = 0; i4 < 500; i4++) {
                    Assert.assertEquals((String) create2.readNext().getValue(), String.format("msg [%d]", Integer.valueOf(i4 + 500)));
                }
                this.admin.topics().unload(str);
                for (int i5 = 0; i5 < 1000; i5++) {
                    Assert.assertEquals((String) create2.readNext().getValue(), String.format("msg [%d]", Integer.valueOf(i5 + 1000)));
                }
                if (Collections.singletonList(create2).get(0) != null) {
                    create2.close();
                }
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
            } catch (Throwable th2) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th2;
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test
    public void testReadCompactedLatestMessageWithInclusive() throws Exception {
        String str = "persistent://my-property/use/my-ns/testLedgerRollover-" + String.valueOf(UUID.randomUUID());
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).blockIfQueueFull(true).enableBatching(false).create();
        CompletableFuture completableFuture = null;
        for (int i = 0; i < 1; i++) {
            try {
                completableFuture = create.newMessage().key(i).value(String.format("msg [%d]", Integer.valueOf(i))).sendAsync();
            } finally {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        }
        create.flush();
        completableFuture.join();
        this.admin.topics().unload(str);
        this.admin.topics().triggerCompaction(str);
        Awaitility.await().untilAsserted(() -> {
            PersistentTopicInternalStats internalStats = this.admin.topics().getInternalStats(str);
            Assert.assertNotEquals(Long.valueOf(internalStats.compactedLedger.ledgerId), -1);
            Assert.assertEquals(internalStats.compactedLedger.entries, 1L);
            Assert.assertEquals(((SubscriptionStats) this.admin.topics().getStats(str).getSubscriptions().get("__compaction")).getConsumers().size(), 0);
            Assert.assertEquals(internalStats.lastConfirmedEntry, ((ManagedLedgerInternalStats.CursorStats) internalStats.cursors.get("__compaction")).markDeletePosition);
        });
        Awaitility.await().pollInterval(3L, TimeUnit.SECONDS).atMost(30L, TimeUnit.SECONDS).untilAsserted(() -> {
            this.admin.topics().unload(str);
            Assert.assertTrue(this.admin.topics().getInternalStats(str).lastConfirmedEntry.endsWith("-1"));
        });
        Reader create2 = this.pulsarClient.newReader().topic(str).startMessageIdInclusive().startMessageId(MessageId.latest).readCompacted(true).create();
        try {
            Assert.assertTrue(create2.hasMessageAvailable());
            Assert.assertEquals(create2.readNext().getMessageId(), completableFuture.get());
            if (Collections.singletonList(create2).get(0) != null) {
                create2.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create2).get(0) != null) {
                create2.close();
            }
            throw th;
        }
    }
}
