package org.apache.pulsar.client.api;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/api/UnloadSubscriptionTest.class */
public class UnloadSubscriptionTest extends ProducerConsumerBase {

    // SLF4J logger shared by every test in this class. The lombok @Generated marker suggests the
    // field was emitted by a Lombok logging annotation rather than written by hand (TODO confirm).
    @Generated
    private static final Logger log = LoggerFactory.getLogger(UnloadSubscriptionTest.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/client/api/UnloadSubscriptionTest$MessagesEntry.class */
    public static final class MessagesEntry extends Record {
        private final Set<String> messageSet;
        private final Set<MessageId> messageIdSet;

        private MessagesEntry(Set<String> set, Set<MessageId> set2) {
            this.messageSet = set;
            this.messageIdSet = set2;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, MessagesEntry.class), MessagesEntry.class, "messageSet;messageIdSet", "FIELD:Lorg/apache/pulsar/client/api/UnloadSubscriptionTest$MessagesEntry;->messageSet:Ljava/util/Set;", "FIELD:Lorg/apache/pulsar/client/api/UnloadSubscriptionTest$MessagesEntry;->messageIdSet:Ljava/util/Set;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, MessagesEntry.class), MessagesEntry.class, "messageSet;messageIdSet", "FIELD:Lorg/apache/pulsar/client/api/UnloadSubscriptionTest$MessagesEntry;->messageSet:Ljava/util/Set;", "FIELD:Lorg/apache/pulsar/client/api/UnloadSubscriptionTest$MessagesEntry;->messageIdSet:Ljava/util/Set;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, MessagesEntry.class, Object.class), MessagesEntry.class, "messageSet;messageIdSet", "FIELD:Lorg/apache/pulsar/client/api/UnloadSubscriptionTest$MessagesEntry;->messageSet:Ljava/util/Set;", "FIELD:Lorg/apache/pulsar/client/api/UnloadSubscriptionTest$MessagesEntry;->messageIdSet:Ljava/util/Set;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public Set<String> messageSet() {
            return this.messageSet;
        }

        public Set<MessageId> messageIdSet() {
            return this.messageIdSet;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/client/api/UnloadSubscriptionTest$ProducerAndMessageIds.class */
    /**
     * A producer paired with the ids of the messages it has already published.
     *
     * <p>Written as a record so that {@code equals}, {@code hashCode}, {@code toString} and the
     * {@code producer()} / {@code messageIds()} accessors are generated by the compiler from the
     * two components. The list is stored as passed in; no defensive copy is made.
     *
     * @param producer the producer that sent the messages; the holder of this record closes it
     * @param messageIds ids returned by the sends, in send order
     */
    public record ProducerAndMessageIds(Producer<String> producer, List<MessageId> messageIds) {
    }

    /**
     * Class-level fixture run before the tests ({@code alwaysRun = true}): invokes the
     * superclass {@code internalSetup()} followed by {@code producerBaseSetup()}.
     *
     * @throws Exception if either superclass setup step fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Applies the superclass configuration and then overrides three settings on {@code conf}:
     * system topics off, transaction coordinator off, and acknowledgment at batch-index level on.
     *
     * @throws Exception if the superclass configuration step fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    public void doInitConf() throws Exception {
        super.doInitConf();
        this.conf.setSystemTopicEnabled(false);
        this.conf.setTransactionCoordinatorEnabled(false);
        // The consumers in this class subscribe with enableBatchIndexAcknowledgment(true).
        this.conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
    }

    /**
     * Class-level teardown run after the tests ({@code alwaysRun = true}): delegates to the
     * superclass {@code internalCleanup()}.
     *
     * @throws Exception if the superclass cleanup fails
     */
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Parameter matrix shared by the unload tests.
     *
     * <p>Each row supplies, in order: the number of messages to publish, whether the producer
     * batches, the maximum messages per batch, the subscription type, and how many received
     * messages are acknowledged before the subscription is unloaded.
     *
     * @return sixteen rows: every subscription type, with and without batching, with 0 and 50 acks
     */
    @DataProvider(name = "unloadCases")
    public Object[][] unloadCases() {
        return new Object[][] {
            // Nothing acknowledged before the unload.
            {100, false, 1, SubscriptionType.Exclusive, 0},
            {100, false, 1, SubscriptionType.Failover, 0},
            {100, false, 1, SubscriptionType.Shared, 0},
            {100, false, 1, SubscriptionType.Key_Shared, 0},
            {100, true, 5, SubscriptionType.Exclusive, 0},
            {100, true, 5, SubscriptionType.Failover, 0},
            {100, true, 5, SubscriptionType.Shared, 0},
            {100, true, 5, SubscriptionType.Key_Shared, 0},
            // Half of the messages acknowledged before the unload.
            {100, false, 1, SubscriptionType.Exclusive, 50},
            {100, false, 1, SubscriptionType.Failover, 50},
            {100, false, 1, SubscriptionType.Shared, 50},
            {100, false, 1, SubscriptionType.Key_Shared, 50},
            {100, true, 5, SubscriptionType.Exclusive, 50},
            {100, true, 5, SubscriptionType.Failover, 50},
            {100, true, 5, SubscriptionType.Shared, 50},
            {100, true, 5, SubscriptionType.Key_Shared, 50},
        };
    }

    /**
     * Publishes messages, drains them with one consumer, optionally acknowledges a prefix of them,
     * unloads the subscription, and asserts that a second drain yields exactly the messages that
     * were not acknowledged.
     *
     * @param i number of messages to publish
     * @param z whether the producer batches messages
     * @param i2 maximum number of messages per batch
     * @param subscriptionType subscription type the consumer subscribes with
     * @param i3 number of received messages to acknowledge before the unload; {@code 0} acks none
     * @throws Exception if a client, broker or admin call fails
     */
    @Test(dataProvider = "unloadCases")
    public void testSingleConsumer(int i, boolean z, int i2, SubscriptionType subscriptionType, int i3) throws Exception {
        final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID();
        final String subName = "sub";
        Consumer<String> consumer = createConsumer(topicName, subName, subscriptionType);
        ProducerAndMessageIds sent = createProducerAndSendMessages(topicName, i, z, i2);
        log.info("send message-ids:{}-{}", sent.messageIds().size(), toString(sent.messageIds()));

        // First pass: every published message must be delivered.
        MessagesEntry firstPass = receiveAllMessages(consumer);
        Assert.assertEquals(firstPass.messageSet().size(), i);

        if (i3 > 0) {
            // Acknowledge the first i3 ids in arrival order; the list is built once and reused for the log.
            List<MessageId> toAck = firstPass.messageIdSet().stream().limit(i3).toList();
            consumer.acknowledge(toAck);
            log.info("ack message-ids: {}", toString(toAck));
        }

        getPersistentTopic(topicName).unloadSubscription(subName);

        // Second pass: only the messages that were not acknowledged should come back.
        MessagesEntry secondPass = receiveAllMessages(consumer);
        log.info("received message-ids for the second time: {}",
                toString(secondPass.messageIdSet().stream().toList()));
        Assert.assertEquals(secondPass.messageSet().size(), i - i3);

        sent.producer().close();
        consumer.close();
        this.admin.topics().delete(topicName);
    }

    /**
     * Two-consumer variant of the unload test: publishes messages, drains both consumers,
     * optionally acknowledges a prefix of the combined ids, unloads the subscription, and asserts
     * that the second drain across both consumers yields exactly the unacknowledged messages.
     *
     * @param i number of messages to publish
     * @param z whether the producer batches messages
     * @param i2 maximum number of messages per batch
     * @param subscriptionType subscription type both consumers subscribe with
     * @param i3 number of received messages to acknowledge before the unload; {@code 0} acks none
     * @throws Exception if a client, broker or admin call fails
     */
    @Test(dataProvider = "unloadCases")
    public void testMultiConsumer(int i, boolean z, int i2, SubscriptionType subscriptionType, int i3) throws Exception {
        // Exclusive rows are skipped; presumably two consumers cannot attach to an Exclusive
        // subscription (TODO confirm against the client documentation).
        if (subscriptionType == SubscriptionType.Exclusive) {
            return;
        }
        final String topicName = "persistent://my-property/my-ns/tp-" + UUID.randomUUID();
        final String subName = "sub";
        Consumer<String> consumer1 = createConsumer(topicName, subName, subscriptionType);
        Consumer<String> consumer2 = createConsumer(topicName, subName, subscriptionType);
        ProducerAndMessageIds sent = createProducerAndSendMessages(topicName, i, z, i2);
        log.info("send message-ids:{}-{}", sent.messageIds().size(), toString(sent.messageIds()));

        // First pass: between them the two consumers must see every published message.
        MessagesEntry firstPass1 = receiveAllMessages(consumer1);
        MessagesEntry firstPass2 = receiveAllMessages(consumer2);
        Set<String> firstPassValues = new LinkedHashSet<>(firstPass1.messageSet());
        firstPassValues.addAll(firstPass2.messageSet());
        Assert.assertEquals(firstPassValues.size(), i);

        if (i3 > 0) {
            Set<MessageId> firstPassIds = new LinkedHashSet<>(firstPass1.messageIdSet());
            firstPassIds.addAll(firstPass2.messageIdSet());
            // The first i3 ids of the combined set are acknowledged through consumer1,
            // regardless of which consumer received them.
            List<MessageId> toAck = firstPassIds.stream().limit(i3).toList();
            consumer1.acknowledge(toAck);
            log.info("ack message-ids: {}", toString(toAck));
        }

        getPersistentTopic(topicName).unloadSubscription(subName);

        // Second pass: only the messages that were not acknowledged should come back.
        MessagesEntry secondPass1 = receiveAllMessages(consumer1);
        MessagesEntry secondPass2 = receiveAllMessages(consumer2);
        Set<String> secondPassValues = new LinkedHashSet<>(secondPass1.messageSet());
        secondPassValues.addAll(secondPass2.messageSet());
        // Log the ids from the second pass (not the first-pass ids) so the message matches its label.
        Set<MessageId> secondPassIds = new LinkedHashSet<>(secondPass1.messageIdSet());
        secondPassIds.addAll(secondPass2.messageIdSet());
        log.info("received message-ids for the second time: {}", toString(secondPassIds.stream().toList()));
        Assert.assertEquals(secondPassValues.size(), i - i3);

        sent.producer().close();
        consumer1.close();
        consumer2.close();
        this.admin.topics().delete(topicName);
    }

    /**
     * Renders message ids compactly for log output.
     *
     * <p>Each id becomes its entry id; ids that are {@link BatchMessageIdImpl} get
     * {@code _<batchIndex>/<batchSize>} appended. A {@link TopicMessageIdImpl} is unwrapped to its
     * inner id first.
     *
     * @param list ids to render
     * @return the rendered ids in {@code List.toString()} form, e.g. {@code [3_0/5, 3_1/5]}
     * @throws ClassCastException if an id (after unwrapping) is not a {@link MessageIdImpl}
     */
    private static String toString(List<MessageId> list) {
        List<String> rendered = new ArrayList<>(list.size());
        for (MessageId id : list) {
            MessageId unwrapped = id instanceof TopicMessageIdImpl topicId ? topicId.getInnerMessageId() : id;
            // Entry id and batch details are only exposed by the implementation classes.
            MessageIdImpl inner = (MessageIdImpl) unwrapped;
            StringBuilder sb = new StringBuilder(String.valueOf(inner.getEntryId()));
            if (inner instanceof BatchMessageIdImpl batchId) {
                sb.append("_").append(batchId.getBatchIndex()).append("/").append(batchId.getBatchSize());
            }
            rendered.add(sb.toString());
        }
        return rendered.toString();
    }

    /**
     * Looks up an existing topic on the broker (the lookup is made with {@code false} as its
     * second argument) and returns it as a {@link PersistentTopic}.
     *
     * @param str fully qualified topic name
     * @return the topic held by the broker service
     * @throws java.util.NoSuchElementException if the lookup completes with an empty optional
     * @throws ClassCastException if the topic is not a {@link PersistentTopic}
     */
    private PersistentTopic getPersistentTopic(String str) {
        Optional<?> lookupResult = (Optional<?>) this.pulsar.getBrokerService().getTopic(str, false).join();
        Object topic = lookupResult.orElseThrow();
        return (PersistentTopic) topic;
    }

    /**
     * Creates a string producer on the topic, sends {@code i} messages asynchronously and waits
     * for all sends to complete.
     *
     * <p>Message {@code n} carries the value {@code String.valueOf(n)} and the key
     * {@code String.valueOf(n % 10)}, so the messages are spread over ten keys.
     *
     * @param str topic to publish to
     * @param i number of messages to send
     * @param z whether batching is enabled on the producer
     * @param i2 maximum number of messages per batch
     * @return the still-open producer together with the ids of the sent messages, in send order
     * @throws Exception if the producer cannot be created or a send fails
     */
    private ProducerAndMessageIds createProducerAndSendMessages(String str, int i, boolean z, int i2) throws Exception {
        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                .topic(str)
                .enableBatching(z)
                .batchingMaxMessages(i2)
                .create();
        List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
        for (int n = 0; n < i; n++) {
            pendingSends.add(producer.newMessage()
                    .key(String.valueOf(n % 10))
                    .value(String.valueOf(n))
                    .sendAsync());
        }
        // Wait for every send before collecting results, so the joins below never block.
        FutureUtil.waitForAll(pendingSends).join();
        List<MessageId> messageIds = pendingSends.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        return new ProducerAndMessageIds(producer, messageIds);
    }

    /**
     * Subscribes a string consumer to a single topic with ack receipts and batch-index
     * acknowledgment both enabled.
     *
     * @param str topic to subscribe to
     * @param str2 subscription name
     * @param subscriptionType subscription type to subscribe with
     * @return the subscribed consumer; the caller closes it
     * @throws Exception if the subscription cannot be established
     */
    private Consumer<String> createConsumer(String str, String str2, SubscriptionType subscriptionType) throws Exception {
        return this.pulsarClient.newConsumer(Schema.STRING)
                .topic(str)
                .subscriptionName(str2)
                .subscriptionType(subscriptionType)
                .isAckReceiptEnabled(true)
                .enableBatchIndexAcknowledgment(true)
                .subscribe();
    }

    /**
     * Drains the consumer: keeps receiving until a 2-second receive returns {@code null}, recording
     * each message's value and id in arrival order. The messages are not acknowledged here.
     *
     * @param consumer consumer to drain
     * @return the received values and ids, de-duplicated and in first-seen order
     * @throws Exception if a receive call fails
     */
    private MessagesEntry receiveAllMessages(Consumer<String> consumer) throws Exception {
        Set<String> values = Collections.synchronizedSet(new LinkedHashSet<>());
        Set<MessageId> ids = Collections.synchronizedSet(new LinkedHashSet<>());
        Message<String> message;
        // A null result means nothing arrived within the timeout; treat that as "drained".
        while ((message = consumer.receive(2, TimeUnit.SECONDS)) != null) {
            ids.add(message.getMessageId());
            values.add(message.getValue());
        }
        return new MessagesEntry(values, ids);
    }
}
