package org.apache.pulsar.broker.service;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.NetworkErrorTestBase;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/ZkSessionExpireTest.class */
public class ZkSessionExpireTest extends NetworkErrorTestBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ZkSessionExpireTest.class);
    private Consumer<ServiceConfiguration> settings;

    @Override // org.apache.pulsar.broker.service.NetworkErrorTestBase
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    private void setupWithSettings(Consumer<ServiceConfiguration> consumer) throws Exception {
        this.settings = consumer;
        super.setup();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.broker.service.NetworkErrorTestBase
    public void setConfigDefaults(ServiceConfiguration serviceConfiguration, String str, int i) {
        super.setConfigDefaults(serviceConfiguration, str, i);
        this.settings.accept(serviceConfiguration);
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @DataProvider(name = "settings")
    public Object[][] settings() {
        return new Object[]{new Object[]{false, NetworkErrorTestBase.PreferBrokerModularLoadManager.class}, new Object[]{true, NetworkErrorTestBase.PreferBrokerModularLoadManager.class}};
    }

    @Test(timeOut = 600000, dataProvider = "settings")
    public void testTopicUnloadAfterSessionRebuild(boolean z, Class cls) throws Exception {
        setupWithSettings(serviceConfiguration -> {
            serviceConfiguration.setManagedLedgerMaxEntriesPerLedger(1);
            serviceConfiguration.setSystemTopicEnabled(z);
            serviceConfiguration.setTopicLevelPoliciesEnabled(z);
            if (cls != null) {
                serviceConfiguration.setLoadManagerClassName(cls.getName());
            }
        });
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
        this.admin1.topics().createNonPartitionedTopic(newUniqueName);
        this.admin1.topics().createSubscription(newUniqueName, "s1", MessageId.earliest);
        setPreferBroker(this.pulsar1);
        this.admin1.namespaces().unload("public/default");
        this.admin2.namespaces().unload("public/default");
        Awaitility.await().atMost(Duration.ofSeconds(20L)).untilAsserted(() -> {
            Assert.assertEquals(getAvailableBrokers(this.pulsar1).size(), 2);
            Assert.assertEquals(getAvailableBrokers(this.pulsar2).size(), 2);
        });
        ProducerImpl create = this.client1.newProducer(Schema.STRING).topic(newUniqueName).sendTimeout(10, TimeUnit.SECONDS).create();
        Assert.assertNotNull((Topic) ((Optional) this.pulsar1.getBrokerService().getTopic(newUniqueName, false).join()).get());
        clearPreferBroker();
        this.metadataZKProxy.rejectAllConnections();
        this.metadataZKProxy.disconnectFrontChannels();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(getAvailableBrokers(this.pulsar2).size(), 1);
        });
        CompletableFuture sendAsync = create.sendAsync("broker1_msg1");
        Producer create2 = this.client2.newProducer(Schema.STRING).topic(newUniqueName).sendTimeout(10, TimeUnit.SECONDS).create();
        CompletableFuture sendAsync2 = create2.sendAsync("broker2_msg1");
        Awaitility.await().untilAsserted(() -> {
            CompletableFuture topic = this.pulsar2.getBrokerService().getTopic(newUniqueName, false);
            Assert.assertNotNull(topic);
            Assert.assertTrue(topic.isDone() && !topic.isCompletedExceptionally());
            Optional optional = (Optional) topic.join();
            Assert.assertTrue((optional == null || optional.isEmpty()) ? false : true);
        });
        Topic topic = (Topic) ((Optional) this.pulsar1.getBrokerService().getTopic(newUniqueName, false).join()).get();
        Topic topic2 = (Topic) ((Optional) this.pulsar2.getBrokerService().getTopic(newUniqueName, false).join()).get();
        Assert.assertNotNull(topic);
        Assert.assertNotNull(topic2);
        CompletableFuture sendAsync3 = create.sendAsync("broker1_msg2");
        CompletableFuture sendAsync4 = create2.sendAsync("broker2_msg2");
        try {
            sendAsync.join();
            sendAsync3.join();
            create.getClientCnx();
            Assert.fail("expected a publish error");
        } catch (Exception e) {
        }
        sendAsync2.join();
        sendAsync4.join();
        this.metadataZKProxy.unRejectAllConnections();
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals(getAvailableBrokers(this.pulsar1).size(), 2);
            Assert.assertEquals(getAvailableBrokers(this.pulsar2).size(), 2);
        });
        Awaitility.await().atMost(Duration.ofSeconds(10L)).untilAsserted(() -> {
            CompletableFuture topic3 = this.pulsar1.getBrokerService().getTopic(newUniqueName, false);
            log.info("broker 1 topics {}", this.pulsar1.getBrokerService().getTopics().keys());
            log.info("broker 2 topics {}", this.pulsar2.getBrokerService().getTopics().keys());
            log.info("broker 1 bundles {}", this.pulsar1.getNamespaceService().getOwnershipCache().getOwnedBundles().keySet().stream().map(namespaceBundle -> {
                return namespaceBundle.getNamespaceObject().toString() + "/" + namespaceBundle.getBundleRange();
            }).filter(str -> {
                return str.contains("public/default");
            }).collect(Collectors.toList()));
            log.info("broker 2 bundles {}", this.pulsar2.getNamespaceService().getOwnershipCache().getOwnedBundles().keySet().stream().map(namespaceBundle2 -> {
                return namespaceBundle2.getNamespaceObject().toString() + "/" + namespaceBundle2.getBundleRange();
            }).filter(str2 -> {
                return str2.contains("public/default");
            }).collect(Collectors.toList()));
            Logger logger = log;
            Object[] objArr = new Object[4];
            objArr[0] = topic3;
            objArr[1] = topic3 == null ? "null" : Boolean.valueOf(topic3.isDone());
            objArr[2] = topic3;
            objArr[3] = topic3 == null ? "null" : Boolean.valueOf(topic3.isCompletedExceptionally());
            logger.info("future: {}, isDone: {}, isCompletedExceptionally: {}", objArr);
            Assert.assertTrue(topic3 == null || !this.pulsar1.getBrokerService().getTopics().containsKey(newUniqueName) || (topic3.isDone() && !topic3.isCompletedExceptionally() && ((Optional) topic3.get()).isEmpty()) || topic3.isCompletedExceptionally());
        });
        Assert.assertNotNull((Topic) ((Optional) this.pulsar2.getBrokerService().getTopic(newUniqueName, false).join()).get());
        CompletableFuture sendAsync5 = create.sendAsync("broker1_msg3");
        CompletableFuture sendAsync6 = create2.sendAsync("broker2_msg3");
        sendAsync5.join();
        sendAsync6.join();
        log.info("msgBacklog: {}", Long.valueOf(((SubscriptionStats) this.admin2.topics().getStats(newUniqueName).getSubscriptions().get("s1")).getMsgBacklog()));
        create.close();
        create2.close();
        this.admin2.topics().delete(newUniqueName, false);
    }
}
