package org.apache.pulsar.client.api;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.MultiTopicsReaderImpl;
import org.apache.pulsar.client.impl.ReaderImpl;
import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.util.RelativeTimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"flaky"})
/* loaded from: input_file:org/apache/pulsar/client/api/TopicReaderTest.class */
public class TopicReaderTest extends ProducerConsumerBase {
    private static final Logger log = LoggerFactory.getLogger(TopicReaderTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider
    public static Object[][] variationsForExpectedPos() {
        return new Object[]{new Object[]{true, true, 10}, new Object[]{true, false, 10}, new Object[]{false, true, 10}, new Object[]{false, false, 10}, new Object[]{true, true, 100}, new Object[]{true, false, 100}, new Object[]{false, true, 100}, new Object[]{false, false, 100}};
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider
    public static Object[][] variationsForResetOnLatestMsg() {
        return new Object[]{new Object[]{true, 20}, new Object[]{false, 20}};
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider
    public static Object[][] variationsForHasMessageAvailable() {
        return new Object[]{new Object[]{true, true}, new Object[]{true, false}, new Object[]{false, true}, new Object[]{false, false}};
    }

    @Test
    public void testSimpleReader() throws Exception {
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReader").startMessageId(MessageId.earliest).create();
        Producer create2 = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testSimpleReader").create();
        for (int i = 0; i < 10; i++) {
            create2.send(("my-message-" + i).getBytes());
        }
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            String str = new String(create.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(hashSet, str, "my-message-" + i2);
        }
        create.close();
        create2.close();
    }

    @Test
    public void testSimpleMultiReader() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testSimpleMultiReader", 3);
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleMultiReader").startMessageId(MessageId.earliest).create();
        Producer create2 = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testSimpleMultiReader").create();
        for (int i = 0; i < 10; i++) {
            create2.send(("my-message-" + i).getBytes());
        }
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            Assert.assertTrue(hashSet.add(new String(create.readNext(1, TimeUnit.SECONDS).getData())));
        }
        create.close();
        create2.close();
    }

    @Test
    public void testReaderAfterMessagesWerePublished() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderAfterMessagesWerePublished").create();
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderAfterMessagesWerePublished").startMessageId(MessageId.earliest).create();
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            String str = new String(create2.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(hashSet, str, "my-message-" + i2);
        }
        create2.close();
        create.close();
    }

    @Test
    public void testMultiReaderAfterMessagesWerePublished() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testMultiReaderAfterMessagesWerePublished", 3);
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testMultiReaderAfterMessagesWerePublished").create();
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultiReaderAfterMessagesWerePublished").startMessageId(MessageId.earliest).create();
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            Assert.assertTrue(hashSet.add(new String(create2.readNext(1, TimeUnit.SECONDS).getData())));
        }
        create2.close();
        create.close();
    }

    @Test
    public void testMultipleReaders() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testMultipleReaders").create();
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders").startMessageId(MessageId.earliest).create();
        Reader create3 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultipleReaders").startMessageId(MessageId.earliest).create();
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            String str = new String(create2.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(hashSet, str, "my-message-" + i2);
        }
        HashSet hashSet2 = new HashSet();
        for (int i3 = 0; i3 < 10; i3++) {
            String str2 = new String(create3.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str2);
            testMessageOrderAndDuplicates(hashSet2, str2, "my-message-" + i3);
        }
        create2.close();
        create3.close();
        create.close();
    }

    @Test
    public void testMultiMultipleReaders() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testMultiMultipleReaders", 3);
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testMultiMultipleReaders").create();
        for (int i = 0; i < 10; i++) {
            create.send(("my-message-" + i).getBytes());
        }
        Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultiMultipleReaders").startMessageId(MessageId.earliest).create();
        Reader create3 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultiMultipleReaders").startMessageId(MessageId.earliest).create();
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            Assert.assertTrue(hashSet.add(new String(create2.readNext(1, TimeUnit.SECONDS).getData())));
        }
        HashSet hashSet2 = new HashSet();
        for (int i3 = 0; i3 < 10; i3++) {
            Assert.assertTrue(hashSet2.add(new String(create3.readNext(1, TimeUnit.SECONDS).getData())));
        }
        create2.close();
        create3.close();
        create.close();
    }

    @Test
    public void testTopicStats() throws Exception {
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testTopicStats").startMessageId(MessageId.earliest).create();
        Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testTopicStats").startMessageId(MessageId.earliest).create();
        Assert.assertEquals(this.admin.topics().getStats("persistent://my-property/my-ns/testTopicStats").getSubscriptions().size(), 2);
        create.close();
        Assert.assertEquals(this.admin.topics().getStats("persistent://my-property/my-ns/testTopicStats").getSubscriptions().size(), 1);
        create2.close();
        Assert.assertEquals(this.admin.topics().getStats("persistent://my-property/my-ns/testTopicStats").getSubscriptions().size(), 0);
    }

    @Test
    public void testMultiTopicStats() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testMultiTopicStats", 3);
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultiTopicStats").startMessageId(MessageId.earliest).create();
        Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultiTopicStats").startMessageId(MessageId.earliest).create();
        Assert.assertEquals(this.admin.topics().getPartitionedStats("persistent://my-property/my-ns/testMultiTopicStats", true).getSubscriptions().size(), 2);
        create.close();
        Assert.assertEquals(this.admin.topics().getPartitionedStats("persistent://my-property/my-ns/testMultiTopicStats", true).getSubscriptions().size(), 1);
        create2.close();
        Assert.assertEquals(this.admin.topics().getPartitionedStats("persistent://my-property/my-ns/testMultiTopicStats", true).getSubscriptions().size(), 0);
    }

    @Test(dataProvider = "variationsForResetOnLatestMsg")
    public void testReaderOnLatestMessage(boolean z, int i) throws Exception {
        int i2 = i / 2;
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderOnLatestMessage").create();
        for (int i3 = 0; i3 < i2; i3++) {
            create.send(String.format("my-message-%d", Integer.valueOf(i3)).getBytes());
        }
        ReaderBuilder startMessageId = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderOnLatestMessage").startMessageId(MessageId.latest);
        if (z) {
            startMessageId.startMessageIdInclusive();
        }
        ReaderImpl create2 = startMessageId.create();
        for (int i4 = i2; i4 < i; i4++) {
            create.send(String.format("my-message-%d", Integer.valueOf(i4)).getBytes());
        }
        HashSet hashSet = new HashSet();
        for (int i5 = i2; i5 < i; i5++) {
            testMessageOrderAndDuplicates(hashSet, new String(create2.readNext().getData()), String.format("my-message-%d", Integer.valueOf(i5)));
        }
        Assert.assertTrue(create2.isConnected());
        Assert.assertEquals(create2.getConsumer().numMessagesInQueue(), 0);
        Assert.assertEquals(hashSet.size(), i2);
        create2.close();
        create.close();
    }

    @Test(dataProvider = "variationsForResetOnLatestMsg")
    public void testMultiReaderOnLatestMessage(boolean z, int i) throws Exception {
        String str = "persistent://my-property/my-ns/testMultiReaderOnLatestMessage" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(str, 3);
        int i2 = i / 2;
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        HashSet hashSet = new HashSet();
        for (int i3 = 0; i3 < i2; i3++) {
            byte[] bytes = String.format("my-message-%d", Integer.valueOf(i3)).getBytes();
            create.send(bytes);
            hashSet.add(bytes);
        }
        ReaderBuilder startMessageId = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.latest);
        if (z) {
            startMessageId.startMessageIdInclusive();
        }
        MultiTopicsReaderImpl create2 = startMessageId.create();
        for (int i4 = i2; i4 < i; i4++) {
            create.send(String.format("my-message-%d", Integer.valueOf(i4)).getBytes());
        }
        HashSet hashSet2 = new HashSet();
        for (int i5 = i2; i5 < i; i5++) {
            Message readNext = create2.readNext();
            Assert.assertFalse(hashSet.contains(readNext));
            Assert.assertTrue(hashSet2.add(new String(readNext.getData())));
        }
        Assert.assertTrue(create2.isConnected());
        Assert.assertEquals(create2.getMultiTopicsConsumer().numMessagesInQueue(), 0);
        Assert.assertEquals(hashSet2.size(), i2);
        create.close();
        create2.close();
    }

    @Test
    public void testReaderOnSpecificMessage() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderOnSpecificMessage").create();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(create.send(("my-message-" + i).getBytes()));
        }
        Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnSpecificMessage").startMessageId((MessageId) arrayList.get(4)).create();
        HashSet hashSet = new HashSet();
        for (int i2 = 5; i2 < 10; i2++) {
            String str = new String(create2.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(hashSet, str, "my-message-" + i2);
        }
        create2.close();
        create.close();
    }

    @Test
    public void testReaderOnSpecificMessageWithBatches() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches").enableBatching(true).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).create();
        for (int i = 0; i < 10; i++) {
            create.sendAsync(("my-message-" + i).getBytes());
        }
        create.send("my-message-10".getBytes());
        Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches").startMessageId(MessageId.earliest).create();
        MessageId messageId = null;
        for (int i2 = 0; i2 < 5; i2++) {
            messageId = create2.readNext().getMessageId();
        }
        Assert.assertEquals(messageId.getClass(), BatchMessageIdImpl.class);
        System.out.println("CREATING READER ON MSG ID: " + String.valueOf(messageId));
        Reader create3 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderOnSpecificMessageWithBatches").startMessageId(messageId).create();
        for (int i3 = 5; i3 < 11; i3++) {
            String str = new String(create3.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            Assert.assertEquals(str, "my-message-" + i3);
        }
        create.close();
    }

    @Test
    public void testECDSAEncryption() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        HashSet hashSet = new HashSet();
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1").startMessageId(MessageId.latest).cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.TopicReaderTest.1EncKeyReader
            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }
        }).create();
        Producer create2 = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/test-reader-myecdsa-topic1").addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.TopicReaderTest.1EncKeyReader
            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }
        }).create();
        for (int i = 0; i < 10; i++) {
            create2.send(("my-message-" + i).getBytes());
        }
        for (int i2 = 0; i2 < 10; i2++) {
            String str = new String(create.readNext(5, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            testMessageOrderAndDuplicates(hashSet, str, "my-message-" + i2);
        }
        create2.close();
        create.close();
        log.info("-- Exiting {} test --", this.methodName);
    }

    @Test
    public void testMultiReaderECDSAEncryption() throws Exception {
        log.info("-- Starting {} test --", this.methodName);
        HashSet hashSet = new HashSet();
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/test-multi-reader-myecdsa-topic1", 3);
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/test-multi-reader-myecdsa-topic1").startMessageId(MessageId.latest).cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.TopicReaderTest.2EncKeyReader
            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }
        }).create();
        Producer create2 = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/test-multi-reader-myecdsa-topic1").addEncryptionKey("client-ecdsa.pem").cryptoKeyReader(new CryptoKeyReader() { // from class: org.apache.pulsar.client.api.TopicReaderTest.2EncKeyReader
            final EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();

            public EncryptionKeyInfo getPublicKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/public-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }

            public EncryptionKeyInfo getPrivateKey(String str, Map<String, String> map) {
                String str2 = "./src/test/resources/certificate/private-key." + str;
                if (!Files.isReadable(Paths.get(str2, new String[0]))) {
                    Assert.fail("Certificate file " + str2 + " is not present or not readable.");
                    return null;
                }
                try {
                    this.keyInfo.setKey(Files.readAllBytes(Paths.get(str2, new String[0])));
                    return this.keyInfo;
                } catch (IOException e) {
                    Assert.fail("Failed to read certificate from " + str2);
                    return null;
                }
            }
        }).create();
        for (int i = 0; i < 10; i++) {
            create2.send(("my-message-" + i).getBytes());
        }
        for (int i2 = 0; i2 < 10; i2++) {
            String str = new String(create.readNext(5, TimeUnit.SECONDS).getData());
            Assert.assertTrue(hashSet.add(str), "Received duplicate message " + str);
        }
        create2.close();
        create.close();
    }

    @Test
    public void testDefaultCryptoKeyReader() throws Exception {
        String str = "persistent://my-property/my-ns/test-reader-default-crypto-key-reader" + System.currentTimeMillis();
        HashMap hashMap = new HashMap();
        hashMap.put("client-ecdsa.pem", "file:./src/test/resources/certificate/private-key.client-ecdsa.pem");
        hashMap.put("client-rsa.pem", "file:./src/test/resources/certificate/private-key.client-rsa.pem");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("client-ecdsa.pem", "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K");
        hashMap2.put("client-rsa.pem", "data:application/x-pem-file;base64,LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBdEtXd2dxZG5UWXJPQ3YrajFNa1RXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuCmIxMEpjRmY1Wmp6UDlCU1hLK3RIbUk4dW9OMzY4dkV2NnloVVJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0MKN3FkakNON0xESjNNbnFpQklyVXNTYUVQMXdyTnNCMWtJK285RVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVlawpIcUw3c0JsSjk4aDZObXNpY0VhVWthcmRrMFRPWHJsa2pDK2NNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0Cmhwdm5YTHZDbUc0TSs2eHRZdEQrbnBjVlBadzFpMVI5MGZNczdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG4KTktnVzdjN0lCTXJQMEFFbTM3SFR1MExTT2pQMk9IWGx2dmxRR1FJREFRQUJBb0lCQUFhSkZBaTJDN3UzY05yZgpBc3RZOXZWRExvTEl2SEZabGtCa3RqS1pEWW1WSXNSYitoU0NWaXdWVXJXTEw2N1I2K0l2NGVnNERlVE9BeDAwCjhwbmNYS2daVHcyd0liMS9RalIvWS9SamxhQzhsa2RtUldsaTd1ZE1RQ1pWc3lodVNqVzZQajd2cjhZRTR3b2oKRmhOaWp4RUdjZjl3V3JtTUpyemRuVFdRaVhCeW8rZVR2VVE5QlBnUEdyUmpzTVptVGtMeUFWSmZmMkRmeE81YgpJV0ZEWURKY3lZQU1DSU1RdTd2eXMvSTUwb3U2aWxiMUNPNlFNNlo3S3BQZU9vVkZQd3R6Ymg4Y2Y5eE04VU5TCmo2Si9KbWRXaGdJMzRHUzNOQTY4eFRRNlBWN3pqbmhDYytpY2NtM0pLeXpHWHdhQXBBWitFb2NlLzlqNFdLbXUKNUI0emlSMENnWUVBM2wvOU9IYmwxem15VityUnhXT0lqL2kyclR2SHp3Qm5iblBKeXVlbUw1Vk1GZHBHb2RRMwp2d0h2eVFtY0VDUlZSeG1Yb2pRNFF1UFBIczNxcDZ3RUVGUENXeENoTFNUeGxVYzg1U09GSFdVMk85OWpWN3pJCjcrSk9wREsvTXN0c3g5bkhnWGR1SkYrZ2xURnRBM0xIOE9xeWx6dTJhRlBzcHJ3S3VaZjk0UThDZ1lFQXovWngKYWtFRytQRU10UDVZUzI4Y1g1WGZqc0lYL1YyNkZzNi9zSDE2UWpVSUVkZEU1VDRmQ3Vva3hDalNpd1VjV2htbApwSEVKNVM1eHAzVllSZklTVzNqUlczcXN0SUgxdHBaaXBCNitTMHpUdUptTEpiQTNJaVdFZzJydE10N1gxdUp2CkEvYllPcWUwaE9QVHVYdVpkdFZaMG5NVEtrN0dHOE82VmtCSTdGY0NnWUVBa0RmQ21zY0pnczdKYWhsQldIbVgKekg5cHdlbStTUEtqSWMvNE5CNk4rZGdpa3gyUHAwNWhwUC9WaWhVd1lJdWZ2cy9MTm9nVllOUXJ0SGVwVW5yTgoyK1RtYkhiWmdOU3YxTGR4dDgyVWZCN3kwRnV0S3U2bGhtWEh5TmVjaG8zRmk4c2loMFYwYWlTV21ZdUhmckFICkdhaXNrRVpLbzFpaVp2UVhKSXg5TzJNQ2dZQVRCZjByOWhUWU10eXh0YzZIMy9zZGQwMUM5dGhROGdEeTB5alAKMFRxYzBkTVNKcm9EcW1JV2tvS1lldzkvYmhGQTRMVzVUQ25Xa0NBUGJIbU50RzRmZGZiWXdta0gvaGRuQTJ5MApqS2RscGZwOEdYZVVGQUdIR3gxN0ZBM3NxRnZnS1VoMGVXRWdSSFVMN3ZkUU1WRkJnSlM5M283elFNOTRmTGdQCjZjT0I4d0tCZ0ZjR1Y0R2pJMld3OWNpbGxhQzU1NE12b1NqZjhCLyswNGtYekRPaDhpWUlJek85RVVpbDFqaksKSnZ4cDRobkx6VEtXYnV4M01FV3F1ckxrWWFzNkdwS0JqdytpTk9DYXI2WWRxV0dWcU0zUlV4N1BUVWFad2tLeApVZFA2M0lmWTdpWkNJVC9RYnlIUXZJVWUyTWFpVm5IK3VseGRrSzZZNWU3Z3hjYmNrSUg0Ci0tLS0tRU5EIFJTQSBQUklWQVRFIEtFWS0tLS0tCg==");
        Reader create = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.latest).defaultCryptoKeyReader("file:./src/test/resources/certificate/private-key.client-ecdsa.pem").create();
        Reader create2 = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.latest).defaultCryptoKeyReader("data:application/x-pem-file;base64,LS0tLS1CRUdJTiBFQyBQQVJBTUVURVJTLS0tLS0KTUlHWEFnRUJNQndHQnlxR1NNNDlBUUVDRVFELy8vLzkvLy8vLy8vLy8vLy8vLy8vTURzRUVQLy8vLzMvLy8vLwovLy8vLy8vLy8vd0VFT2gxZWNFUWVmUTkyQ1NaUEN6dVh0TURGUUFBRGcxTmFXNW5hSFZoVVhVTXdEcEVjOUEyCmVRUWhCQllmOTFLTGlac3REQ2hnZktVc1c0YlBXc2c1VzYvckU4QXRvcExkN1hxREFoRUEvLy8vL2dBQUFBQjEKb3cwYmtEaWhGUUlCQVE9PQotLS0tLUVORCBFQyBQQVJBTUVURVJTLS0tLS0KLS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1JSFlBZ0VCQkJEZXU5aGM4a092TDNwbCtMWVNqTHE5b0lHYU1JR1hBZ0VCTUJ3R0J5cUdTTTQ5QVFFQ0VRRC8KLy8vOS8vLy8vLy8vLy8vLy8vLy9NRHNFRVAvLy8vMy8vLy8vLy8vLy8vLy8vL3dFRU9oMWVjRVFlZlE5MkNTWgpQQ3p1WHRNREZRQUFEZzFOYVc1bmFIVmhVWFVNd0RwRWM5QTJlUVFoQkJZZjkxS0xpWnN0RENoZ2ZLVXNXNGJQCldzZzVXNi9yRThBdG9wTGQ3WHFEQWhFQS8vLy8vZ0FBQUFCMW93MGJrRGloRlFJQkFhRWtBeUlBQk9zcVBwRTgKY1k4MHB4a29nNXh3M2kyQVEweWZWM01xTXVzeGxPUW5pZ0JwCi0tLS0tRU5EIEVDIFBSSVZBVEUgS0VZLS0tLS0K").create();
        Reader create3 = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.latest).defaultCryptoKeyReader(hashMap).create();
        Reader create4 = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.latest).defaultCryptoKeyReader(hashMap2).create();
        Producer create5 = this.pulsarClient.newProducer().topic(str).addEncryptionKey("client-ecdsa.pem").defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-ecdsa.pem").create();
        Producer create6 = this.pulsarClient.newProducer().topic(str).addEncryptionKey("client-ecdsa.pem").defaultCryptoKeyReader("data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlIS01JR2pCZ2NxaGtqT1BRSUJNSUdYQWdFQk1Cd0dCeXFHU000OUFRRUNFUUQvLy8vOS8vLy8vLy8vLy8vLwovLy8vTURzRUVQLy8vLzMvLy8vLy8vLy8vLy8vLy93RUVPaDFlY0VRZWZROTJDU1pQQ3p1WHRNREZRQUFEZzFOCmFXNW5hSFZoVVhVTXdEcEVjOUEyZVFRaEJCWWY5MUtMaVpzdERDaGdmS1VzVzRiUFdzZzVXNi9yRThBdG9wTGQKN1hxREFoRUEvLy8vL2dBQUFBQjFvdzBia0RpaEZRSUJBUU1pQUFUcktqNlJQSEdQTktjWktJT2NjTjR0Z0VOTQpuMWR6S2pMck1aVGtKNG9BYVE9PQotLS0tLUVORCBQVUJMSUMgS0VZLS0tLS0K").create();
        for (int i = 0; i < 10; i++) {
            create5.send(("my-message-" + i).getBytes());
        }
        for (int i2 = 10; i2 < 20; i2++) {
            create6.send(("my-message-" + i2).getBytes());
        }
        create5.close();
        create6.close();
        for (Reader reader : Lists.newArrayList(new Reader[]{create, create2})) {
            for (int i3 = 0; i3 < 20; i3++) {
                MessageImpl readNext = reader.readNext(5, TimeUnit.SECONDS);
                readNext.getEncryptionCtx().orElseThrow(() -> {
                    return new IllegalStateException("encryption-ctx not present for encrypted message");
                });
                Assert.assertEquals(new String(readNext.getData()), "my-message-" + i3);
            }
        }
        create.close();
        create2.close();
        Producer create7 = this.pulsarClient.newProducer().topic(str).addEncryptionKey("client-rsa.pem").defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem").create();
        Producer create8 = this.pulsarClient.newProducer().topic(str).addEncryptionKey("client-rsa.pem").defaultCryptoKeyReader("data:application/x-pem-file;base64,LS0tLS1CRUdJTiBQVUJMSUMgS0VZLS0tLS0KTUlJQklqQU5CZ2txaGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQ0FRRUF0S1d3Z3FkblRZck9DditqMU1rVApXZlNIMHdDc0haWmNhOXdBVzNxUDR1dWhsQnZuYjEwSmNGZjVaanpQOUJTWEsrdEhtSTh1b04zNjh2RXY2eWhVClJITTR5dVhxekN4enVBd2tRU28zOXJ6WDhQR0M3cWRqQ043TERKM01ucWlCSXJVc1NhRVAxd3JOc0Ixa0krbzkKRVIxZTVPL3VFUEFvdFA5MzNoSFEwSjJoTUVla0hxTDdzQmxKOThoNk5tc2ljRWFVa2FyZGswVE9YcmxrakMrYwpNZDhaYkdTY1BxSTlNMzhibW4zT0x4RlRuMXZ0aHB2blhMdkNtRzRNKzZ4dFl0RCtucGNWUFp3MWkxUjkwZk1zCjdwcFpuUmJ2OEhjL0RGZE9LVlFJZ2FtNkNEZG5OS2dXN2M3SUJNclAwQUVtMzdIVHUwTFNPalAyT0hYbHZ2bFEKR1FJREFRQUIKLS0tLS1FTkQgUFVCTElDIEtFWS0tLS0tCg==").create();
        for (int i4 = 20; i4 < 30; i4++) {
            create7.send(("my-message-" + i4).getBytes());
        }
        for (int i5 = 30; i5 < 40; i5++) {
            create8.send(("my-message-" + i5).getBytes());
        }
        create7.close();
        create8.close();
        for (Reader reader2 : Lists.newArrayList(new Reader[]{create3, create4})) {
            for (int i6 = 0; i6 < 40; i6++) {
                MessageImpl readNext2 = reader2.readNext(5, TimeUnit.SECONDS);
                readNext2.getEncryptionCtx().orElseThrow(() -> {
                    return new IllegalStateException("encryption-ctx not present for encrypted message");
                });
                Assert.assertEquals(new String(readNext2.getData()), "my-message-" + i6);
            }
        }
        create3.close();
        create4.close();
    }

    @Test
    public void testSimpleReaderReachEndOfTopic() throws Exception {
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic").startMessageId(MessageId.earliest).create();
        Producer create2 = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testSimpleReaderReachEndOfTopic").create();
        Assert.assertFalse(create.hasMessageAvailable());
        for (int i = 0; i < 100; i++) {
            create2.send(("my-message-" + i).getBytes());
        }
        HashSet hashSet = new HashSet();
        int i2 = 0;
        while (create.hasMessageAvailable()) {
            String str = new String(create.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str);
            int i3 = i2;
            i2++;
            testMessageOrderAndDuplicates(hashSet, str, "my-message-" + i3);
        }
        Assert.assertEquals(i2, 100);
        Assert.assertNull(create.readNext(1, TimeUnit.SECONDS));
        for (int i4 = 100; i4 < 200; i4++) {
            create2.send(("my-message-" + i4).getBytes());
        }
        while (create.hasMessageAvailable()) {
            String str2 = new String(create.readNext(1, TimeUnit.SECONDS).getData());
            log.debug("Received message: [{}]", str2);
            int i5 = i2;
            i2++;
            testMessageOrderAndDuplicates(hashSet, str2, "my-message-" + i5);
        }
        Assert.assertEquals(i2, 200);
        Assert.assertNull(create.readNext(1, TimeUnit.SECONDS));
        create.close();
        create2.close();
    }

    @Test
    public void testSimpleMultiReaderReachEndOfTopic() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testSimpleMultiReaderReachEndOfTopic", 3);
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testSimpleMultiReaderReachEndOfTopic").startMessageId(MessageId.earliest).create();
        Producer create2 = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testSimpleMultiReaderReachEndOfTopic").create();
        Assert.assertFalse(create.hasMessageAvailable());
        for (int i = 0; i < 100; i++) {
            create2.send(("my-message-" + i).getBytes());
        }
        HashSet hashSet = new HashSet();
        int i2 = 0;
        while (create.hasMessageAvailable()) {
            String str = new String(create.readNext(1, TimeUnit.SECONDS).getData());
            i2++;
            Assert.assertTrue(hashSet.add(str), "Received duplicate message " + str);
        }
        Assert.assertEquals(i2, 100);
        Assert.assertNull(create.readNext(1, TimeUnit.SECONDS));
        for (int i3 = 100; i3 < 200; i3++) {
            create2.send(("my-message-" + i3).getBytes());
        }
        while (create.hasMessageAvailable()) {
            String str2 = new String(create.readNext(1, TimeUnit.SECONDS).getData());
            i2++;
            Assert.assertTrue(hashSet.add(str2), "Received duplicate message " + str2);
        }
        Assert.assertEquals(i2, 200);
        Assert.assertNull(create.readNext(1, TimeUnit.SECONDS));
        create.close();
        create2.close();
    }

    @Test
    public void testReaderReachEndOfTopicOnMessageWithBatches() throws Exception {
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches").startMessageId(MessageId.earliest).create();
        Producer create2 = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderReachEndOfTopicOnMessageWithBatches").enableBatching(true).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).create();
        Assert.assertFalse(create.hasMessageAvailable());
        for (int i = 0; i < 100; i++) {
            create2.sendAsync(("my-message-" + i).getBytes());
        }
        create2.send("my-message-10".getBytes());
        int i2 = 0;
        Assert.assertTrue(create.hasMessageAvailable());
        if (create.hasMessageAvailable()) {
            Message readNext = create.readNext();
            Assert.assertEquals(readNext.getMessageId().getClass(), BatchMessageIdImpl.class);
            while (readNext != null) {
                i2++;
                readNext = create.readNext(100, TimeUnit.MILLISECONDS);
            }
            Assert.assertEquals(i2, 101);
        }
        Assert.assertFalse(create.hasMessageAvailable());
        create.close();
        create2.close();
    }

    @Test
    public void testMultiReaderReachEndOfTopicOnMessageWithBatches() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testMultiReaderReachEndOfTopicOnMessageWithBatches", 3);
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testMultiReaderReachEndOfTopicOnMessageWithBatches").startMessageId(MessageId.earliest).create();
        Producer create2 = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testMultiReaderReachEndOfTopicOnMessageWithBatches").enableBatching(true).batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS).create();
        Assert.assertFalse(create.hasMessageAvailable());
        for (int i = 0; i < 100; i++) {
            create2.sendAsync(("my-message-" + i).getBytes());
        }
        create2.send("my-message-10".getBytes());
        int i2 = 0;
        Assert.assertTrue(create.hasMessageAvailable());
        if (create.hasMessageAvailable()) {
            Message readNext = create.readNext();
            Assert.assertEquals(readNext.getMessageId().getClass(), TopicMessageIdImpl.class);
            while (readNext != null) {
                i2++;
                readNext = create.readNext(100, TimeUnit.MILLISECONDS);
            }
            Assert.assertEquals(i2, 101);
        }
        Assert.assertFalse(create.hasMessageAvailable());
        create.close();
        create2.close();
    }

    @Test
    public void testMessageAvailableAfterRestart() throws Exception {
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/testMessageAvailableAfterRestart"}).subscriptionName("sub1").subscribe().close();
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMessageAvailableAfterRestart").startMessageId(MessageId.earliest).create();
        try {
            Assert.assertFalse(create.hasMessageAvailable());
            if (create != null) {
                create.close();
            }
            Producer create2 = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testMessageAvailableAfterRestart").create();
            try {
                create2.send("my-message-1".getBytes());
                if (create2 != null) {
                    create2.close();
                }
                create = this.pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMessageAvailableAfterRestart").startMessageId(MessageId.earliest).create();
                try {
                    Assert.assertTrue(create.hasMessageAvailable());
                    if (create != null) {
                        create.close();
                    }
                    ((Topic) this.pulsar.getBrokerService().getTopicReference("persistent://my-property/use/my-ns/testMessageAvailableAfterRestart").get()).close(false).get();
                    create = this.pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMessageAvailableAfterRestart").startMessageId(MessageId.earliest).create();
                    try {
                        Assert.assertTrue(create.hasMessageAvailable());
                        Assert.assertEquals("my-message-1", new String(create.readNext().getData()));
                        Assert.assertFalse(create.hasMessageAvailable());
                        if (create != null) {
                            create.close();
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Throwable th) {
                if (create2 != null) {
                    try {
                        create2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } finally {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                }
            }
        }
    }

    @Test
    public void testMultiReaderMessageAvailableAfterRestart() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-property/use/my-ns/testMessageAvailableAfterRestart2", 3);
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://my-property/use/my-ns/testMessageAvailableAfterRestart2"}).subscriptionName("sub2").subscribe().close();
        Reader create = this.pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMessageAvailableAfterRestart2").startMessageId(MessageId.earliest).create();
        try {
            Assert.assertFalse(create.hasMessageAvailable());
            if (create != null) {
                create.close();
            }
            Producer create2 = this.pulsarClient.newProducer().topic("persistent://my-property/use/my-ns/testMessageAvailableAfterRestart2").create();
            try {
                create2.send("my-message-1".getBytes());
                if (create2 != null) {
                    create2.close();
                }
                create = this.pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMessageAvailableAfterRestart2").startMessageId(MessageId.earliest).create();
                try {
                    Assert.assertTrue(create.hasMessageAvailable());
                    if (create != null) {
                        create.close();
                    }
                    this.pulsar.getBrokerService().getTopics().keys().forEach(str -> {
                        try {
                            ((Topic) this.pulsar.getBrokerService().getTopicReference(str).get()).close(false).get();
                        } catch (Exception e) {
                            Assert.fail();
                        }
                    });
                    create = this.pulsarClient.newReader().topic("persistent://my-property/use/my-ns/testMessageAvailableAfterRestart2").startMessageId(MessageId.earliest).create();
                    try {
                        Assert.assertTrue(create.hasMessageAvailable());
                        Assert.assertEquals("my-message-1", new String(create.readNext().getData()));
                        Assert.assertFalse(create.hasMessageAvailable());
                        if (create != null) {
                            create.close();
                        }
                    } finally {
                    }
                } finally {
                }
            } catch (Throwable th) {
                if (create2 != null) {
                    try {
                        create2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } finally {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                }
            }
        }
    }

    @Test(dataProvider = "variationsForHasMessageAvailable")
    public void testHasMessageAvailable(boolean z, boolean z2) throws Exception {
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/HasMessageAvailable");
        if (z) {
            producerBuilder.enableBatching(true).batchingMaxMessages(10);
        } else {
            producerBuilder.enableBatching(false);
        }
        Producer create = producerBuilder.create();
        CountDownLatch countDownLatch = new CountDownLatch(100);
        List<MessageId> synchronizedList = Collections.synchronizedList(new ArrayList());
        for (int i = 0; i < 100; i++) {
            create.sendAsync(String.format("msg num %d", Integer.valueOf(i)).getBytes()).whenComplete((messageId, th) -> {
                if (th != null) {
                    Assert.fail();
                } else {
                    synchronizedList.add(messageId);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        synchronizedList.sort(null);
        for (MessageId messageId2 : synchronizedList) {
            Reader create2 = z2 ? this.pulsarClient.newReader().topic("persistent://my-property/my-ns/HasMessageAvailable").startMessageId(messageId2).startMessageIdInclusive().create() : this.pulsarClient.newReader().topic("persistent://my-property/my-ns/HasMessageAvailable").startMessageId(messageId2).create();
            if (z2) {
                Assert.assertTrue(create2.hasMessageAvailable());
            } else if (messageId2 != synchronizedList.get(synchronizedList.size() - 1)) {
                Assert.assertTrue(create2.hasMessageAvailable());
            } else {
                Assert.assertFalse(create2.hasMessageAvailable());
            }
            create2.close();
        }
        create.close();
    }

    @Test(timeOut = 20000)
    public void testHasMessageAvailable() throws Exception {
        Producer create = this.pulsarClient.newProducer().enableBatching(true).batchingMaxMessages(10).batchingMaxPublishDelay(2L, TimeUnit.SECONDS).topic("persistent://my-property/my-ns/testHasMessageAvailableWithBatch").create();
        MessageIdImpl send = create.send("msg".getBytes());
        Assert.assertFalse(send instanceof BatchMessageIdImpl);
        ReaderImpl create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testHasMessageAvailableWithBatch").startMessageId(send).startMessageIdInclusive().create();
        MessageIdImpl lastMessageId = create2.getConsumer().getLastMessageId();
        Assert.assertFalse(lastMessageId instanceof BatchMessageIdImpl);
        Assert.assertEquals(lastMessageId.getLedgerId(), send.getLedgerId());
        Assert.assertEquals(lastMessageId.getEntryId(), send.getEntryId());
        List lastMessageIds = create2.getConsumer().getLastMessageIds();
        Assert.assertEquals(lastMessageIds.size(), 1);
        Assert.assertEquals(((TopicMessageId) lastMessageIds.get(0)).getOwnerTopic(), "persistent://my-property/my-ns/testHasMessageAvailableWithBatch");
        MessageIdAdv messageIdAdv = (MessageIdAdv) lastMessageIds.get(0);
        Assert.assertEquals(messageIdAdv.getLedgerId(), send.getLedgerId());
        Assert.assertEquals(messageIdAdv.getEntryId(), send.getEntryId());
        create2.close();
        CountDownLatch countDownLatch = new CountDownLatch(10);
        List<MessageId> synchronizedList = Collections.synchronizedList(new ArrayList());
        for (int i = 0; i < 10; i++) {
            create.sendAsync(String.format("msg num %d", Integer.valueOf(i)).getBytes()).whenComplete((messageId, th) -> {
                if (th != null) {
                    Assert.fail();
                } else {
                    synchronizedList.add(messageId);
                }
                countDownLatch.countDown();
            });
        }
        create.flush();
        countDownLatch.await();
        create.close();
        for (MessageId messageId2 : synchronizedList) {
            ReaderImpl create3 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testHasMessageAvailableWithBatch").startMessageId(messageId2).startMessageIdInclusive().create();
            if (messageId2 instanceof BatchMessageIdImpl) {
                Assert.assertTrue(create3.getConsumer().getLastMessageId() instanceof BatchMessageIdImpl);
                log.info("id {} instance of BatchMessageIdImpl", messageId2);
            } else {
                Assert.assertTrue(messageId2 instanceof MessageIdImpl);
                Assert.assertTrue(create3.getConsumer().getLastMessageId() instanceof MessageIdImpl);
                log.info("id {} instance of MessageIdImpl", messageId2);
            }
            create3.close();
        }
        Producer create4 = this.pulsarClient.newProducer().enableBatching(false).topic("persistent://my-property/my-ns/testHasMessageAvailableWithBatch").create();
        MessageIdImpl send2 = create4.send("non-batch".getBytes());
        Assert.assertFalse(send2 instanceof BatchMessageIdImpl);
        Assert.assertTrue(send2 instanceof MessageIdImpl);
        ReaderImpl create5 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/testHasMessageAvailableWithBatch").startMessageId(send2).create();
        MessageId lastMessageId2 = create5.getConsumer().getLastMessageId();
        Assert.assertFalse(lastMessageId2 instanceof BatchMessageIdImpl);
        Assert.assertTrue(lastMessageId2 instanceof MessageIdImpl);
        Assert.assertEquals(lastMessageId2, send2);
        create4.close();
        create5.close();
    }

    @Test
    public void testReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime").create();
        for (int i = 0; i < 10; i++) {
            create.send(String.format("msg num %d", Integer.valueOf(i)).getBytes());
        }
        Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime").startMessageId(MessageId.earliest).create();
        Assert.assertTrue(create2.hasMessageAvailable());
        create2.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
        Assert.assertTrue(create2.hasMessageAvailable());
        create2.close();
        create.close();
    }

    @Test
    public void testMultiReaderNonDurableIsAbleToSeekRelativeTime() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime", 3);
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime").create();
        for (int i = 0; i < 10; i++) {
            create.send(String.format("msg num %d", Integer.valueOf(i)).getBytes());
        }
        Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderNonDurableIsAbleToSeekRelativeTime").startMessageId(MessageId.earliest).create();
        Assert.assertTrue(create2.hasMessageAvailable());
        create2.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
        Assert.assertTrue(create2.hasMessageAvailable());
        create2.close();
        create.close();
    }

    @Test
    public void testReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic").create();
        for (int i = 0; i < 10; i++) {
            create.send(String.format("msg num %d", Integer.valueOf(i)).getBytes());
        }
        ReaderImpl create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderSeekWithTimeOnBeginningOfTopic").startMessageId(MessageId.earliest).create();
        Assert.assertTrue(create2.hasMessageAvailable());
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            testMessageOrderAndDuplicates(hashSet, new String(create2.readNext().getData()), String.format("msg num %d", Integer.valueOf(i2)));
        }
        Assert.assertFalse(create2.hasMessageAvailable());
        create2.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
        HashSet hashSet2 = new HashSet();
        for (int i3 = 0; i3 < 10; i3++) {
            testMessageOrderAndDuplicates(hashSet2, new String(create2.readNext().getData()), String.format("msg num %d", Integer.valueOf(i3)));
        }
        Assert.assertTrue(create2.isConnected());
        Assert.assertFalse(create2.hasMessageAvailable());
        Assert.assertEquals(create2.getConsumer().numMessagesInQueue(), 0);
        create2.close();
        create.close();
    }

    @Test
    public void testMultiReaderIsAbleToSeekWithTimeOnBeginningOfTopic() throws Exception {
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/MultiReaderSeekWithTimeOnBeginningOfTopic", 3);
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/MultiReaderSeekWithTimeOnBeginningOfTopic").create();
        for (int i = 0; i < 10; i++) {
            create.send(String.format("msg num %d", Integer.valueOf(i)).getBytes());
        }
        MultiTopicsReaderImpl create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/MultiReaderSeekWithTimeOnBeginningOfTopic").startMessageId(MessageId.earliest).create();
        Assert.assertTrue(create2.hasMessageAvailable());
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 10; i2++) {
            String str = new String(create2.readNext().getData());
            Assert.assertTrue(hashSet.add(str), "Received duplicate message " + str);
        }
        Assert.assertFalse(create2.hasMessageAvailable());
        create2.seek(RelativeTimeUtil.parseRelativeTimeInSeconds("-1m"));
        HashSet hashSet2 = new HashSet();
        for (int i3 = 0; i3 < 10; i3++) {
            String str2 = new String(create2.readNext().getData());
            Assert.assertTrue(hashSet2.add(str2), "Received duplicate message " + str2);
        }
        Assert.assertTrue(create2.isConnected());
        Assert.assertFalse(create2.hasMessageAvailable());
        Assert.assertEquals(create2.getMultiTopicsConsumer().numMessagesInQueue(), 0);
        create2.close();
        create.close();
    }

    @Test
    public void testReaderIsAbleToSeekWithMessageIdOnMiddleOfTopic() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic").create();
        for (int i = 0; i < 100; i++) {
            create.send(String.format("msg num %d", Integer.valueOf(i)).getBytes());
        }
        ReaderImpl create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderSeekWithMessageIdOnMiddleOfTopic").startMessageId(MessageId.earliest).create();
        Assert.assertTrue(create2.hasMessageAvailable());
        MessageId messageId = null;
        HashSet hashSet = new HashSet();
        for (int i2 = 0; i2 < 100; i2++) {
            Message readNext = create2.readNext();
            testMessageOrderAndDuplicates(hashSet, new String(readNext.getData()), String.format("msg num %d", Integer.valueOf(i2)));
            if (i2 == 50) {
                messageId = readNext.getMessageId();
            }
        }
        Assert.assertFalse(create2.hasMessageAvailable());
        create2.seek(messageId);
        HashSet hashSet2 = new HashSet();
        for (int i3 = 51; i3 < 100; i3++) {
            testMessageOrderAndDuplicates(hashSet2, new String(create2.readNext().getData()), String.format("msg num %d", Integer.valueOf(i3)));
        }
        Assert.assertTrue(create2.isConnected());
        Assert.assertFalse(create2.hasMessageAvailable());
        Assert.assertEquals(create2.getConsumer().numMessagesInQueue(), 0);
        create2.close();
        create.close();
    }

    @Test
    public void testReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic").create();
        long currentTimeMillis = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            create.send(String.format("msg num %d", Integer.valueOf(i)).getBytes());
            Thread.sleep(100L);
        }
        Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderIsAbleToSeekWithTimeOnMiddleOfTopic").startMessageId(MessageId.earliest).create();
        create2.seek(currentTimeMillis + 600);
        HashSet hashSet = new HashSet();
        for (int i2 = 6; i2 < 10; i2++) {
            testMessageOrderAndDuplicates(hashSet, new String(create2.readNext().getData()), String.format("msg num %d", Integer.valueOf(i2)));
        }
        create2.close();
        create.close();
    }

    @Test
    public void testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exception {
        String str = "persistent://my-property/my-ns/testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic" + System.currentTimeMillis();
        this.admin.topics().createPartitionedTopic(str, 3);
        Producer create = this.pulsarClient.newProducer().topic(str).create();
        long j = 0;
        for (int i = 0; i < 10; i++) {
            if (i == 5) {
                j = System.currentTimeMillis();
            }
            create.send(String.format("msg num %d", Integer.valueOf(i)).getBytes());
        }
        Assert.assertTrue(j != 0);
        Reader create2 = this.pulsarClient.newReader().topic(str).startMessageId(MessageId.earliest).create();
        create2.seek(j);
        HashSet hashSet = new HashSet();
        for (int i2 = 6; i2 < 10; i2++) {
            String str2 = new String(create2.readNext(10, TimeUnit.SECONDS).getData());
            Assert.assertTrue(hashSet.add(str2), "Received duplicate message " + str2);
        }
        create2.close();
        create.close();
    }

    @Test(dataProvider = "variationsForExpectedPos")
    public void testReaderStartMessageIdAtExpectedPos(boolean z, boolean z2, int i) throws Exception {
        int nextInt = new Random().nextInt(i);
        int i2 = z2 ? nextInt : nextInt + 1;
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos").enableBatching(z).create();
        CountDownLatch countDownLatch = new CountDownLatch(i);
        AtomicReference atomicReference = new AtomicReference();
        for (int i3 = 0; i3 < i; i3++) {
            int i4 = i3;
            create.sendAsync(String.format("msg num %d", Integer.valueOf(i3)).getBytes()).thenCompose(messageId -> {
                return FutureUtils.value(Pair.of(Integer.valueOf(i4), messageId));
            }).whenComplete((pair, th) -> {
                if (th != null) {
                    Assert.fail("send msg failed due to " + th.getMessage());
                } else if (((Integer) pair.getLeft()).intValue() == nextInt) {
                    atomicReference.set((MessageId) pair.getRight());
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        ReaderBuilder startMessageId = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderStartMessageIdAtExpectedPos").startMessageId((MessageId) atomicReference.get());
        if (z2) {
            startMessageId.startMessageIdInclusive();
        }
        ReaderImpl create2 = startMessageId.create();
        HashSet hashSet = new HashSet();
        for (int i5 = i2; i5 < i; i5++) {
            testMessageOrderAndDuplicates(hashSet, new String(create2.readNext().getData()), String.format("msg num %d", Integer.valueOf(i5)));
        }
        Assert.assertTrue(create2.isConnected());
        Assert.assertEquals(create2.getConsumer().numMessagesInQueue(), 0);
        Assert.assertEquals(hashSet.size(), i - i2);
        create2.close();
        create.close();
    }

    @Test
    public void testReaderBuilderConcurrentCreate() throws Exception {
        ReaderBuilder startMessageId = this.pulsarClient.newReader().startMessageId(MessageId.earliest);
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(30);
        ArrayList newArrayListWithExpectedSize2 = Lists.newArrayListWithExpectedSize(30);
        for (int i = 0; i < 30; i++) {
            newArrayListWithExpectedSize2.add(this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testReaderBuilderConcurrentCreate_" + i).create());
        }
        for (int i2 = 0; i2 < 30; i2++) {
            newArrayListWithExpectedSize.add(startMessageId.clone().topic("persistent://my-property/my-ns/testReaderBuilderConcurrentCreate_" + i2).createAsync());
        }
        for (int i3 = 0; i3 < 30; i3++) {
            Assert.assertEquals(((Reader) ((CompletableFuture) newArrayListWithExpectedSize.get(i3)).get()).getTopic(), "persistent://my-property/my-ns/testReaderBuilderConcurrentCreate_" + i3);
            ((Reader) ((CompletableFuture) newArrayListWithExpectedSize.get(i3)).get()).close();
            ((Producer) newArrayListWithExpectedSize2.get(i3)).close();
        }
    }

    @Test(timeOut = 10000)
    public void testMultiReaderBuilderConcurrentCreate() throws Exception {
        ReaderBuilder startMessageId = this.pulsarClient.newReader().startMessageId(MessageId.earliest);
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(30);
        ArrayList newArrayListWithExpectedSize2 = Lists.newArrayListWithExpectedSize(30);
        for (int i = 0; i < 30; i++) {
            this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/testMultiReaderBuilderConcurrentCreate_" + i, 3);
            newArrayListWithExpectedSize2.add(this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/testMultiReaderBuilderConcurrentCreate_" + i).create());
        }
        for (int i2 = 0; i2 < 30; i2++) {
            newArrayListWithExpectedSize.add(startMessageId.clone().topic("persistent://my-property/my-ns/testMultiReaderBuilderConcurrentCreate_" + i2).createAsync());
        }
        for (int i3 = 0; i3 < 30; i3++) {
            Assert.assertTrue(((Reader) ((CompletableFuture) newArrayListWithExpectedSize.get(i3)).get()).getTopic().startsWith("MultiTopicsConsumer-"));
            ((Reader) ((CompletableFuture) newArrayListWithExpectedSize.get(i3)).get()).close();
            ((Producer) newArrayListWithExpectedSize2.get(i3)).close();
        }
    }

    @Test
    public void testReaderStartInMiddleOfBatch() throws Exception {
        Producer create = this.pulsarClient.newProducer().topic("persistent://my-property/my-ns/ReaderStartInMiddleOfBatch").enableBatching(true).batchingMaxMessages(10).create();
        CountDownLatch countDownLatch = new CountDownLatch(100);
        List<MessageId> synchronizedList = Collections.synchronizedList(new ArrayList());
        for (int i = 0; i < 100; i++) {
            create.sendAsync(String.format("msg num %d", Integer.valueOf(i)).getBytes()).whenComplete((messageId, th) -> {
                if (th != null) {
                    Assert.fail();
                } else {
                    synchronizedList.add(messageId);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        for (MessageId messageId2 : synchronizedList) {
            Reader create2 = this.pulsarClient.newReader().topic("persistent://my-property/my-ns/ReaderStartInMiddleOfBatch").startMessageId(messageId2).startMessageIdInclusive().create();
            Assert.assertEquals(create2.readNext().getMessageId(), messageId2);
            create2.close();
        }
        create.close();
    }

    /* JADX WARN: Finally extract failed */
    @Test
    public void testHasMessageAvailableOnEmptyTopic() throws Exception {
        String newTopicName = newTopicName();
        Reader create = this.pulsarClient.newReader(Schema.STRING).topic(newTopicName).startMessageId(MessageId.earliest).create();
        try {
            create = this.pulsarClient.newReader(Schema.STRING).topic(newTopicName).startMessageId(MessageId.latest).create();
            try {
                Reader create2 = this.pulsarClient.newReader(Schema.STRING).topic(newTopicName).startMessageId(MessageId.latest).startMessageIdInclusive().create();
                try {
                    Assert.assertFalse(create.hasMessageAvailable());
                    Assert.assertFalse(create.hasMessageAvailable());
                    Assert.assertFalse(create2.hasMessageAvailable());
                    Producer create3 = this.pulsarClient.newProducer(Schema.STRING).topic(newTopicName).create();
                    try {
                        create3.send("hello-1");
                        Assert.assertTrue(create.hasMessageAvailable());
                        Assert.assertTrue(create.hasMessageAvailable());
                        Assert.assertTrue(create2.hasMessageAvailable());
                        Reader create4 = this.pulsarClient.newReader(Schema.STRING).topic(newTopicName).startMessageId(MessageId.latest).create();
                        try {
                            Assert.assertFalse(create4.hasMessageAvailable());
                            create3.send("hello-2");
                            Assert.assertTrue(create.hasMessageAvailable());
                            Assert.assertTrue(create.hasMessageAvailable());
                            Assert.assertTrue(create2.hasMessageAvailable());
                            Assert.assertTrue(create4.hasMessageAvailable());
                            if (Collections.singletonList(create4).get(0) != null) {
                                create4.close();
                            }
                            if (Collections.singletonList(create3).get(0) != null) {
                                create3.close();
                            }
                            if (Collections.singletonList(create2).get(0) != null) {
                                create2.close();
                            }
                            if (Collections.singletonList(create).get(0) != null) {
                                create.close();
                            }
                        } finally {
                            if (Collections.singletonList(create4).get(0) != null) {
                                create4.close();
                            }
                        }
                    } catch (Throwable th) {
                        if (Collections.singletonList(create3).get(0) != null) {
                            create3.close();
                        }
                        throw th;
                    }
                } finally {
                    if (Collections.singletonList(create2).get(0) != null) {
                        create2.close();
                    }
                }
            } finally {
                if (Collections.singletonList(create).get(0) != null) {
                    create.close();
                }
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th2;
        }
    }
}
