package org.apache.pulsar.broker.intercept;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.OpAddEntry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.class */
public class ManagedLedgerInterceptorImplTest extends MockedBookKeeperTestCase {
    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImplTest.class);

    /**
     * Predicate that matches entries whose broker entry metadata index is strictly lower than
     * {@link #indexToSearch}.
     *
     * <p>The entry handed to {@link #test(Entry)} is released exactly once before the method
     * returns, whether the metadata could be parsed or not.
     */
    static class IndexSearchPredicate implements Predicate<Entry> {
        long indexToSearch;

        /**
         * @param j the index that entries are compared against
         */
        public IndexSearchPredicate(long j) {
            this.indexToSearch = j;
        }

        /**
         * @param entry the entry to inspect; it is released by this method
         * @return {@code true} when the entry's metadata index is below {@link #indexToSearch},
         *         {@code false} when it is not or when reading the metadata throws an exception
         */
        @Override // java.util.function.Predicate
        public boolean test(Entry entry) {
            try {
                return Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()).getIndex() < this.indexToSearch;
            } catch (Exception e) {
                ManagedLedgerInterceptorImplTest.log.error("Error deserialize message for message position find", e);
                return false;
            } finally {
                // Single release point for every exit path.
                entry.release();
            }
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest$MockManagedLedgerInterceptorImpl.class */
    private class MockManagedLedgerInterceptorImpl extends ManagedLedgerInterceptorImpl {
        private final Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;

        public MockManagedLedgerInterceptorImpl(Set<BrokerEntryMetadataInterceptor> set, Set<ManagedLedgerPayloadProcessor> set2) {
            super(set, set2);
            this.brokerEntryMetadataInterceptors = set;
        }

        public OpAddEntry beforeAddEntry(OpAddEntry opAddEntry, int i) {
            if (opAddEntry == null || i <= 0) {
                return opAddEntry;
            }
            opAddEntry.setData(Commands.addBrokerEntryMetadata(opAddEntry.getData(), this.brokerEntryMetadataInterceptors, i));
            if (opAddEntry != null) {
                throw new RuntimeException("throw exception before add entry for test");
            }
            return opAddEntry;
        }
    }

    /**
     * Payload processor used by the tests in this class.
     *
     * <p>The input processor discards whatever payload it is given and substitutes
     * {@code "Modified Test Message"}; the output processor asserts that it receives exactly that
     * text and substitutes {@code "Test Message"}.
     */
    public static class TestPayloadProcessor implements ManagedLedgerPayloadProcessor {
        private static final String PROCESSED_TEXT = "Modified Test Message";
        private static final String RESTORED_TEXT = "Test Message";

        /**
         * @return a processor that releases the incoming buffer and returns a buffer holding
         *         {@code "Modified Test Message"}
         */
        public ManagedLedgerPayloadProcessor.Processor inputProcessor() {
            return new ManagedLedgerPayloadProcessor.Processor() { // from class: org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest.TestPayloadProcessor.1
                public ByteBuf process(Object obj, ByteBuf byteBuf) {
                    byte[] replacement = PROCESSED_TEXT.getBytes(StandardCharsets.UTF_8);
                    ByteBuf replacementBuffer = Unpooled.wrappedBuffer(replacement, 0, replacement.length);
                    byteBuf.release();
                    // retainedDuplicate() adds a reference on top of the freshly wrapped buffer;
                    // kept exactly as the test was written.
                    return replacementBuffer.retainedDuplicate();
                }

                public void release(ByteBuf byteBuf) {
                    byteBuf.release();
                }
            };
        }

        /**
         * @return a processor that checks the incoming payload is {@code "Modified Test Message"},
         *         releases it and returns a buffer holding {@code "Test Message"}
         */
        public ManagedLedgerPayloadProcessor.Processor outputProcessor() {
            return new ManagedLedgerPayloadProcessor.Processor() { // from class: org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest.TestPayloadProcessor.2
                public ByteBuf process(Object obj, ByteBuf byteBuf) {
                    byte[] received = new byte[byteBuf.readableBytes()];
                    byteBuf.readBytes(received);
                    // assertEquals reports both values on failure, unlike assertTrue(a.equals(b)).
                    Assert.assertEquals(new String(received, StandardCharsets.UTF_8), PROCESSED_TEXT);
                    byte[] restored = RESTORED_TEXT.getBytes(StandardCharsets.UTF_8);
                    byteBuf.release();
                    return Unpooled.wrappedBuffer(restored, 0, restored.length).retainedDuplicate();
                }

                public void release(ByteBuf byteBuf) {
                    byteBuf.release();
                }
            };
        }
    }

    /**
     * Adds ten entries of two messages each and checks that the interceptor index ends at 19 and
     * that the metadata index stored in the n-th entry (1-based) is {@code n * 2 - 1}.
     */
    @Test
    public void testAddBrokerEntryMetadata() throws Exception {
        final int batchSize = 2;
        final int numberOfEntries = 10;

        ManagedLedgerInterceptorImpl interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), (Set) null);
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(2);
        config.setManagedLedgerInterceptor(interceptor);

        ManagedLedger ledger = this.factory.open("topicEntryMetadataSequenceId", config);
        // Explicit downcasts: the decompiled assignments omitted them.
        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("topicEntryMetadataSequenceId");
        for (int i = 0; i < numberOfEntries; i++) {
            ledger.addEntry(("message" + i).getBytes(), batchSize);
        }

        // TestNG order is (actual, expected).
        Assert.assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(),
                numberOfEntries * batchSize - 1L);

        List<Entry> entries = cursor.readEntries(numberOfEntries);
        try {
            for (int i = 0; i < numberOfEntries; i++) {
                BrokerEntryMetadata metadata = Commands.parseBrokerEntryMetadataIfExist(entries.get(i).getDataBuffer());
                Assert.assertNotNull(metadata);
                Assert.assertEquals(metadata.getIndex(), ((i + 1) * batchSize) - 1);
            }
        } finally {
            // Entries returned by readEntries are released by the reader, as testRecoveryIndex does.
            entries.forEach(Entry::release);
        }

        cursor.close();
        ledger.close();
        this.factory.shutdown();
    }

    /**
     * Writes {@code "Test Message"} through a ledger configured with {@link TestPayloadProcessor},
     * clears the entry cache and checks that reading the entry back yields {@code "Test Message"}.
     */
    @Test
    public void testMessagePayloadProcessor() throws Exception {
        // Same construction style as testManagedLedgerPayloadInputProcessorFailure; avoids raw HashSets.
        ManagedLedgerInterceptorImpl interceptor =
                new ManagedLedgerInterceptorImpl(Collections.emptySet(), Set.of(new TestPayloadProcessor()));
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(2);
        config.setManagedLedgerInterceptor(interceptor);

        ManagedLedger ledger = this.factory.open("topicEntryWithPayloadProcessed", config);
        // Explicit downcast: the decompiled assignment omitted it.
        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("topicEntryWithPayloadProcessed");
        ledger.addEntry("Test Message".getBytes(StandardCharsets.UTF_8));
        this.factory.getEntryCacheManager().clear();

        List<Entry> entries = cursor.readEntries(1);
        try {
            Assert.assertEquals(new String(entries.get(0).getData(), StandardCharsets.UTF_8), "Test Message");
        } finally {
            entries.forEach(Entry::release);
        }

        cursor.close();
        ledger.close();
        this.factory.shutdown();
        config.setManagedLedgerInterceptor((ManagedLedgerInterceptor) null);
    }

    /**
     * Adds and immediately deletes five one-byte entries on a ledger that uses
     * {@link TestPayloadProcessor}, trims consumed ledgers, and checks that
     * {@code getTotalSize()} equals {@link #calculatePreciseSize(ManagedLedgerImpl)}.
     */
    @Test
    public void testTotalSizeCorrectIfHasInterceptor() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setManagedLedgerInterceptor(
                new ManagedLedgerInterceptorImpl(Collections.emptySet(), Set.of(new TestPayloadProcessor())));
        config.setMaxEntriesPerLedger(2);

        // Explicit downcasts: the decompiled assignments omitted them.
        ManagedLedgerImpl ledger = (ManagedLedgerImpl) this.factory.open("ml1", config);
        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("cursor1");
        for (int i = 0; i < 5; i++) {
            cursor.delete(ledger.addEntry(new byte[1]));
        }

        // Block until the background trim has finished before comparing sizes.
        CompletableFuture<Void> trimFinished = new CompletableFuture<>();
        ledger.trimConsumedLedgersInBackground(trimFinished);
        trimFinished.join();

        Assert.assertEquals(ledger.getTotalSize(), calculatePreciseSize(ledger));

        cursor.close();
        ledger.close();
        this.factory.getEntryCacheManager().clear();
        this.factory.shutdown();
        config.setManagedLedgerInterceptor((ManagedLedgerInterceptor) null);
    }

    /**
     * Adds up the size reported by every value of {@code getLedgersInfo()} and then adds
     * {@code getCurrentLedgerSize()}.
     *
     * @param managedLedgerImpl the ledger to measure
     * @return the sum described above; {@code getCurrentLedgerSize()} alone when there are no ledger infos
     */
    public static long calculatePreciseSize(ManagedLedgerImpl managedLedgerImpl) {
        long recordedSize = managedLedgerImpl.getLedgersInfo()
                .values()
                .stream()
                .mapToLong(ledgerInfo -> ledgerInfo.getSize())
                .sum();
        return recordedSize + managedLedgerImpl.getCurrentLedgerSize();
    }

    /**
     * Writes two entries of two messages each, closes the ledger, reopens it through a second
     * factory and checks that the interceptor index is still 3 and that cursor {@code c1} has one
     * entry left to read.
     */
    @Test(timeOut = 20000)
    public void testRecoveryIndex() throws Exception {
        final String ledgerName = "my_recovery_index_test_ledger";

        ManagedLedgerInterceptorImpl interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), (Set) null);
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setManagedLedgerInterceptor(interceptor);

        ManagedLedger ledger = this.factory.open(ledgerName, config);
        ledger.addEntry("dummy-entry-1".getBytes(StandardCharsets.UTF_8), 2);
        // The cursor is opened between the two writes.
        ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-2".getBytes(StandardCharsets.UTF_8), 2);
        Assert.assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), 3L);
        ledger.close();

        log.info("Closing ledger and reopening");
        ManagedLedgerFactoryImpl reopenedFactory = new ManagedLedgerFactoryImpl(this.metadataStore, this.bkc);
        try {
            ManagedLedger reopened = reopenedFactory.open(ledgerName, config);
            ManagedCursor cursor = reopened.openCursor("c1");
            Assert.assertEquals(reopened.getNumberOfEntries(), 2L);
            Assert.assertEquals(((ManagedLedgerInterceptorImpl) reopened.getManagedLedgerInterceptor()).getIndex(), 3L);

            List<Entry> entries = cursor.readEntries(100);
            int entriesRead = entries.size();
            // Release before asserting so a failed assertion does not leave entries unreleased.
            entries.forEach(Entry::release);
            Assert.assertEquals(entriesRead, 1);

            cursor.close();
            reopened.close();
        } finally {
            // Shut the second factory down exactly once, on success and on failure.
            reopenedFactory.shutdown();
        }
    }

    /**
     * Writes three rounds of five two-message entries (the third after reopening the ledger
     * through a second factory) with at most five entries per ledger. After each round it checks
     * the interceptor index (9, 19, 29), that the round's entries went to a different ledger than
     * the previous round's, and that {@code asyncFindPosition} with an
     * {@link IndexSearchPredicate} returns the expected entry id for every index written so far.
     */
    @Test
    public void testFindPositionByIndex() throws Exception {
        final String ledgerName = "my_ml_broker_entry_metadata_test_ledger";
        final int entriesPerRound = 5;

        ManagedLedgerInterceptorImpl interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), (Set) null);
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setManagedLedgerInterceptor(interceptor);
        config.setMaxEntriesPerLedger(5);

        ManagedLedger ledger = this.factory.open(ledgerName, config);
        ManagedCursor cursor = ledger.openCursor("c1");

        long firstLedgerId = addDummyEntries(ledger, entriesPerRound);
        Assert.assertEquals(currentIndex(ledger), 9L);
        assertEntryIdsFoundByIndex(ledger);

        long secondLedgerId = addDummyEntries(ledger, entriesPerRound);
        Assert.assertEquals(currentIndex(ledger), 19L);
        Assert.assertNotEquals(Long.valueOf(firstLedgerId), Long.valueOf(secondLedgerId));
        assertEntryIdsFoundByIndex(ledger);

        ledger.close();
        ManagedLedgerFactoryImpl reopenedFactory = new ManagedLedgerFactoryImpl(this.metadataStore, this.bkc);
        try {
            ManagedLedger reopened = reopenedFactory.open(ledgerName, config);
            long thirdLedgerId = addDummyEntries(reopened, entriesPerRound);
            Assert.assertEquals(currentIndex(reopened), 29L);
            Assert.assertNotEquals(Long.valueOf(secondLedgerId), Long.valueOf(thirdLedgerId));
            assertEntryIdsFoundByIndex(reopened);
            cursor.close();
            reopened.close();
        } finally {
            // Shut the second factory down exactly once, on success and on failure.
            reopenedFactory.shutdown();
        }
    }

    /** Adds {@code count} two-message entries and returns the ledger id of the last one added. */
    private static long addDummyEntries(ManagedLedger ledger, int count) throws Exception {
        long lastLedgerId = -1;
        for (int i = 0; i < count; i++) {
            lastLedgerId = ((PositionImpl) ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), 2)).getLedgerId();
        }
        return lastLedgerId;
    }

    /** Returns the index currently reported by the ledger's interceptor. */
    private static long currentIndex(ManagedLedger ledger) {
        return ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex();
    }

    /** Checks, for every index from 0 to the current one, that the found position has entry id {@code (index % 10) / 2}. */
    private static void assertEntryIdsFoundByIndex(ManagedLedger ledger) throws Exception {
        long lastIndex = currentIndex(ledger);
        for (int index = 0; index <= lastIndex; index++) {
            PositionImpl found = (PositionImpl) ledger.asyncFindPosition(new IndexSearchPredicate(index)).get();
            // Five entries of two messages per ledger: ten indexes per ledger, two per entry.
            Assert.assertEquals(found.getEntryId(), (index % 10) / 2);
        }
    }

    /**
     * Terminates the ledger, attempts an asynchronous add, waits for either callback and checks
     * that the interceptor index is still {@code -1}.
     */
    @Test
    public void testAddEntryFailed() throws Exception {
        ManagedLedgerInterceptorImpl configuredInterceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), (Set) null);
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(2);
        config.setManagedLedgerInterceptor(configuredInterceptor);

        ByteBuf payload = Unpooled.wrappedBuffer("message".getBytes());
        ManagedLedger ledger = this.factory.open("testAddEntryFailed", config);
        ledger.terminate();
        // Explicit downcast: the decompiled assignment omitted it.
        ManagedLedgerInterceptorImpl ledgerInterceptor = (ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor();
        final CountDownLatch callbackInvoked = new CountDownLatch(1);
        try {
            ledger.asyncAddEntry(payload, 2, new AsyncCallbacks.AddEntryCallback() { // from class: org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest.1
                public void addComplete(Position position, ByteBuf byteBuf, Object obj) {
                    callbackInvoked.countDown();
                }

                public void addFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    callbackInvoked.countDown();
                }
            }, (Object) null);
            callbackInvoked.await();
            Assert.assertEquals(ledgerInterceptor.getIndex(), -1L);
        } finally {
            // Clean up exactly once, on success and on failure.
            ledger.close();
            this.factory.shutdown();
        }
    }

    /**
     * Uses {@link MockManagedLedgerInterceptorImpl}, whose {@code beforeAddEntry} throws, and
     * checks that the caller's buffer still has a reference count of 1 once a callback has fired.
     */
    @Test
    public void testBeforeAddEntryWithException() throws Exception {
        MockManagedLedgerInterceptorImpl throwingInterceptor = new MockManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(2);
        config.setManagedLedgerInterceptor(throwingInterceptor);

        ByteBuf payload = Unpooled.wrappedBuffer("message".getBytes());
        ManagedLedger ledger = this.factory.open("testBeforeAddEntryWithException", config);
        final CountDownLatch callbackInvoked = new CountDownLatch(1);
        try {
            ledger.asyncAddEntry(payload, 2, new AsyncCallbacks.AddEntryCallback() { // from class: org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest.2
                public void addComplete(Position position, ByteBuf byteBuf, Object obj) {
                    callbackInvoked.countDown();
                }

                public void addFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    callbackInvoked.countDown();
                }
            }, (Object) null);
            callbackInvoked.await();
            Assert.assertEquals(payload.refCnt(), 1);
        } finally {
            // Clean up exactly once, on success and on failure.
            ledger.close();
            this.factory.shutdown();
        }
    }

    /**
     * Loads the timestamp and index broker entry metadata interceptors by class name, using the
     * current thread's context class loader.
     *
     * @return whatever {@code BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors} returns
     *         for those two class names
     */
    public static Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
        Set<String> interceptorClassNames = new HashSet<>();
        Collections.addAll(interceptorClassNames,
                "org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor",
                "org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorClassNames, contextClassLoader);
    }

    /**
     * Adds ten one-byte entries through an input processor that throws when the first byte reads
     * as {@code true}. Half of the payloads are written as {@code true}, so the test expects five
     * failures, each caused by the processor's exception, and five successes.
     */
    @Test(timeOut = 3000)
    public void testManagedLedgerPayloadInputProcessorFailure() throws Exception {
        final int entryCount = 10;

        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setManagedLedgerInterceptor(new ManagedLedgerInterceptorImpl(Collections.emptySet(), Set.of(new ManagedLedgerPayloadProcessor() { // from class: org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest.3
            public ManagedLedgerPayloadProcessor.Processor inputProcessor() {
                return new ManagedLedgerPayloadProcessor.Processor() { // from class: org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest.3.1
                    public ByteBuf process(Object obj, ByteBuf byteBuf) {
                        if (byteBuf.readBoolean()) {
                            throw new RuntimeException("failed to process input payload");
                        }
                        return byteBuf;
                    }

                    public void release(ByteBuf byteBuf) {
                    }
                };
            }
        })));

        ManagedLedger ledger = this.factory.open("testManagedLedgerPayloadProcessorFailure", config);
        try {
            final CountDownLatch callbacksInvoked = new CountDownLatch(entryCount);
            final AtomicInteger successCount = new AtomicInteger(0);
            final List<ManagedLedgerException> failures = new ArrayList<>();
            AsyncCallbacks.AddEntryCallback callback = new AsyncCallbacks.AddEntryCallback() { // from class: org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImplTest.4
                public void addComplete(Position position, ByteBuf byteBuf, Object obj) {
                    // Record the result before counting down; otherwise the awaiting thread can be
                    // released by the final callback and read the counter before it is incremented.
                    successCount.incrementAndGet();
                    callbacksInvoked.countDown();
                }

                public void addFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    failures.add(managedLedgerException);
                    callbacksInvoked.countDown();
                }
            };

            for (int i = 0; i < entryCount; i++) {
                // Even iterations write true (processor throws), odd iterations write false.
                boolean processorShouldFail = i % 2 == 0;
                ledger.asyncAddEntry(Unpooled.buffer().writeBoolean(processorShouldFail), callback, (Object) null);
            }
            callbacksInvoked.await();

            Assert.assertEquals(failures.size(), entryCount / 2);
            Assert.assertEquals(successCount.get(), entryCount / 2);
            for (ManagedLedgerException failure : failures) {
                Assert.assertEquals(failure.getCause().getMessage(), "failed to process input payload");
            }
        } finally {
            ledger.close();
        }
    }
}
