package org.apache.pulsar.client.api;

import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.PulsarMockReadHandleInterceptor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryMBeanImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest.class */
public class KeySharedSubscriptionBrokerCacheTest extends ProducerConsumerBase {
    private static final String SUBSCRIPTION_NAME = "key_shared";
    private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionBrokerCacheTest.class);
    private static final Random random = new Random(1);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public void doInitConf() throws Exception {
        super.doInitConf();
        this.conf.setUnblockStuckSubscriptionEnabled(false);
        this.conf.setSubscriptionKeySharedUseConsistentHashing(true);
        this.conf.setManagedLedgerCacheSizeMB(100);
        this.conf.setManagedLedgerCacheEvictionTimeThresholdMillis(30000L);
        this.conf.setManagedLedgerCacheEvictionIntervalMs(30000L);
        this.conf.setCacheEvictionByMarkDeletedPosition(true);
        this.conf.setManagedLedgerMaxReadsInFlightSizeInMB(100L);
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @AfterMethod(alwaysRun = true)
    public void resetAfterMethod() throws Exception {
        for (String str : this.admin.namespaces().getTopics("public/default")) {
            if (!this.pulsar.getBrokerService().isSystemTopic(str)) {
                this.admin.topics().delete(str, false);
            }
        }
        this.pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor((PulsarMockReadHandleInterceptor) null);
    }

    private Producer<Integer> createProducer(String str, boolean z) throws PulsarClientException {
        return z ? this.pulsarClient.newProducer(Schema.INT32).topic(str).enableBatching(true).maxPendingMessages(2001).batcherBuilder(BatcherBuilder.KEY_BASED).create() : this.pulsarClient.newProducer(Schema.INT32).topic(str).maxPendingMessages(2001).enableBatching(false).create();
    }

    private StickyKeyConsumerSelector getSelector(String str, String str2) {
        return getStickyKeyDispatcher(str, str2).getSelector();
    }

    private PersistentStickyKeyDispatcherMultipleConsumers getStickyKeyDispatcher(String str, String str2) {
        return ((Topic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).get()).getSubscription(str2).getDispatcher();
    }

    /* JADX WARN: Finally extract failed */
    @Test(invocationCount = 1)
    public void testReplayQueueReadsGettingCached() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("testReplayQueueReadsGettingCached");
        long j = 100;
        long nanoTime = System.nanoTime();
        Producer<Integer> createProducer = createProducer(newUniqueName, false);
        try {
            this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).subscriptionName(SUBSCRIPTION_NAME).subscriptionType(SubscriptionType.Key_Shared).subscribe().close();
            Set synchronizedSet = Collections.synchronizedSet(new HashSet());
            LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            HashSet hashSet = new HashSet();
            AtomicLong atomicLong = new AtomicLong(System.currentTimeMillis());
            MessageListener messageListener = (consumer, message) -> {
                atomicLong.set(System.currentTimeMillis());
                synchronized (this) {
                    String key = message.getKey();
                    if (atomicBoolean.get() && hashSet.contains(key)) {
                        linkedBlockingQueue.add(Pair.of(consumer, message));
                    } else {
                        synchronizedSet.remove(message.getValue());
                        consumer.acknowledgeAsync(message);
                    }
                }
            };
            this.pulsarTestContext.getMockBookKeeper().setReadHandleInterceptor((j2, j3, j4, ledgerEntries) -> {
                log.error("Attempting to read from BK when cache should be used. {}:{} to {}:{}", new Object[]{Long.valueOf(j2), Long.valueOf(j3), Long.valueOf(j2), Long.valueOf(j4)});
                return CompletableFuture.failedFuture(new ManagedLedgerException.NonRecoverableLedgerException("Should not read from BK since cache should be used."));
            });
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).consumerName("c1").subscriptionName(SUBSCRIPTION_NAME).subscriptionType(SubscriptionType.Key_Shared).messageListener(messageListener).subscribe();
            try {
                Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).consumerName("c2").subscriptionName(SUBSCRIPTION_NAME).subscriptionType(SubscriptionType.Key_Shared).messageListener(messageListener).subscribe();
                try {
                    Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).consumerName("c3").subscriptionName(SUBSCRIPTION_NAME).subscriptionType(SubscriptionType.Key_Shared).messageListener(messageListener).subscribe();
                    try {
                        StickyKeyConsumerSelector selector = getStickyKeyDispatcher(newUniqueName, SUBSCRIPTION_NAME).getSelector();
                        for (int i = 0; i < 100; i++) {
                            String valueOf = String.valueOf(i);
                            if (selector.select(StickyKeyConsumerSelector.makeStickyKeyHash(valueOf.getBytes(StandardCharsets.UTF_8))).consumerName().equals("c2")) {
                                hashSet.add(valueOf);
                            }
                        }
                        subscribe2.close();
                        for (int i2 = 0; i2 < 1000; i2++) {
                            String valueOf2 = String.valueOf(random.nextInt(100));
                            synchronizedSet.add(Integer.valueOf(i2));
                            createProducer.newMessage().key(valueOf2).value(Integer.valueOf(i2)).send();
                        }
                        Consumer subscribe4 = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).consumerName("c2").subscriptionName(SUBSCRIPTION_NAME).subscriptionType(SubscriptionType.Key_Shared).messageListener(messageListener).startPaused(true).subscribe();
                        atomicBoolean.set(false);
                        while (true) {
                            Pair pair = (Pair) linkedBlockingQueue.poll();
                            if (pair == null) {
                                break;
                            } else {
                                messageListener.received((Consumer) pair.getLeft(), (Message) pair.getRight());
                            }
                        }
                        for (int i3 = 0; i3 < 1000; i3++) {
                            String valueOf3 = String.valueOf(random.nextInt(100));
                            synchronizedSet.add(Integer.valueOf(i3));
                            createProducer.newMessage().key(valueOf3).value(Integer.valueOf(i3)).send();
                        }
                        subscribe4.resume();
                        Awaitility.await().atMost(Duration.ofSeconds(10L)).until(() -> {
                            return Boolean.valueOf(synchronizedSet.isEmpty() || System.currentTimeMillis() - atomicLong.get() > 50 * j);
                        });
                        try {
                            SoftAssertions.assertSoftly(softAssertions -> {
                                softAssertions.assertThat(synchronizedSet).as("remainingMessageValues", new Object[0]).isEmpty();
                                ManagedLedgerFactoryMBeanImpl mbean = this.pulsar.getManagedLedgerFactory().getMbean();
                                mbean.refreshStats(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS);
                                softAssertions.assertThat(mbean.getCacheHitsRate()).as("cache hits", new Object[0]).isGreaterThan(0.0d);
                                softAssertions.assertThat(mbean.getCacheMissesRate()).as("cache misses", new Object[0]).isEqualTo(0.0d);
                                softAssertions.assertThat(mbean.getNumberOfCacheEvictions()).as("cache evictions", new Object[0]).isEqualTo(0L);
                            });
                            logTopicStats(newUniqueName);
                            if (Collections.singletonList(subscribe3).get(0) != null) {
                                subscribe3.close();
                            }
                            if (Collections.singletonList(subscribe4).get(0) != null) {
                                subscribe4.close();
                            }
                            if (Collections.singletonList(subscribe).get(0) != null) {
                                subscribe.close();
                            }
                        } catch (Throwable th) {
                            logTopicStats(newUniqueName);
                            throw th;
                        }
                    } catch (Throwable th2) {
                        if (Collections.singletonList(subscribe3).get(0) != null) {
                            subscribe3.close();
                        }
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (Collections.singletonList(subscribe2).get(0) != null) {
                        subscribe2.close();
                    }
                    throw th3;
                }
            } catch (Throwable th4) {
                if (Collections.singletonList(subscribe).get(0) != null) {
                    subscribe.close();
                }
                throw th4;
            }
        } finally {
            if (Collections.singletonList(createProducer).get(0) != null) {
                createProducer.close();
            }
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 776636011:
                if (implMethodName.equals("lambda$testReplayQueueReadsGettingCached$96cd0275$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case SHARED_VALUE:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/client/api/KeySharedSubscriptionBrokerCacheTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicLong;Ljava/util/concurrent/atomic/AtomicBoolean;Ljava/util/Set;Ljava/util/concurrent/BlockingQueue;Ljava/util/Set;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    KeySharedSubscriptionBrokerCacheTest keySharedSubscriptionBrokerCacheTest = (KeySharedSubscriptionBrokerCacheTest) serializedLambda.getCapturedArg(0);
                    AtomicLong atomicLong = (AtomicLong) serializedLambda.getCapturedArg(1);
                    AtomicBoolean atomicBoolean = (AtomicBoolean) serializedLambda.getCapturedArg(2);
                    Set set = (Set) serializedLambda.getCapturedArg(3);
                    BlockingQueue blockingQueue = (BlockingQueue) serializedLambda.getCapturedArg(4);
                    Set set2 = (Set) serializedLambda.getCapturedArg(5);
                    return (consumer, message) -> {
                        atomicLong.set(System.currentTimeMillis());
                        synchronized (this) {
                            String key = message.getKey();
                            if (atomicBoolean.get() && set.contains(key)) {
                                blockingQueue.add(Pair.of(consumer, message));
                            } else {
                                set2.remove(message.getValue());
                                consumer.acknowledgeAsync(message);
                            }
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
