package org.apache.pulsar.client.impl;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Generated;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.customroute.PartialRoundRobinMessageRouterImpl;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests how many per-partition producers a {@link PartitionedProducerImpl} holds when it is built
 * with {@code enableLazyStartPartitionedProducers(true)}, under different routing modes, access
 * modes and partition-count updates.
 *
 * <p>Every producer is opened in a try-with-resources statement so that it is closed even when an
 * assertion fails part-way through a test.
 */
@Test(groups = {"broker-impl"})
public class PartialPartitionedProducerTest extends ProducerConsumerBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(PartialPartitionedProducerTest.class);

    /** Number of messages published by one call to {@link #sendMessages}. */
    private static final int MESSAGES_PER_ROUND = 10;

    /**
     * Runs once before the tests of this class; delegates to {@code internalSetup()} and
     * {@code producerBaseSetup()} of the base classes.
     *
     * @throws Exception if either base-class setup step fails
     */
    @Override
    @BeforeClass
    public void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    /**
     * Runs once after the tests of this class (even if they failed); delegates to
     * {@code internalCleanup()} of the base class.
     *
     * @throws Exception if the base-class cleanup fails
     */
    @Override
    @AfterClass(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * With {@link MessageRoutingMode#SinglePartition} on a 10-partition topic, sending a round of
     * messages is expected to leave exactly one internal producer.
     */
    @Test
    public void testPtWithSinglePartition() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-with-single-routing");
        admin.topics().createPartitionedTopic(topic, 10);

        try (PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                .topic(topic)
                .enableLazyStartPartitionedProducers(true)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .create()) {
            sendMessages(producer);
            Assert.assertEquals(producer.getProducers().size(), 1);
        }
    }

    /**
     * With a {@link PartialRoundRobinMessageRouterImpl} constructed with 3 (presumably the number
     * of partitions it cycles through) on a 10-partition topic, sending a round of messages is
     * expected to leave exactly three internal producers.
     */
    @Test
    public void testPtWithPartialPartition() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-with-partial-routing");
        admin.topics().createPartitionedTopic(topic, 10);

        try (PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                .topic(topic)
                .enableLazyStartPartitionedProducers(true)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.CustomPartition)
                .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
                .create()) {
            sendMessages(producer);
            Assert.assertEquals(producer.getProducers().size(), 3);
        }
    }

    /**
     * With {@link MessageRoutingMode#RoundRobinPartition} on a 10-partition topic, one internal
     * producer is expected right after creation and ten after a round of messages has been sent.
     */
    @Test
    public void testPtLazyLoading() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-lazily");
        admin.topics().createPartitionedTopic(topic, 10);

        try (PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                .topic(topic)
                .enableLazyStartPartitionedProducers(true)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create()) {
            // Checked before anything is sent.
            Assert.assertEquals(producer.getProducers().size(), 1);

            sendMessages(producer);
            Assert.assertEquals(producer.getProducers().size(), 10);
        }
    }

    /**
     * With {@link ProducerAccessMode#Exclusive} and then {@link ProducerAccessMode#WaitForExclusive},
     * all ten internal producers are expected immediately after creation, without sending anything.
     */
    @Test
    public void testPtLoadingNotSharedMode() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-not-shared-mode");
        admin.topics().createPartitionedTopic(topic, 10);

        try (PartitionedProducerImpl<byte[]> exclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                .topic(topic)
                .enableLazyStartPartitionedProducers(true)
                .enableBatching(false)
                .accessMode(ProducerAccessMode.Exclusive)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create()) {
            Assert.assertEquals(exclusive.getProducers().size(), 10);
        }

        // The exclusive producer is closed at this point, before the second producer is created.
        try (PartitionedProducerImpl<byte[]> waitForExclusive =
                     (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                .topic(topic)
                .enableLazyStartPartitionedProducers(true)
                .enableBatching(false)
                .accessMode(ProducerAccessMode.WaitForExclusive)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .create()) {
            Assert.assertEquals(waitForExclusive.getProducers().size(), 10);
        }
    }

    /**
     * Grows a topic from 2 to 3 to 4 partitions while a shared producer with a
     * {@link PartialRoundRobinMessageRouterImpl} constructed with 3 is attached. The number of
     * internal producers is expected to follow the partition count only after messages are sent,
     * and to stay at three once the topic has four partitions.
     */
    @Test
    public void testPtUpdateWithPartialPartition() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-update-with-partial-routing");
        admin.topics().createPartitionedTopic(topic, 2);
        final Field topicMetadataField = accessibleTopicMetadataField();

        try (PartitionedProducerImpl<byte[]> producer = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                .topic(topic)
                .enableLazyStartPartitionedProducers(true)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.CustomPartition)
                .messageRouter(new PartialRoundRobinMessageRouterImpl(3))
                .accessMode(ProducerAccessMode.Shared)
                .autoUpdatePartitions(true)
                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
                .create()) {
            // Checked before anything is sent.
            Assert.assertEquals(producer.getProducers().size(), 1);

            sendMessages(producer);
            Assert.assertEquals(producer.getProducers().size(), 2);

            admin.topics().updatePartitionedTopic(topic, 3);
            awaitNumPartitions(producer, topicMetadataField, 3);
            // Seeing the new partition count alone must not change the producer count.
            Assert.assertEquals(producer.getProducers().size(), 2);

            sendMessages(producer);
            Assert.assertEquals(producer.getProducers().size(), 3);

            admin.topics().updatePartitionedTopic(topic, 4);
            awaitNumPartitions(producer, topicMetadataField, 4);

            sendMessages(producer);
            Assert.assertEquals(producer.getProducers().size(), 3);
        }
    }

    /**
     * Grows a topic from 2 to 3 to 4 partitions with an {@link ProducerAccessMode#Exclusive} and then
     * a {@link ProducerAccessMode#WaitForExclusive} producer attached. The number of internal
     * producers is expected to equal the partition count without any message being sent.
     */
    @Test
    public void testPtUpdateNotSharedMode() throws Exception {
        final String topic = BrokerTestUtil.newUniqueName("pt-update-not-shared");
        admin.topics().createPartitionedTopic(topic, 2);
        final Field topicMetadataField = accessibleTopicMetadataField();

        try (PartitionedProducerImpl<byte[]> exclusive = (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                .topic(topic)
                .enableLazyStartPartitionedProducers(true)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .accessMode(ProducerAccessMode.Exclusive)
                .autoUpdatePartitions(true)
                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
                .create()) {
            Assert.assertEquals(exclusive.getProducers().size(), 2);

            admin.topics().updatePartitionedTopic(topic, 3);
            awaitNumPartitions(exclusive, topicMetadataField, 3);
            Assert.assertEquals(exclusive.getProducers().size(), 3);
        }

        // The exclusive producer is closed at this point, before the second producer is created.
        try (PartitionedProducerImpl<byte[]> waitForExclusive =
                     (PartitionedProducerImpl<byte[]>) pulsarClient.newProducer()
                .topic(topic)
                .enableLazyStartPartitionedProducers(true)
                .enableBatching(false)
                .messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
                .accessMode(ProducerAccessMode.WaitForExclusive)
                .autoUpdatePartitions(true)
                .autoUpdatePartitionsInterval(1, TimeUnit.SECONDS)
                .create()) {
            Assert.assertEquals(waitForExclusive.getProducers().size(), 3);

            admin.topics().updatePartitionedTopic(topic, 4);
            awaitNumPartitions(waitForExclusive, topicMetadataField, 4);
            Assert.assertEquals(waitForExclusive.getProducers().size(), 4);
        }
    }

    /**
     * Synchronously sends {@link #MESSAGES_PER_ROUND} messages with the payload {@code "msg"}.
     *
     * <p>A send failure is propagated rather than converted to a boolean, so the failing test
     * reports the underlying exception.
     *
     * @param producer the producer to publish with
     * @throws Exception if any send fails
     */
    private static void sendMessages(PartitionedProducerImpl<byte[]> producer) throws Exception {
        for (int i = 0; i < MESSAGES_PER_ROUND; i++) {
            producer.newMessage().value("msg".getBytes()).send();
        }
    }

    /**
     * Looks up the private {@code topicMetadata} field of {@link PartitionedProducerImpl} and makes
     * it accessible for reflective reads.
     *
     * @return the accessible field
     * @throws NoSuchFieldException if the field does not exist
     */
    private static Field accessibleTopicMetadataField() throws NoSuchFieldException {
        final Field field = PartitionedProducerImpl.class.getDeclaredField("topicMetadata");
        field.setAccessible(true);
        return field;
    }

    /**
     * Waits (with Awaitility's default settings) until the {@link TopicMetadata} read reflectively
     * from the producer reports the expected number of partitions.
     *
     * @param producer the producer whose metadata is inspected
     * @param topicMetadataField the accessible {@code topicMetadata} field
     * @param expected the partition count to wait for
     */
    private static void awaitNumPartitions(PartitionedProducerImpl<byte[]> producer, Field topicMetadataField,
                                           int expected) {
        Awaitility.await().untilAsserted(() ->
                Assert.assertEquals(((TopicMetadata) topicMetadataField.get(producer)).numPartitions(), expected));
    }
}
