package org.onetwo.ext.rocketmq.consumer;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.onetwo.ext.rocketmq.annotation.RMQConsumer;
import org.onetwo.ext.rocketmq.annotation.RMQSubscribe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;

/* loaded from: input_file:org/onetwo/ext/rocketmq/consumer/RocketMQPushConsumerStarter.class */
public class RocketMQPushConsumerStarter implements InitializingBean, DisposableBean {
    private String namesrvAddr;

    @Autowired
    private ApplicationContext applicationContext;
    private final Logger logger = LoggerFactory.getLogger(RocketMQPushConsumerStarter.class);
    private List<DefaultMQPushConsumer> defaultMQPushConsumers = Lists.newArrayList();

    /* loaded from: input_file:org/onetwo/ext/rocketmq/consumer/RocketMQPushConsumerStarter$ConsumerScanner.class */
    public static class ConsumerScanner {
        private ApplicationContext applicationContext;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/onetwo/ext/rocketmq/consumer/RocketMQPushConsumerStarter$ConsumerScanner$DelegateAppMQConsumer.class */
        public class DelegateAppMQConsumer implements AppMQConsumer<MessageExt> {
            private Object target;
            private Method consumerMethod;
            private ConsumerMeta meta;

            public DelegateAppMQConsumer(Object obj, Method method, RMQSubscribe rMQSubscribe) {
                this.target = obj;
                this.consumerMethod = method;
                this.meta = new ConsumerMeta(rMQSubscribe.groupName(), rMQSubscribe.topic(), Sets.newHashSet(rMQSubscribe.tags()), rMQSubscribe.messageModel(), rMQSubscribe.consumeFromWhere(), rMQSubscribe.ignoreOffSetThreshold());
            }

            @Override // org.onetwo.ext.rocketmq.consumer.AppMQConsumer
            public ConsumerMeta getConsumerMeta() {
                return this.meta;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // org.onetwo.ext.rocketmq.consumer.AppMQConsumer
            public MessageExt convertMessage(MessageExt messageExt) {
                return messageExt;
            }

            @Override // org.onetwo.ext.rocketmq.consumer.AppMQConsumer
            public void doConsume(MessageExt messageExt) {
                ReflectionUtils.invokeMethod(this.consumerMethod, this.target, new Object[]{messageExt});
            }
        }

        public ConsumerScanner(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }

        public List<AppMQConsumer> findConsumers() {
            return (List) this.applicationContext.getBeansWithAnnotation(RMQConsumer.class).values().stream().flatMap(obj -> {
                return findAppMQConsumer(obj).stream();
            }).collect(Collectors.toList());
        }

        private List<AppMQConsumer> findAppMQConsumer(Object obj) {
            return (List) findConsumerMethods(obj).stream().map(method -> {
                return new DelegateAppMQConsumer(obj, method, (RMQSubscribe) AnnotationUtils.findAnnotation(method, RMQSubscribe.class));
            }).collect(Collectors.toList());
        }

        private List<Method> findConsumerMethods(Object obj) {
            Class targetClass = AopUtils.getTargetClass(obj);
            ArrayList newArrayList = Lists.newArrayList();
            ReflectionUtils.doWithMethods(targetClass, method -> {
                newArrayList.add(method);
            }, method2 -> {
                if (AnnotationUtils.findAnnotation(method2, RMQSubscribe.class) == null) {
                    return false;
                }
                if (ArrayUtils.contains(method2.getParameterTypes(), MessageExt.class)) {
                    return true;
                }
                throw new RuntimeException("the parameter type of the consumer method must be a " + MessageExt.class);
            });
            return newArrayList;
        }
    }

    public void afterPropertiesSet() throws Exception {
        this.logger.info("mq consumer init. namesrvAddr: {}", this.namesrvAddr);
        ArrayList newArrayList = Lists.newArrayList(BeanFactoryUtils.beansOfTypeIncludingAncestors(this.applicationContext, AppMQConsumer.class).values());
        newArrayList.addAll(new ConsumerScanner(this.applicationContext).findConsumers());
        ((Map) newArrayList.stream().collect(Collectors.groupingBy(appMQConsumer -> {
            return appMQConsumer.getConsumerMeta();
        }))).entrySet().forEach(entry -> {
            try {
                initializeConsumers((ConsumerMeta) entry.getKey(), (List) entry.getValue());
            } catch (MQClientException | InterruptedException e) {
                this.logger.error("mq consumer initialize error: " + e.getMessage(), e);
            }
        });
    }

    private void initializeConsumers(ConsumerMeta consumerMeta, final List<AppMQConsumer> list) throws InterruptedException, MQClientException {
        Assert.hasText(consumerMeta.getGroupName(), "consumerGroup can not be empty!");
        Assert.hasText(consumerMeta.getTopic(), "topic can not be empty!");
        this.logger.info("create mq consumergroup: {}", consumerMeta.getGroupName());
        list.forEach(appMQConsumer -> {
            this.logger.info("consumer: {}", appMQConsumer.getConsumerMeta());
        });
        DefaultMQPushConsumer createAndConfigMQPushConsumer = createAndConfigMQPushConsumer(consumerMeta);
        createAndConfigMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() { // from class: org.onetwo.ext.rocketmq.consumer.RocketMQPushConsumerStarter.1
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list2, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                MessageExt messageExt = list2.get(0);
                RocketMQPushConsumerStarter.this.logger.info("receive id: {}, topic: {}, tag: {}", new Object[]{messageExt.getMsgId(), messageExt.getTopic(), messageExt.getTags()});
                try {
                    list.stream().forEach(appMQConsumer2 -> {
                        ConsumerMeta consumerMeta2 = appMQConsumer2.getConsumerMeta();
                        if (consumerMeta2.getIgnoreOffSetThreshold() > 0) {
                            long messageDiff = RocketMQPushConsumerStarter.this.getMessageDiff(messageExt);
                            if (messageDiff > consumerMeta2.getIgnoreOffSetThreshold()) {
                                RocketMQPushConsumerStarter.this.logger.info("message offset diff[{}] is greater than ignoreOffSetThreshold, ignore!", Long.valueOf(messageDiff));
                                return;
                            }
                        }
                        Object convertMessage = appMQConsumer2.convertMessage(messageExt);
                        RocketMQPushConsumerStarter.this.logger.info("consume id: {}, body: {}", messageExt.getMsgId(), convertMessage);
                        appMQConsumer2.doConsume(convertMessage);
                    });
                    RocketMQPushConsumerStarter.this.logger.info("consume finish. id: {}, topic: {}, tag: {}", new Object[]{messageExt.getMsgId(), messageExt.getTopic(), messageExt.getTags()});
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    RocketMQPushConsumerStarter.this.logger.error("consume message error. id: " + messageExt.getMsgId() + ", topic: " + messageExt.getTopic() + ", tag: " + messageExt.getTags(), e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        createAndConfigMQPushConsumer.start();
        this.logger.info("defaultMQPushConsumer[{}] start. consumers size: {}", consumerMeta.getGroupName(), Integer.valueOf(list.size()));
        this.defaultMQPushConsumers.add(createAndConfigMQPushConsumer);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long getMessageDiff(MessageExt messageExt) {
        try {
            return Long.parseLong(messageExt.getProperty("MAX_OFFSET")) - messageExt.getQueueOffset();
        } catch (Exception e) {
            return 0L;
        }
    }

    protected DefaultMQPushConsumer createAndConfigMQPushConsumer(ConsumerMeta consumerMeta) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(consumerMeta.getGroupName());
        defaultMQPushConsumer.setNamesrvAddr(this.namesrvAddr);
        defaultMQPushConsumer.setVipChannelEnabled(false);
        if (consumerMeta.getTags() == null || consumerMeta.getTags().isEmpty()) {
            defaultMQPushConsumer.subscribe(consumerMeta.getTopic(), (String) null);
        } else {
            defaultMQPushConsumer.subscribe(consumerMeta.getTopic(), StringUtils.join(consumerMeta.getTags(), " || "));
        }
        defaultMQPushConsumer.setConsumeFromWhere(consumerMeta.getConsumeFromWhere());
        defaultMQPushConsumer.setMessageModel(consumerMeta.getMessageModel());
        return defaultMQPushConsumer;
    }

    public void destroy() {
        this.defaultMQPushConsumers.forEach(defaultMQPushConsumer -> {
            defaultMQPushConsumer.shutdown();
        });
        this.logger.info("DefaultMQPushConsumer shutdown.");
    }

    public void setNamesrvAddr(String str) {
        this.namesrvAddr = str;
    }
}
