package org.springframework.kafka.listener;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.event.ConcurrentContainerStoppedEvent;
import org.springframework.kafka.event.ConsumerStoppedEvent;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-3.3.2.jar:org/springframework/kafka/listener/ConcurrentMessageListenerContainer.class */
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    private final List<KafkaMessageListenerContainer<K, V>> containers;
    private final List<AsyncTaskExecutor> executors;
    private final AtomicInteger startedContainers;
    private int concurrency;
    private boolean alwaysClientIdSuffix;
    private volatile ConsumerStoppedEvent.Reason reason;

    public ConcurrentMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties) {
        super(consumerFactory, containerProperties);
        this.containers = new ArrayList();
        this.executors = new ArrayList();
        this.startedContainers = new AtomicInteger();
        this.concurrency = 1;
        this.alwaysClientIdSuffix = true;
        Assert.notNull(consumerFactory, "A ConsumerFactory must be provided");
    }

    public int getConcurrency() {
        return this.concurrency;
    }

    public void setConcurrency(int i) {
        Assert.isTrue(i > 0, "concurrency must be greater than 0");
        this.concurrency = i;
    }

    public void setAlwaysClientIdSuffix(boolean z) {
        this.alwaysClientIdSuffix = z;
    }

    public List<KafkaMessageListenerContainer<K, V>> getContainers() {
        this.lifecycleLock.lock();
        try {
            return List.copyOf(this.containers);
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public MessageListenerContainer getContainerFor(String str, int i) {
        this.lifecycleLock.lock();
        try {
            for (KafkaMessageListenerContainer<K, V> kafkaMessageListenerContainer : this.containers) {
                Collection<TopicPartition> assignedPartitions = kafkaMessageListenerContainer.getAssignedPartitions();
                if (assignedPartitions != null) {
                    for (TopicPartition topicPartition : assignedPartitions) {
                        if (topicPartition.topic().equals(str) && topicPartition.partition() == i) {
                            return kafkaMessageListenerContainer;
                        }
                    }
                }
            }
            this.lifecycleLock.unlock();
            return this;
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public Collection<TopicPartition> getAssignedPartitions() {
        this.lifecycleLock.lock();
        try {
            return this.containers.stream().map((v0) -> {
                return v0.getAssignedPartitions();
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).flatMap((v0) -> {
                return v0.stream();
            }).toList();
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
        this.lifecycleLock.lock();
        try {
            HashMap hashMap = new HashMap();
            this.containers.forEach(kafkaMessageListenerContainer -> {
                Map<String, Collection<TopicPartition>> assignmentsByClientId = kafkaMessageListenerContainer.getAssignmentsByClientId();
                if (assignmentsByClientId != null) {
                    hashMap.putAll(assignmentsByClientId);
                }
            });
            return hashMap;
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public boolean isContainerPaused() {
        this.lifecycleLock.lock();
        try {
            boolean isPauseRequested = isPauseRequested();
            if (isPauseRequested) {
                Iterator<KafkaMessageListenerContainer<K, V>> it = this.containers.iterator();
                while (it.hasNext()) {
                    if (!it.next().isContainerPaused()) {
                        return false;
                    }
                }
            }
            this.lifecycleLock.unlock();
            return isPauseRequested;
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public boolean isChildRunning() {
        this.lifecycleLock.lock();
        try {
            Iterator<KafkaMessageListenerContainer<K, V>> it = this.containers.iterator();
            while (it.hasNext()) {
                if (it.next().isRunning()) {
                    return true;
                }
            }
            if (this.startedContainers.get() > 0) {
                this.lifecycleLock.unlock();
                return true;
            }
            this.lifecycleLock.unlock();
            return false;
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public Map<String, Map<MetricName, ? extends Metric>> metrics() {
        this.lifecycleLock.lock();
        try {
            HashMap hashMap = new HashMap();
            Iterator<KafkaMessageListenerContainer<K, V>> it = this.containers.iterator();
            while (it.hasNext()) {
                hashMap.putAll(it.next().metrics());
            }
            Map<String, Map<MetricName, ? extends Metric>> unmodifiableMap = Collections.unmodifiableMap(hashMap);
            this.lifecycleLock.unlock();
            return unmodifiableMap;
        } catch (Throwable th) {
            this.lifecycleLock.unlock();
            throw th;
        }
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        checkTopics();
        ContainerProperties containerProperties = getContainerProperties();
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions != null && this.concurrency > topicPartitions.length) {
            this.logger.warn(() -> {
                return "When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from " + this.concurrency + " to " + topicPartitions.length;
            });
            this.concurrency = topicPartitions.length;
        }
        clearState();
        setRunning(true);
        for (int i = 0; i < this.concurrency; i++) {
            KafkaMessageListenerContainer<K, V> constructContainer = constructContainer(containerProperties, topicPartitions, i);
            configureChildContainer(i, constructContainer);
            if (isPauseRequested()) {
                constructContainer.pause();
            }
            constructContainer.start();
            this.containers.add(constructContainer);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v37, types: [org.springframework.core.task.AsyncTaskExecutor] */
    private void configureChildContainer(int i, KafkaMessageListenerContainer<K, V> kafkaMessageListenerContainer) {
        SimpleAsyncTaskExecutor simpleAsyncTaskExecutor;
        String beanName = getBeanName();
        String str = (beanName == null ? "consumer" : beanName) + "-" + i;
        kafkaMessageListenerContainer.setBeanName(str);
        ApplicationContext applicationContext = getApplicationContext();
        if (applicationContext != null) {
            kafkaMessageListenerContainer.setApplicationContext(applicationContext);
        }
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            kafkaMessageListenerContainer.setApplicationEventPublisher(applicationEventPublisher);
        }
        kafkaMessageListenerContainer.setClientIdSuffix((this.concurrency > 1 || this.alwaysClientIdSuffix) ? "-" + i : "");
        kafkaMessageListenerContainer.setCommonErrorHandler(getCommonErrorHandler());
        kafkaMessageListenerContainer.setAfterRollbackProcessor(getAfterRollbackProcessor());
        kafkaMessageListenerContainer.setRecordInterceptor(getRecordInterceptor());
        kafkaMessageListenerContainer.setBatchInterceptor(getBatchInterceptor());
        kafkaMessageListenerContainer.setInterceptBeforeTx(isInterceptBeforeTx());
        kafkaMessageListenerContainer.setListenerInfo(getListenerInfo());
        kafkaMessageListenerContainer.setEmergencyStop(() -> {
            stopAbnormally(() -> {
            });
        });
        if (kafkaMessageListenerContainer.getContainerProperties().getListenerTaskExecutor() == null) {
            if (this.executors.size() > i) {
                simpleAsyncTaskExecutor = this.executors.get(i);
            } else {
                simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor(str + "-C-");
                this.executors.add(simpleAsyncTaskExecutor);
            }
            kafkaMessageListenerContainer.getContainerProperties().setListenerTaskExecutor(simpleAsyncTaskExecutor);
        }
    }

    private KafkaMessageListenerContainer<K, V> constructContainer(ContainerProperties containerProperties, @Nullable TopicPartitionOffset[] topicPartitionOffsetArr, int i) {
        return topicPartitionOffsetArr == null ? new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties) : new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties, partitionSubset(containerProperties, i));
    }

    @Nullable
    private TopicPartitionOffset[] partitionSubset(ContainerProperties containerProperties, int i) {
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions == null) {
            return null;
        }
        if (this.concurrency == 1) {
            return topicPartitions;
        }
        int length = topicPartitions.length;
        if (length == this.concurrency) {
            return new TopicPartitionOffset[]{topicPartitions[i]};
        }
        int i2 = length / this.concurrency;
        return i == this.concurrency - 1 ? (TopicPartitionOffset[]) Arrays.copyOfRange(topicPartitions, i * i2, topicPartitions.length) : (TopicPartitionOffset[]) Arrays.copyOfRange(topicPartitions, i * i2, (i + 1) * i2);
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer
    protected void doStop(Runnable runnable, boolean z) {
        AtomicInteger atomicInteger = new AtomicInteger();
        if (isRunning()) {
            boolean isChildRunning = isChildRunning();
            setRunning(false);
            if (!isChildRunning) {
                runnable.run();
            }
            Iterator<KafkaMessageListenerContainer<K, V>> it = this.containers.iterator();
            while (it.hasNext()) {
                if (it.next().isRunning()) {
                    atomicInteger.incrementAndGet();
                }
            }
            for (KafkaMessageListenerContainer<K, V> kafkaMessageListenerContainer : this.containers) {
                kafkaMessageListenerContainer.setFenced(true);
                if (kafkaMessageListenerContainer.isRunning()) {
                    if (z) {
                        kafkaMessageListenerContainer.stop(() -> {
                            if (atomicInteger.decrementAndGet() <= 0) {
                                runnable.run();
                            }
                        });
                    } else {
                        kafkaMessageListenerContainer.stopAbnormally(() -> {
                            if (atomicInteger.decrementAndGet() <= 0) {
                                runnable.run();
                            }
                        });
                    }
                }
            }
            setStoppedNormally(z);
            if (this.startedContainers.get() == 0) {
                publishConcurrentContainerStoppedEvent(this.reason);
            }
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public void childStarted(MessageListenerContainer messageListenerContainer) {
        this.lifecycleLock.lock();
        try {
            if (this.containers.contains(messageListenerContainer)) {
                this.startedContainers.incrementAndGet();
            }
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public void childStopped(MessageListenerContainer messageListenerContainer, ConsumerStoppedEvent.Reason reason) {
        this.lifecycleLock.lock();
        try {
            if (this.containers.contains(messageListenerContainer)) {
                if (this.reason == null || reason.equals(ConsumerStoppedEvent.Reason.AUTH)) {
                    this.reason = reason;
                }
                if (this.startedContainers.decrementAndGet() == 0) {
                    if (!isRunning()) {
                        this.containers.clear();
                        publishConcurrentContainerStoppedEvent(this.reason);
                    }
                    boolean z = ConsumerStoppedEvent.Reason.AUTH.equals(this.reason) && getContainerProperties().isRestartAfterAuthExceptions();
                    this.reason = null;
                    if (z) {
                        AsyncTaskExecutor listenerTaskExecutor = getContainerProperties().getListenerTaskExecutor();
                        if (listenerTaskExecutor == null) {
                            listenerTaskExecutor = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart");
                        }
                        listenerTaskExecutor.execute(this::start);
                    }
                }
                this.lifecycleLock.unlock();
            }
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    private void publishConcurrentContainerStoppedEvent(ConsumerStoppedEvent.Reason reason) {
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent((ApplicationEvent) new ConcurrentContainerStoppedEvent(this, reason));
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public void enforceRebalance() {
        this.lifecycleLock.lock();
        try {
            this.containers.get(0).enforceRebalance();
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer, org.springframework.kafka.listener.MessageListenerContainer
    public void pause() {
        this.lifecycleLock.lock();
        try {
            super.pause();
            this.containers.forEach((v0) -> {
                v0.pause();
            });
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer, org.springframework.kafka.listener.MessageListenerContainer
    public void resume() {
        this.lifecycleLock.lock();
        try {
            super.resume();
            this.containers.forEach((v0) -> {
                v0.resume();
            });
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer, org.springframework.kafka.listener.MessageListenerContainer
    public void pausePartition(TopicPartition topicPartition) {
        this.lifecycleLock.lock();
        try {
            this.containers.stream().filter(kafkaMessageListenerContainer -> {
                return containsPartition(topicPartition, kafkaMessageListenerContainer);
            }).forEach(kafkaMessageListenerContainer2 -> {
                kafkaMessageListenerContainer2.pausePartition(topicPartition);
            });
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.AbstractMessageListenerContainer, org.springframework.kafka.listener.MessageListenerContainer
    public void resumePartition(TopicPartition topicPartition) {
        this.lifecycleLock.lock();
        try {
            this.containers.stream().filter(kafkaMessageListenerContainer -> {
                return kafkaMessageListenerContainer.isPartitionPauseRequested(topicPartition);
            }).forEach(kafkaMessageListenerContainer2 -> {
                kafkaMessageListenerContainer2.resumePartition(topicPartition);
            });
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public boolean isPartitionPaused(TopicPartition topicPartition) {
        this.lifecycleLock.lock();
        try {
            return this.containers.stream().anyMatch(kafkaMessageListenerContainer -> {
                return kafkaMessageListenerContainer.isPartitionPaused(topicPartition);
            });
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override // org.springframework.kafka.listener.MessageListenerContainer
    public boolean isInExpectedState() {
        this.lifecycleLock.lock();
        try {
            boolean z = isRunning() || isStoppedNormally();
            if (z) {
                Iterator<KafkaMessageListenerContainer<K, V>> it = this.containers.iterator();
                while (it.hasNext()) {
                    if (!it.next().isInExpectedState()) {
                        return false;
                    }
                }
            }
            this.lifecycleLock.unlock();
            return z;
        } finally {
            this.lifecycleLock.unlock();
        }
    }

    private boolean containsPartition(TopicPartition topicPartition, KafkaMessageListenerContainer<K, V> kafkaMessageListenerContainer) {
        Collection<TopicPartition> assignedPartitions = kafkaMessageListenerContainer.getAssignedPartitions();
        return assignedPartitions != null && assignedPartitions.contains(topicPartition);
    }

    private void clearState() {
        this.containers.clear();
        this.startedContainers.set(0);
        this.reason = null;
    }

    public String toString() {
        return "ConcurrentMessageListenerContainer [concurrency=" + this.concurrency + ", beanName=" + getBeanName() + ", running=" + isRunning() + "]";
    }
}
