package org.apache.bookkeeper.mledger.impl;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.admin.NamespacesTest;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/bookkeeper/mledger/impl/NonEntryCacheKeySharedSubscriptionV30Test.class */
public class NonEntryCacheKeySharedSubscriptionV30Test extends ProducerConsumerBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(NonEntryCacheKeySharedSubscriptionV30Test.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public void doInitConf() throws Exception {
        super.doInitConf();
        this.conf.setManagedLedgerCacheSizeMB(0);
        this.conf.setManagedLedgerMaxEntriesPerLedger(50000);
    }

    @Test(timeOut = NamespacesTest.THREE_MINUTE_MILLIS, invocationCount = 1)
    public void testRecentJoinQueueIsInOrderAfterRewind() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory(BrokerTestUtil.newUniqueName("thread"));
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        this.admin.topics().createSubscription(newUniqueName, "my-sub", MessageId.earliest);
        Producer create = this.pulsarClient.newProducer(Schema.INT32).topic(newUniqueName).enableBatching(false).create();
        try {
            AtomicInteger atomicInteger = new AtomicInteger();
            for (int i = 0; i < 300; i++) {
                int andIncrement = atomicInteger.getAndIncrement();
                create.newMessage().key(String.valueOf(andIncrement)).value(Integer.valueOf(andIncrement)).send();
            }
            Consumer subscribe = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).subscriptionName("my-sub").receiverQueueSize(100).subscriptionType(SubscriptionType.Key_Shared).consumerName("c1").subscribe();
            Consumer subscribe2 = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).subscriptionName("my-sub").receiverQueueSize(100).subscriptionType(SubscriptionType.Key_Shared).consumerName("c2").subscribe();
            Consumer subscribe3 = this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).subscriptionName("my-sub").receiverQueueSize(100).subscriptionType(SubscriptionType.Key_Shared).consumerName("c3").subscribe();
            PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(newUniqueName, false).join()).get();
            ManagedLedgerImpl managedLedger = persistentTopic.getManagedLedger();
            PersistentStickyKeyDispatcherMultipleConsumers dispatcher = persistentTopic.getSubscription("my-sub").getDispatcher();
            ManagedCursorImpl managedCursorImpl = managedLedger.getCursors().get("my-sub");
            dispatcher.setSortRecentlyJoinedConsumersIfNeeded(false);
            ackAllMessages(subscribe, subscribe2);
            PositionImpl markDeletedPosition = managedCursorImpl.getMarkDeletedPosition();
            PositionImpl readPosition = managedCursorImpl.getReadPosition();
            PositionImpl lastConfirmedEntry = managedLedger.getLastConfirmedEntry();
            Assert.assertTrue(readPosition.compareTo(lastConfirmedEntry) >= 0);
            PositionImpl nextValidPosition = managedLedger.getNextValidPosition(markDeletedPosition);
            log.info("md-pos {}:{}", Long.valueOf(markDeletedPosition.getLedgerId()), Long.valueOf(markDeletedPosition.getEntryId()));
            log.info("rd-pos {}:{}", Long.valueOf(readPosition.getLedgerId()), Long.valueOf(readPosition.getEntryId()));
            log.info("lac-pos {}:{}", Long.valueOf(lastConfirmedEntry.getLedgerId()), Long.valueOf(lastConfirmedEntry.getEntryId()));
            log.info("first-waiting-ack-pos {}:{}", Long.valueOf(nextValidPosition.getLedgerId()), Long.valueOf(nextValidPosition.getEntryId()));
            LedgerHandle ledgerHandle = managedLedger.currentLedger;
            Assert.assertEquals(nextValidPosition.getLedgerId(), ledgerHandle.getId());
            LedgerHandle ledgerHandle2 = (LedgerHandle) Mockito.spy(ledgerHandle);
            CountDownLatch countDownLatch = new CountDownLatch(1);
            AtomicBoolean atomicBoolean = new AtomicBoolean();
            Answer answer = invocationOnMock -> {
                if (((Long) invocationOnMock.getArguments()[0]).longValue() != nextValidPosition.getEntryId()) {
                    return invocationOnMock.callRealMethod();
                }
                atomicBoolean.set(true);
                CompletableFuture completableFuture = new CompletableFuture();
                defaultThreadFactory.newThread(() -> {
                    try {
                        countDownLatch.await();
                        ((CompletableFuture) invocationOnMock.callRealMethod()).thenAccept(ledgerEntries -> {
                            completableFuture.complete(ledgerEntries);
                        }).exceptionally(th -> {
                            completableFuture.completeExceptionally(th);
                            return null;
                        });
                    } catch (Throwable th2) {
                        completableFuture.completeExceptionally(th2);
                    }
                }).start();
                return completableFuture;
            };
            ((LedgerHandle) Mockito.doAnswer(answer).when(ledgerHandle2)).readAsync(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
            ((LedgerHandle) Mockito.doAnswer(answer).when(ledgerHandle2)).readUnconfirmedAsync(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
            managedLedger.currentLedger = ledgerHandle2;
            AtomicBoolean atomicBoolean2 = new AtomicBoolean(true);
            new Thread(() -> {
                while (atomicBoolean2.get()) {
                    int andIncrement2 = atomicInteger.getAndIncrement();
                    create.newMessage().key(String.valueOf(andIncrement2)).value(Integer.valueOf(andIncrement2)).sendAsync();
                    sleep(100);
                }
            }).start();
            subscribe3.close();
            Awaitility.await().until(() -> {
                return Boolean.valueOf(atomicBoolean.get());
            });
            subscribe.close();
            subscribe2.close();
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < 40; i2++) {
                arrayList.add(this.pulsarClient.newConsumer(Schema.INT32).topic(new String[]{newUniqueName}).subscriptionName("my-sub").subscriptionType(SubscriptionType.Key_Shared).subscribeAsync());
                if (i2 == 10) {
                    for (int i3 = 0; i3 < 300; i3++) {
                        int andIncrement2 = atomicInteger.getAndIncrement();
                        create.newMessage().key(String.valueOf(andIncrement2)).value(Integer.valueOf(andIncrement2)).send();
                    }
                    Consumer consumer = (Consumer) ((CompletableFuture) arrayList.get(0)).join();
                    ackAllMessages(consumer);
                    new Thread(() -> {
                        while (atomicBoolean2.get()) {
                            try {
                                ackAllMessages(consumer);
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        }
                    }).start();
                }
                log.info("recent-joined-consumers {} {}", Integer.valueOf(i2), Integer.valueOf(dispatcher.getRecentlyJoinedConsumers().size()));
                if (dispatcher.getRecentlyJoinedConsumers().size() > 0) {
                    PositionImpl markDeletedPosition2 = managedCursorImpl.getMarkDeletedPosition();
                    PositionImpl readPosition2 = managedCursorImpl.getReadPosition();
                    PositionImpl lastConfirmedEntry2 = managedLedger.getLastConfirmedEntry();
                    Assert.assertTrue(readPosition.compareTo(lastConfirmedEntry) >= 0);
                    PositionImpl nextValidPosition2 = managedLedger.getNextValidPosition(markDeletedPosition);
                    if (readPosition2.compareTo(nextValidPosition) > 0) {
                        atomicBoolean2.set(false);
                        log.info("consumer-index: {}", Integer.valueOf(i2));
                        log.info("md-pos-2 {}:{}", Long.valueOf(markDeletedPosition2.getLedgerId()), Long.valueOf(markDeletedPosition2.getEntryId()));
                        log.info("rd-pos-2 {}:{}", Long.valueOf(readPosition2.getLedgerId()), Long.valueOf(readPosition2.getEntryId()));
                        log.info("lac-pos-2 {}:{}", Long.valueOf(lastConfirmedEntry2.getLedgerId()), Long.valueOf(lastConfirmedEntry2.getEntryId()));
                        log.info("first-waiting-ack-pos-2 {}:{}", Long.valueOf(nextValidPosition2.getLedgerId()), Long.valueOf(nextValidPosition2.getEntryId()));
                        countDownLatch.countDown();
                    } else {
                        sleep(1000);
                    }
                }
            }
            ((CompletableFuture) arrayList.get(arrayList.size() - 1)).join();
            synchronized (dispatcher) {
                Assert.assertTrue(verifyMapItemsAreInOrder(dispatcher.getRecentlyJoinedConsumers()));
            }
            create.close();
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                ((Consumer) ((CompletableFuture) it.next()).join()).close();
            }
            this.admin.topics().delete(newUniqueName, false);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th;
        }
    }

    private void sleep(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private boolean verifyMapItemsAreInOrder(LinkedHashMap<org.apache.pulsar.broker.service.Consumer, PositionImpl> linkedHashMap) {
        boolean z = false;
        PositionImpl positionImpl = null;
        PositionImpl positionImpl2 = null;
        Iterator<Map.Entry<org.apache.pulsar.broker.service.Consumer, PositionImpl>> it = linkedHashMap.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<org.apache.pulsar.broker.service.Consumer, PositionImpl> next = it.next();
            if (positionImpl == null) {
                positionImpl = next.getValue();
            } else {
                positionImpl2 = next.getValue();
            }
            if (positionImpl != null && positionImpl2 != null) {
                if (positionImpl.compareTo(positionImpl2) > 0) {
                    z = true;
                    break;
                }
                positionImpl = positionImpl2;
            }
        }
        return !z;
    }
}
