package org.apache.pulsar.client.impl;

import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.HandlerState;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = {"broker-api"})
/* loaded from: input_file:org/apache/pulsar/client/impl/ProducerReconnectionTest.class */
public class ProducerReconnectionTest extends ProducerConsumerBase {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ProducerReconnectionTest.class);

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeClass(alwaysRun = true)
    protected void setup() throws Exception {
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test
    public void testConcurrencyReconnectAndClose() throws Exception {
        String newUniqueName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        this.admin.topics().createNonPartitionedTopic(newUniqueName);
        PulsarClientImpl pulsarClientImpl = this.pulsarClient;
        ProducerBuilderImpl producerBuilderImpl = pulsarClientImpl.newProducer().blockIfQueueFull(false).maxPendingMessages(1).producerName("p1").enableBatching(true).topic(newUniqueName);
        CompletableFuture completableFuture = new CompletableFuture();
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        ProducerImpl<byte[]> producerImpl = new ProducerImpl<byte[]>(pulsarClientImpl, newUniqueName, producerBuilderImpl.getConf(), completableFuture, -1, Schema.BYTES, null, Optional.empty()) { // from class: org.apache.pulsar.client.impl.ProducerReconnectionTest.1
            ConnectionHandler initConnectionHandler() {
                ConnectionHandler connectionHandler = (ConnectionHandler) Mockito.spy(super.initConnectionHandler());
                AtomicBoolean atomicBoolean2 = atomicBoolean;
                CountDownLatch countDownLatch3 = countDownLatch;
                CountDownLatch countDownLatch4 = countDownLatch2;
                ((ConnectionHandler) Mockito.doAnswer(invocationOnMock -> {
                    boolean booleanValue = ((Boolean) invocationOnMock.callRealMethod()).booleanValue();
                    if (atomicBoolean2.get()) {
                        ProducerReconnectionTest.log.info("[testConcurrencyReconnectAndClose] verified state for reconnection");
                        countDownLatch3.countDown();
                        countDownLatch4.await();
                        ProducerReconnectionTest.log.info("[testConcurrencyReconnectAndClose] reconnected");
                    }
                    return Boolean.valueOf(booleanValue);
                }).when(connectionHandler)).isValidStateForReconnection();
                return connectionHandler;
            }
        };
        log.info("[testConcurrencyReconnectAndClose] producer created");
        completableFuture.get(5L, TimeUnit.SECONDS);
        log.info("[testConcurrencyReconnectAndClose] trigger a reconnection");
        ServerCnx cnx = ((Producer) ((Topic) ((Optional) this.pulsar.getBrokerService().getTopic(newUniqueName, false).join()).get()).getProducers().values().iterator().next()).getCnx();
        atomicBoolean.set(true);
        cnx.ctx().close();
        producerImpl.sendAsync("1".getBytes(StandardCharsets.UTF_8));
        Awaitility.await().untilAsserted(() -> {
            Assert.assertNotEquals(Integer.valueOf(producerImpl.getPendingQueueSize()), 0);
        });
        countDownLatch.await();
        log.info("[testConcurrencyReconnectAndClose] producer close");
        producerImpl.closeAsync();
        Awaitility.await().untilAsserted(() -> {
            HandlerState.State state = producerImpl.getState();
            Assert.assertTrue(state == HandlerState.State.Closed || state == HandlerState.State.Closing);
        });
        countDownLatch2.countDown();
        Thread.sleep(3000L);
        HandlerState.State state = producerImpl.getState();
        log.info("producer state: {}", state);
        Assert.assertTrue(state == HandlerState.State.Closed || state == HandlerState.State.Closing);
        Assert.assertEquals(producerImpl.getPendingQueueSize(), 0);
        producerImpl.close();
        this.admin.topics().delete(newUniqueName);
    }
}
