package org.onetwo.ext.ons.consumer;

import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.onetwo.boot.utils.BootUtils;
import org.onetwo.common.exception.BaseException;
import org.onetwo.common.log.JFishLoggerFactory;
import org.onetwo.common.reflect.ReflectUtils;
import org.onetwo.common.spring.SpringUtils;
import org.onetwo.common.utils.StringUtils;
import org.onetwo.ext.alimq.BatchConsumContext;
import org.onetwo.ext.alimq.ConsumContext;
import org.onetwo.ext.ons.ListenerType;
import org.onetwo.ext.ons.ONSProperties;
import org.onetwo.ext.ons.ONSUtils;
import org.onetwo.ext.ons.exception.ImpossibleConsumeException;
import org.onetwo.ext.ons.exception.MessageConsumedException;
import org.slf4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.util.Assert;

/* loaded from: input_file:org/onetwo/ext/ons/consumer/ONSPushConsumerStarter.class */
public class ONSPushConsumerStarter implements InitializingBean, DisposableBean {

    @Autowired
    private ApplicationContext applicationContext;
    private ONSProperties onsProperties;
    private DelegateMessageService delegateMessageService;

    @Autowired
    private List<ConsumerProcessor> consumerProcessors;
    private final Logger logger = ONSUtils.getONSLogger();
    private List<Consumer> consumers = Lists.newArrayList();

    /* loaded from: input_file:org/onetwo/ext/ons/consumer/ONSPushConsumerStarter$ConsumerScanner.class */
    public class ConsumerScanner {
        private List<ConsumerProcessor> consumerProcessors;

        public ConsumerScanner(ApplicationContext applicationContext, List<ConsumerProcessor> list) {
            this.consumerProcessors = list;
        }

        public Map<String, ConsumerMeta> findConsumers() {
            HashMap newHashMap = Maps.newHashMap();
            this.consumerProcessors.forEach(consumerProcessor -> {
                consumerProcessor.parse(newHashMap);
            });
            return newHashMap;
        }
    }

    public void setOnsProperties(ONSProperties oNSProperties) {
        this.onsProperties = oNSProperties;
    }

    public void setDelegateMessageService(DelegateMessageService delegateMessageService) {
        this.delegateMessageService = delegateMessageService;
    }

    public void afterPropertiesSet() throws Exception {
        this.logger.info("ons consumer init. namesrvAddr: {}", this.onsProperties.getOnsAddr());
        Map<String, ConsumerMeta> findConsumers = new ConsumerScanner(this.applicationContext, this.consumerProcessors).findConsumers();
        BootUtils.asyncInit(() -> {
            findConsumers.entrySet().forEach(entry -> {
                try {
                    initializeConsumers((ConsumerMeta) entry.getValue());
                } catch (MQClientException | InterruptedException e) {
                    this.logger.error("mq consumer initialize error: " + e.getMessage(), e);
                    throw new BaseException("mq consumer initialize error: " + e.getMessage(), e);
                }
            });
        });
    }

    protected String resloveValue(String str) {
        return SpringUtils.resolvePlaceholders(this.applicationContext, str);
    }

    protected Consumer createConsumer(Properties properties) {
        return ONSFactory.createConsumer(properties);
    }

    private void initializeConsumers(ConsumerMeta consumerMeta) throws InterruptedException, MQClientException {
        Assert.hasText(consumerMeta.getConsumerId(), "consumerId can not be empty!");
        Assert.hasText(consumerMeta.getTopic(), "topic can not be empty!");
        this.logger.info("create mq consumerId: {}", consumerMeta.getConsumerId());
        Properties baseProperties = this.onsProperties.baseProperties();
        baseProperties.setProperty("ConsumerId", consumerMeta.getConsumerId());
        baseProperties.setProperty("MessageModel", consumerMeta.getMessageModel().name());
        if (consumerMeta.getMaxReconsumeTimes() > 0) {
            baseProperties.setProperty("maxReconsumeTimes", String.valueOf(consumerMeta.getMaxReconsumeTimes()));
        }
        if (consumerMeta.getConsumeTimeoutInMinutes() > 0) {
            baseProperties.setProperty("consumeTimeout", consumerMeta.getConsumeTimeoutInMinutes() + "");
        }
        if (consumerMeta.getComsumerProperties() != null) {
            baseProperties.putAll(consumerMeta.getComsumerProperties());
        }
        Properties properties = this.onsProperties.getConsumers().get(consumerMeta.getConsumerId());
        if (properties != null) {
            baseProperties.putAll(properties);
        }
        Consumer createConsumer = createConsumer(baseProperties);
        DefaultMQPushConsumer defaultMQPushConsumer = (DefaultMQPushConsumer) ReflectUtils.getFieldValue(createConsumer, "defaultMQPushConsumer");
        defaultMQPushConsumer.setConsumeFromWhere(consumerMeta.getConsumeFromWhere());
        defaultMQPushConsumer.setConsumeMessageBatchMaxSize(consumerMeta.getConsumeMessageBatchMaxSize());
        if (StringUtils.isNotBlank(baseProperties.getProperty(ConsumerMeta.CONSUME_TIMESTAMP_KEY))) {
            defaultMQPushConsumer.setConsumeTimestamp(baseProperties.getProperty(ConsumerMeta.CONSUME_TIMESTAMP_KEY));
        } else if (StringUtils.isNotBlank(consumerMeta.getConsumeTimestamp())) {
            defaultMQPushConsumer.setConsumeTimestamp(consumerMeta.getConsumeTimestamp());
        }
        consumerMeta.setComsumerProperties(baseProperties);
        configSpecialConsume(defaultMQPushConsumer);
        ListenerType listenerType = consumerMeta.getListenerType();
        if (listenerType == ListenerType.CUSTOM) {
            defaultMQPushConsumer.subscribe(consumerMeta.getTopic(), consumerMeta.getSubExpression());
            registerONSConsumerListener(defaultMQPushConsumer, consumerMeta);
            defaultMQPushConsumer.start();
        } else if (listenerType == ListenerType.RMQ) {
            defaultMQPushConsumer.subscribe(consumerMeta.getTopic(), consumerMeta.getSubExpression());
            defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) consumerMeta.getConsumerAction());
            defaultMQPushConsumer.start();
        } else {
            createConsumer.subscribe(consumerMeta.getTopic(), consumerMeta.getSubExpression(), (MessageListener) consumerMeta.getConsumerAction());
            createConsumer.start();
        }
        this.logger.info("ONSConsumer[{}] started! meta: {}", consumerMeta.getConsumerId(), consumerMeta);
    }

    private void configSpecialConsume(DefaultMQPushConsumer defaultMQPushConsumer) {
        ONSProperties.ConsumeFromWhereProps specialConsume = this.onsProperties.getSpecialConsume();
        if (specialConsume.isEnabled()) {
            defaultMQPushConsumer.setConsumeFromWhere(specialConsume.getConsumeFromWhere());
            defaultMQPushConsumer.setConsumeTimestamp(specialConsume.getConsumeTimestamp());
        }
    }

    private void registerONSConsumerListener(DefaultMQPushConsumer defaultMQPushConsumer, final ConsumerMeta consumerMeta) throws MQClientException {
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // from class: org.onetwo.ext.ons.consumer.ONSPushConsumerStarter.1
            /* JADX WARN: Type inference failed for: r10v1, types: [java.lang.Throwable, org.onetwo.ext.ons.exception.MessageConsumedException] */
            /* JADX WARN: Type inference failed for: r10v2, types: [java.lang.Throwable, org.onetwo.ext.ons.exception.ImpossibleConsumeException] */
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                if (consumerMeta.getIgnoreOffSetThreshold() > 0) {
                    long messageDiff = ONSUtils.getMessageDiff(list.get(0));
                    if (messageDiff > consumerMeta.getIgnoreOffSetThreshold()) {
                        ONSPushConsumerStarter.this.logger.warn("message offset diff[{}] is greater than ignoreOffSetThreshold[{}], ignore!", Long.valueOf(messageDiff), Long.valueOf(consumerMeta.getIgnoreOffSetThreshold()));
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                }
                BatchConsumContext batchConsumContext = null;
                try {
                    batchConsumContext = ONSPushConsumerStarter.this.delegateMessageService.processMessages(consumerMeta, list, consumeConcurrentlyContext);
                } catch (ImpossibleConsumeException e) {
                    ONSPushConsumerStarter.this.logAndMail("message can not be consumed and will skip: " + e.getMessage(), e);
                } catch (MessageConsumedException e2) {
                    if (ONSPushConsumerStarter.this.logger.isDebugEnabled()) {
                        ONSPushConsumerStarter.this.logger.debug("message has been consumed and will skip: " + e2.getMessage(), (Throwable) e2);
                    } else {
                        ONSPushConsumerStarter.this.logger.warn("message has been consumed and will skip: " + e2.getMessage());
                    }
                } catch (Throwable th) {
                    ConsumContext currentContext = batchConsumContext == null ? null : batchConsumContext.getCurrentContext();
                    String message = th.getMessage();
                    if (currentContext == null || !currentContext.isWillSkipConsume()) {
                        ONSPushConsumerStarter.this.logAndMail(message, th);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    ONSPushConsumerStarter.this.logAndMail("message will skip. " + message, th);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void logAndMail(String str, Throwable th) {
        this.logger.error(str, th);
        JFishLoggerFactory.findMailLogger().error(str, th);
    }

    public void destroy() {
        this.consumers.forEach(consumer -> {
            consumer.shutdown();
        });
        this.logger.info("DefaultMQPushConsumer shutdown.");
    }
}
