package org.apache.pulsar.broker.transaction.buffer.impl;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicFactory;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest.class */
public class TransactionPersistentTopicTest extends ProducerConsumerBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(TransactionPersistentTopicTest.class);
    private static CountDownLatch topicInitSuccessSignal = new CountDownLatch(1);

    /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest$MyPersistentTopic.class */
    public static class MyPersistentTopic extends PersistentTopic {
        public MyPersistentTopic(String str, ManagedLedger managedLedger, BrokerService brokerService) {
            super(str, managedLedger, brokerService);
        }

        public CompletableFuture<Void> checkDeduplicationStatus() {
            TransactionPersistentTopicTest.topicInitSuccessSignal.countDown();
            Thread.sleep(1000L);
            return CompletableFuture.completedFuture(null);
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/impl/TransactionPersistentTopicTest$MyTopicFactory.class */
    public static class MyTopicFactory implements TopicFactory {
        public <T extends Topic> T create(String str, ManagedLedger managedLedger, BrokerService brokerService, Class<T> cls) {
            try {
                return cls == NonPersistentTopic.class ? new NonPersistentTopic(str, brokerService) : new MyPersistentTopic(str, managedLedger, brokerService);
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }

        public void close() throws IOException {
        }
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        this.conf.setTopicFactoryClassName(MyTopicFactory.class.getName());
        this.conf.setTransactionCoordinatorEnabled(true);
        this.conf.setBrokerDeduplicationEnabled(false);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testNoOrphanClosedTopicIfTxnInternalFailed() {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://public/default/tp2");
        BrokerService brokerService = this.pulsar.getBrokerService();
        TransactionBufferProvider transactionBufferProvider = topic -> {
            AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor) Mockito.mock(AbortedTxnProcessor.class);
            ((AbortedTxnProcessor) Mockito.doAnswer(invocationOnMock -> {
                topicInitSuccessSignal.await();
                return CompletableFuture.failedFuture(new RuntimeException("Mock recovery failed"));
            }).when(abortedTxnProcessor)).recoverFromSnapshot();
            Mockito.when(abortedTxnProcessor.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
            return new TopicTransactionBuffer((PersistentTopic) topic, abortedTxnProcessor);
        };
        TransactionBufferProvider transactionBufferProvider2 = this.pulsar.getTransactionBufferProvider();
        this.pulsar.setTransactionBufferProvider(transactionBufferProvider);
        CompletableFuture topic2 = brokerService.getTopic(newUniqueName, true);
        Awaitility.await().ignoreExceptions().atMost(10L, TimeUnit.SECONDS).pollInterval(200L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Assert.assertTrue(topic2.isDone());
            Assert.assertFalse(topic2.isCompletedExceptionally());
        });
        Awaitility.await().ignoreExceptions().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Assert.assertFalse(brokerService.getTopics().containsKey(newUniqueName));
        });
        this.pulsar.setTransactionBufferProvider(transactionBufferProvider2);
    }
}
