package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/InactiveTopicDeleteTest.class */
public class InactiveTopicDeleteTest extends BrokerTestBase {
    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testDeleteWhenNoSubscriptions() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions").create();
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"}).subscriptionName("sub").subscribe().close();
        create.close();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"));
        });
        this.admin.topics().deleteSubscription("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions", "sub");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"));
        });
    }

    @Test
    public void testDeleteAndCleanZkNode() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        this.admin.topics().createPartitionedTopic("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions", 5);
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions").create().close();
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"}).subscriptionName("sub").subscribe().close();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(this.admin.topics().getPartitionedTopicList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"));
        });
        this.admin.topics().deleteSubscription("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions", "sub");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(this.admin.topics().getPartitionedTopicList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"));
        });
    }

    @Test
    public void testWhenSubPartitionNotDelete() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        TopicName.get("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions");
        this.admin.topics().createPartitionedTopic("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions", 5);
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions").create().close();
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"}).subscriptionName("sub").subscribe().close();
        Thread.sleep(2000L);
        Assert.assertTrue(this.admin.topics().getPartitionedTopicList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"));
        this.admin.topics().deleteSubscription("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions", "sub");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(this.admin.topics().getPartitionedTopicList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptions"));
        });
    }

    @Test
    public void testNotEnabledDeleteZkNode() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
        super.baseSetup();
        this.admin.topics().createPartitionedTopic("persistent://prop/ns-abc/testNotEnabledDeleteZkNode1", 5);
        this.admin.topics().createNonPartitionedTopic("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2");
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testNotEnabledDeleteZkNode1").create().close();
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2").create().close();
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testNotEnabledDeleteZkNode1"}).subscriptionName("sub").subscribe().close();
        this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testNotEnabledDeleteZkNode2"}).subscriptionName("sub2").subscribe().close();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertTrue(this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2"));
        });
        Assert.assertTrue(this.admin.topics().getPartitionedTopicList("prop/ns-abc").contains("persistent://prop/ns-abc/testNotEnabledDeleteZkNode1"));
        this.admin.topics().deleteSubscription("persistent://prop/ns-abc/testNotEnabledDeleteZkNode1", "sub");
        this.admin.topics().deleteSubscription("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2", "sub2");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2"));
        });
        Assert.assertTrue(this.admin.topics().getPartitionedTopicList("prop/ns-abc").contains("persistent://prop/ns-abc/testNotEnabledDeleteZkNode1"));
        Assert.assertFalse(this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testNotEnabledDeleteZkNode2"));
    }

    @Test(timeOut = 20000)
    public void testTopicPolicyUpdateAndClean() throws Exception {
        List<String> asList = Arrays.asList("prop/ns-abc2", "prop/ns-abc3");
        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1000);
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1000, true);
        super.baseSetup();
        for (String str : asList) {
            this.admin.namespaces().createNamespace(str);
            this.admin.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"test"}));
        }
        Iterator it = Arrays.asList("persistent://prop/ns-abc/testDeletePolicyUpdate", "persistent://prop/ns-abc2/testDeletePolicyUpdate", "persistent://prop/ns-abc3/testDeletePolicyUpdate").iterator();
        while (it.hasNext()) {
            this.admin.topics().createNonPartitionedTopic((String) it.next());
        }
        InactiveTopicPolicies inactiveTopicPolicies2 = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc", inactiveTopicPolicies2);
        inactiveTopicPolicies2.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc2", inactiveTopicPolicies2);
        inactiveTopicPolicies2.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc3", inactiveTopicPolicies2);
        Awaitility.await().until(() -> {
            return Boolean.valueOf(((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic("persistent://prop/ns-abc/testDeletePolicyUpdate", false).get()).get()).getInactiveTopicPolicies().isDeleteWhileInactive());
        });
        InactiveTopicPolicies inactiveTopicPolicies3 = ((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic("persistent://prop/ns-abc/testDeletePolicyUpdate", false).get()).get()).getInactiveTopicPolicies();
        Assert.assertTrue(inactiveTopicPolicies3.isDeleteWhileInactive());
        Assert.assertEquals(inactiveTopicPolicies3.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
        Assert.assertEquals(inactiveTopicPolicies3.getMaxInactiveDurationSeconds(), 1);
        Assert.assertEquals(inactiveTopicPolicies3, this.admin.namespaces().getInactiveTopicPolicies("prop/ns-abc"));
        this.admin.namespaces().removeInactiveTopicPolicies("prop/ns-abc");
        Awaitility.await().until(() -> {
            return Boolean.valueOf(((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic("persistent://prop/ns-abc/testDeletePolicyUpdate", false).get()).get()).getInactiveTopicPolicies().getMaxInactiveDurationSeconds() == 1000);
        });
        Assert.assertEquals(((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic("persistent://prop/ns-abc/testDeletePolicyUpdate", false).get()).get()).getInactiveTopicPolicies(), inactiveTopicPolicies);
        InactiveTopicPolicies inactiveTopicPolicies4 = ((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic("persistent://prop/ns-abc2/testDeletePolicyUpdate", false).get()).get()).getInactiveTopicPolicies();
        Assert.assertTrue(inactiveTopicPolicies4.isDeleteWhileInactive());
        Assert.assertEquals(inactiveTopicPolicies4.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        Assert.assertEquals(inactiveTopicPolicies4.getMaxInactiveDurationSeconds(), 1);
        Assert.assertEquals(inactiveTopicPolicies4, this.admin.namespaces().getInactiveTopicPolicies("prop/ns-abc2"));
        this.admin.namespaces().removeInactiveTopicPolicies("prop/ns-abc2");
        Awaitility.await().until(() -> {
            return Boolean.valueOf(((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic("persistent://prop/ns-abc/testDeletePolicyUpdate", false).get()).get()).getInactiveTopicPolicies().getMaxInactiveDurationSeconds() == 1000);
        });
        Assert.assertEquals(((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic("persistent://prop/ns-abc2/testDeletePolicyUpdate", false).get()).get()).getInactiveTopicPolicies(), inactiveTopicPolicies);
    }

    @Test(timeOut = 20000)
    public void testDeleteWhenNoSubscriptionsWithMultiConfig() throws Exception {
        List<String> asList = Arrays.asList("prop/ns-abc2", "prop/ns-abc3");
        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        for (String str : asList) {
            this.admin.namespaces().createNamespace(str);
            this.admin.namespaces().setNamespaceReplicationClusters(str, Sets.newHashSet(new String[]{"test"}));
        }
        List<String> asList2 = Arrays.asList("persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig", "persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig", "persistent://prop/ns-abc3/testDeleteWhenNoSubscriptionsWithMultiConfig");
        HashMap hashMap = new HashMap();
        for (String str2 : asList2) {
            Producer create = this.pulsarClient.newProducer().topic(str2).create();
            String str3 = "sub" + System.currentTimeMillis();
            hashMap.put(str2, str3);
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str2}).subscriptionName(str3).subscribe();
            for (int i = 0; i < 10; i++) {
                create.send("Pulsar".getBytes());
            }
            subscribe.close();
            create.close();
            Thread.sleep(1L);
        }
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc", inactiveTopicPolicies);
        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc2", inactiveTopicPolicies);
        Awaitility.await().until(() -> {
            return Boolean.valueOf(((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic("persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig", false).get()).get()).getInactiveTopicPolicies().isDeleteWhileInactive());
        });
        Thread.sleep(2000L);
        Assert.assertTrue(this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig"));
        Assert.assertTrue(this.admin.topics().getList("prop/ns-abc2").contains("persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig"));
        Assert.assertTrue(this.admin.topics().getList("prop/ns-abc3").contains("persistent://prop/ns-abc3/testDeleteWhenNoSubscriptionsWithMultiConfig"));
        this.admin.topics().skipAllMessages("persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig", (String) hashMap.remove("persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig"));
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(this.admin.topics().getList("prop/ns-abc2").contains("persistent://prop/ns-abc2/testDeleteWhenNoSubscriptionsWithMultiConfig"));
        });
        for (Map.Entry entry : hashMap.entrySet()) {
            this.admin.topics().deleteSubscription((String) entry.getKey(), (String) entry.getValue());
        }
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoSubscriptionsWithMultiConfig"));
        });
        Assert.assertFalse(this.admin.topics().getList("prop/ns-abc3").contains("persistent://prop/ns-abc3/testDeleteWhenNoSubscriptionsWithMultiConfig"));
    }

    @Test
    public void testDeleteWhenNoBacklogs() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        Producer create = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testDeleteWhenNoBacklogs").create();
        Producer create2 = this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testDeleteWhenNoBacklogsB").create();
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"persistent://prop/ns-abc/testDeleteWhenNoBacklogs"}).subscriptionName("sub").subscribe();
        Consumer subscribe2 = this.pulsarClient.newConsumer().topicsPattern("persistent://prop/ns-abc/test.*").subscriptionName("sub2").subscribe();
        for (int i = 0; i < 10; i++) {
            create.send("Pulsar".getBytes());
            create2.send("Pulsar".getBytes());
        }
        create.close();
        create2.close();
        int i2 = 0;
        while (true) {
            Message receive = subscribe2.receive(1, TimeUnit.SECONDS);
            if (receive == null) {
                Assert.assertEquals(10 * 2, i2);
                Thread.sleep(2000L);
                Assert.assertTrue(this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testDeleteWhenNoBacklogs"));
                this.admin.topics().skipAllMessages("persistent://prop/ns-abc/testDeleteWhenNoBacklogs", "sub");
                Awaitility.await().untilAsserted(() -> {
                    List list = this.admin.topics().getList("prop/ns-abc");
                    Assert.assertFalse(list.contains("persistent://prop/ns-abc/testDeleteWhenNoBacklogs"));
                    Assert.assertFalse(list.contains("persistent://prop/ns-abc/testDeleteWhenNoBacklogsB"));
                });
                subscribe.close();
                subscribe2.close();
                return;
            }
            subscribe2.acknowledge(receive);
            i2++;
        }
    }

    @Test
    public void testMaxInactiveDuration() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        this.conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(5);
        super.baseSetup();
        this.pulsarClient.newProducer().topic("persistent://prop/ns-abc/testMaxInactiveDuration").create().close();
        Thread.sleep(2000L);
        Assert.assertTrue(this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testMaxInactiveDuration"));
        Thread.sleep(4000L);
        Assert.assertFalse(this.admin.topics().getList("prop/ns-abc").contains("persistent://prop/ns-abc/testMaxInactiveDuration"));
        super.internalCleanup();
    }

    @Test(timeOut = 20000)
    public void testTopicLevelInActiveTopicApi() throws Exception {
        super.baseSetup();
        String str = "persistent://prop/ns-abc/testMaxInactiveDuration-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(str, 3);
        this.pulsarClient.newConsumer().topic(new String[]{str}).subscriptionName("my-sub").subscribe().close();
        TopicName.get(str);
        Assert.assertNull(this.admin.topics().getInactiveTopicPolicies(str));
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
        inactiveTopicPolicies.setDeleteWhileInactive(true);
        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        inactiveTopicPolicies.setMaxInactiveDurationSeconds(10);
        this.admin.topics().setInactiveTopicPolicies(str, inactiveTopicPolicies);
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.admin.topics().getInactiveTopicPolicies(str) != null);
        });
        Assert.assertEquals(this.admin.topics().getInactiveTopicPolicies(str), inactiveTopicPolicies);
        this.admin.topics().removeInactiveTopicPolicies(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNull(this.admin.topics().getInactiveTopicPolicies(str));
        });
    }

    @Test(timeOut = 30000)
    public void testTopicLevelInactivePolicyUpdateAndClean() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(1000);
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1000, true);
        super.baseSetup();
        String str = "persistent://prop/ns-abc/testTopicLevelInactivePolicy" + UUID.randomUUID().toString();
        String str2 = "persistent://prop/ns-abc/testTopicLevelInactivePolicy" + UUID.randomUUID().toString();
        String str3 = "persistent://prop/ns-abc/testTopicLevelInactivePolicy" + UUID.randomUUID().toString();
        List<String> asList = Arrays.asList(str, str2, str3);
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            this.admin.topics().createNonPartitionedTopic((String) it.next());
        }
        for (String str4 : asList) {
            this.pulsarClient.newConsumer().topic(new String[]{str4}).subscriptionName("my-sub").subscribe().close();
            TopicName.get(str4);
        }
        InactiveTopicPolicies inactiveTopicPolicies2 = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
        this.admin.topics().setInactiveTopicPolicies(str, inactiveTopicPolicies2);
        inactiveTopicPolicies2.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.admin.topics().setInactiveTopicPolicies(str2, inactiveTopicPolicies2);
        inactiveTopicPolicies2.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.admin.topics().setInactiveTopicPolicies(str3, inactiveTopicPolicies2);
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.admin.topics().getInactiveTopicPolicies(str) != null);
        });
        InactiveTopicPolicies inactiveTopicPolicies3 = ((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get()).getInactiveTopicPolicies();
        Assert.assertTrue(inactiveTopicPolicies3.isDeleteWhileInactive());
        Assert.assertEquals(inactiveTopicPolicies3.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
        Assert.assertEquals(inactiveTopicPolicies3.getMaxInactiveDurationSeconds(), 1);
        Assert.assertEquals(inactiveTopicPolicies3, this.admin.topics().getInactiveTopicPolicies(str));
        this.admin.topics().removeInactiveTopicPolicies(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get()).getInactiveTopicPolicies(), inactiveTopicPolicies);
        });
        InactiveTopicPolicies inactiveTopicPolicies4 = ((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(str2, false).get()).get()).getInactiveTopicPolicies();
        Assert.assertTrue(inactiveTopicPolicies4.isDeleteWhileInactive());
        Assert.assertEquals(inactiveTopicPolicies4.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        Assert.assertEquals(inactiveTopicPolicies4.getMaxInactiveDurationSeconds(), 1);
        Assert.assertEquals(inactiveTopicPolicies4, this.admin.topics().getInactiveTopicPolicies(str2));
        inactiveTopicPolicies2.setMaxInactiveDurationSeconds(999);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc", inactiveTopicPolicies2);
        Awaitility.await().until(() -> {
            return Boolean.valueOf(inactiveTopicPolicies2.equals(((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(str, false).get()).get()).getInactiveTopicPolicies()));
        });
        this.admin.topics().removeInactiveTopicPolicies(str2);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(((PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopic(str2, false).get()).get()).getInactiveTopicPolicies().getMaxInactiveDurationSeconds(), 999);
        });
    }

    @Test(timeOut = 30000)
    public void testDeleteWhenNoSubscriptionsWithTopicLevelPolicies() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        String str = "persistent://prop/ns-abc/test-" + String.valueOf(UUID.randomUUID());
        String str2 = "persistent://prop/ns-abc/test-" + String.valueOf(UUID.randomUUID());
        String str3 = "persistent://prop/ns-abc/test-" + String.valueOf(UUID.randomUUID());
        List<String> asList = Arrays.asList(str, str2, str3);
        HashMap hashMap = new HashMap();
        for (String str4 : asList) {
            Producer create = this.pulsarClient.newProducer().topic(str4).create();
            String str5 = "sub" + System.currentTimeMillis();
            hashMap.put(str4, str5);
            Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{str4}).subscriptionName(str5).subscribe();
            for (int i = 0; i < 10; i++) {
                create.send("Pulsar".getBytes());
            }
            subscribe.close();
            create.close();
            Thread.sleep(1L);
        }
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
        this.admin.topics().setInactiveTopicPolicies(str, inactiveTopicPolicies);
        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.admin.topics().setInactiveTopicPolicies(str2, inactiveTopicPolicies);
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.admin.topics().getInactiveTopicPolicies(str2) != null);
        });
        Thread.sleep(2000L);
        Assert.assertTrue(this.admin.topics().getList("prop/ns-abc").contains(str));
        Assert.assertTrue(this.admin.topics().getList("prop/ns-abc").contains(str2));
        Assert.assertTrue(this.admin.topics().getList("prop/ns-abc").contains(str3));
        this.admin.topics().skipAllMessages(str2, (String) hashMap.remove(str2));
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(this.admin.topics().getList("prop/ns-abc").contains(str2));
        });
        for (Map.Entry entry : hashMap.entrySet()) {
            this.admin.topics().deleteSubscription((String) entry.getKey(), (String) entry.getValue());
        }
        Awaitility.await().untilAsserted(() -> {
            Assert.assertFalse(this.admin.topics().getList("prop/ns-abc").contains(str3));
        });
        Assert.assertFalse(this.admin.topics().getList("prop/ns-abc").contains(str));
    }

    @Test(timeOut = 30000)
    public void testInactiveTopicApplied() throws Exception {
        super.baseSetup();
        String str = "persistent://prop/ns-abc/test-" + String.valueOf(UUID.randomUUID());
        this.pulsarClient.newProducer().topic(str).create().close();
        Assert.assertNull(this.admin.namespaces().getInactiveTopicPolicies("prop/ns-abc"));
        Assert.assertNull(this.admin.topics().getInactiveTopicPolicies(str));
        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(this.conf.getBrokerDeleteInactiveTopicsMode(), this.conf.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), this.conf.isBrokerDeleteInactiveTopicsEnabled());
        Assert.assertEquals(this.admin.topics().getInactiveTopicPolicies(str, true), inactiveTopicPolicies);
        InactiveTopicPolicies inactiveTopicPolicies2 = new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 20, false);
        this.admin.namespaces().setInactiveTopicPolicies("prop/ns-abc", inactiveTopicPolicies2);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.namespaces().getInactiveTopicPolicies("prop/ns-abc"));
        });
        InactiveTopicPolicies inactiveTopicPolicies3 = this.admin.topics().getInactiveTopicPolicies(str, true);
        Assert.assertEquals(inactiveTopicPolicies3.getMaxInactiveDurationSeconds(), 20);
        Assert.assertFalse(inactiveTopicPolicies3.isDeleteWhileInactive());
        Assert.assertEquals(inactiveTopicPolicies3.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.admin.topics().setInactiveTopicPolicies(str, new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, 30, false));
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotNull(this.admin.topics().getInactiveTopicPolicies(str));
        });
        InactiveTopicPolicies inactiveTopicPolicies4 = this.admin.topics().getInactiveTopicPolicies(str, true);
        Assert.assertEquals(inactiveTopicPolicies4.getMaxInactiveDurationSeconds(), 30);
        Assert.assertFalse(inactiveTopicPolicies4.isDeleteWhileInactive());
        Assert.assertEquals(inactiveTopicPolicies4.getInactiveTopicDeleteMode(), InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        this.admin.topics().removeInactiveTopicPolicies(str);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.topics().getInactiveTopicPolicies(str, true), inactiveTopicPolicies2);
        });
        this.admin.namespaces().removeInactiveTopicPolicies("prop/ns-abc");
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(this.admin.topics().getInactiveTopicPolicies(str, true), inactiveTopicPolicies);
        });
    }

    @Test(timeOut = 30000)
    public void testHealthTopicInactiveNotClean() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
        super.baseSetup();
        String str = "persistent://" + String.valueOf(NamespaceService.getHeartbeatNamespace(this.pulsar.getBrokerId(), this.pulsar.getConfig())) + "/healthcheck";
        String str2 = "persistent://" + String.valueOf(NamespaceService.getHeartbeatNamespaceV2(this.pulsar.getBrokerId(), this.pulsar.getConfig())) + "/healthcheck";
        this.admin.brokers().healthcheck(TopicVersion.V1);
        this.admin.brokers().healthcheck(TopicVersion.V2);
        List list = (List) this.pulsar.getPulsarResources().getTopicResources().getExistingPartitions(TopicName.get(str)).get(10L, TimeUnit.SECONDS);
        List list2 = (List) this.pulsar.getPulsarResources().getTopicResources().getExistingPartitions(TopicName.get(str2)).get(10L, TimeUnit.SECONDS);
        Assert.assertTrue(list.contains(str));
        Assert.assertTrue(list2.contains(str2));
    }

    @Test
    public void testDynamicConfigurationBrokerDeleteInactiveTopicsEnabled() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsEnabled(true);
        super.baseSetup();
        this.admin.brokers().updateDynamicConfiguration("brokerDeleteInactiveTopicsEnabled", "false");
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertFalse(this.conf.isBrokerDeleteInactiveTopicsEnabled());
        });
    }

    @Test
    public void testDynamicConfigurationBrokerDeleteInactiveTopicsFrequencySeconds() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsFrequencySeconds(30);
        super.baseSetup();
        this.admin.brokers().updateDynamicConfiguration("brokerDeleteInactiveTopicsFrequencySeconds", "60");
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(this.conf.getBrokerDeleteInactiveTopicsFrequencySeconds(), 60);
        });
    }

    @Test
    public void testDynamicConfigurationBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(30);
        super.baseSetup();
        this.admin.brokers().updateDynamicConfiguration("brokerDeleteInactiveTopicsMaxInactiveDurationSeconds", "60");
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(this.conf.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), 60);
        });
    }

    @Test
    public void testDynamicConfigurationBrokerDeleteInactiveTopicsMode() throws Exception {
        this.conf.setBrokerDeleteInactiveTopicsMode(InactiveTopicDeleteMode.delete_when_no_subscriptions);
        super.baseSetup();
        String inactiveTopicDeleteMode = InactiveTopicDeleteMode.delete_when_subscriptions_caught_up.toString();
        this.admin.brokers().updateDynamicConfiguration("brokerDeleteInactiveTopicsMode", inactiveTopicDeleteMode);
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertEquals(this.conf.getBrokerDeleteInactiveTopicsMode().toString(), inactiveTopicDeleteMode);
        });
    }

    @Test
    public void testBrokerDeleteInactivePartitionedTopicMetadataEnabled() throws Exception {
        this.conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(false);
        super.baseSetup();
        this.admin.brokers().updateDynamicConfiguration("brokerDeleteInactivePartitionedTopicMetadataEnabled", "true");
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assert.assertTrue(this.conf.isBrokerDeleteInactivePartitionedTopicMetadataEnabled());
        });
    }
}
