package org.onetwo.ext.ons.consumer;

import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.common.message.MessageExt;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.commons.lang3.StringUtils;
import org.onetwo.common.exception.BaseException;
import org.onetwo.common.exception.MessageOnlyServiceException;
import org.onetwo.common.utils.LangUtils;
import org.onetwo.ext.alimq.BatchConsumContext;
import org.onetwo.ext.alimq.ConsumContext;
import org.onetwo.ext.alimq.JsonMessageSerializer;
import org.onetwo.ext.alimq.MessageDeserializer;
import org.onetwo.ext.alimq.OnsMessage;
import org.onetwo.ext.ons.ONSConsumerListenerComposite;
import org.onetwo.ext.ons.ONSProperties;
import org.onetwo.ext.ons.ONSUtils;
import org.onetwo.ext.ons.exception.ConsumeException;
import org.onetwo.ext.ons.exception.DeserializeMessageException;
import org.onetwo.ext.ons.exception.ImpossibleConsumeException;
import org.slf4j.Logger;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;

/* loaded from: input_file:org/onetwo/ext/ons/consumer/DelegateMessageService.class */
/**
 * Turns raw {@link MessageExt} messages into {@link ConsumContext} objects and hands them to a
 * {@link CustomONSConsumer}, either one at a time or as a single batch.
 *
 * <p>The {@link ONSConsumerListenerComposite} is notified before and after each consumption and
 * when a consumption fails. Any failure raised while consuming is rethrown wrapped in a
 * {@link ConsumeException}.
 */
public class DelegateMessageService implements InitializingBean {
    final Logger logger = ONSUtils.getONSLogger();
    /** Deserializer used when a message does not name its own serializer. */
    private final MessageDeserializer messageDeserializer;
    private final ONSConsumerListenerComposite consumerListenerComposite;

    /**
     * Injected reference to this same bean type. {@link #consumeMessageWithTransactional} is
     * invoked through it instead of through {@code this}.
     * NOTE(review): presumably so the call goes through the Spring proxy and
     * {@code @Transactional} takes effect - confirm against the bean configuration.
     */
    @Autowired
    private DelegateMessageService delegateMessageService;

    /**
     * @param messageDeserializer default deserializer, used when a message carries no serializer hint
     * @param oNSConsumerListenerComposite listeners notified around every consumption
     */
    public DelegateMessageService(MessageDeserializer messageDeserializer, ONSConsumerListenerComposite oNSConsumerListenerComposite) {
        this.messageDeserializer = messageDeserializer;
        this.consumerListenerComposite = oNSConsumerListenerComposite;
    }

    /**
     * Verifies that both constructor collaborators were supplied.
     *
     * @throws IllegalArgumentException (raised by {@link Assert#notNull}) if either one is null
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.messageDeserializer, "messageDeserializer can not be null");
        Assert.notNull(this.consumerListenerComposite, "consumerListenerComposite can not be null");
    }

    /**
     * Resolves the deserializer for a message.
     *
     * @param str serializer type name taken from the message user properties; may be blank
     * @return the default deserializer when {@code str} is blank, otherwise the deserializer of
     *         the matching {@code ONSProperties.MessageSerializerType} constant
     * @throws IllegalArgumentException if {@code str} does not name a known serializer type
     */
    private MessageDeserializer getMessageDeserializer(String str) {
        if (StringUtils.isBlank(str)) {
            return this.messageDeserializer;
        }
        // Locale.ROOT keeps the enum lookup independent of the JVM default locale
        // (e.g. the Turkish dotted/dotless 'i'); trim tolerates surrounding whitespace.
        String typeName = str.trim().toUpperCase(Locale.ROOT);
        return ONSProperties.MessageSerializerType.valueOf(typeName).getDeserializer();
    }

    /**
     * Builds a {@link ConsumContext} for every message and dispatches it.
     *
     * <p>In batch mode all contexts are collected and passed to the consumer in one call after
     * the loop. Otherwise each message is consumed immediately, through the transactional entry
     * point when {@code consumerMeta.shouldWithTransational()} is true.
     *
     * @param consumerMeta metadata of the consumer; its consumer action must be a {@link CustomONSConsumer}
     * @param list messages to process
     * @param consumeConcurrentlyContext not used by this method
     * @return the batch context holding the collected contexts (batch mode) or the last
     *         processed context as current context (single mode)
     * @throws ConsumeException if the consumer or a before/after listener fails
     */
    public BatchConsumContext processMessages(ConsumerMeta consumerMeta, List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        CustomONSConsumer consumer = (CustomONSConsumer) consumerMeta.getConsumerAction();
        List<ConsumContext> contexts = Lists.newArrayListWithExpectedSize(list.size());
        BatchConsumContext batchContext = new BatchConsumContext(contexts);
        boolean batchMode = consumerMeta.isUseBatchMode();
        ConsumContext currentContext = null;
        for (MessageExt message : list) {
            String messageId = ONSUtils.getMessageId(message);
            if (this.logger.isInfoEnabled()) {
                this.logger.info("rmq-consumer[{}] received id: {}, key: {}, topic: {}, tag: {}", new Object[]{consumerMeta.getConsumerId(), messageId, message.getKeys(), message.getTopic(), message.getTags()});
            }
            currentContext = buildContext(consumer, consumerMeta, message, messageId, currentContext);
            if (batchMode) {
                // Batch mode only collects here; consumption happens once after the loop.
                contexts.add(currentContext);
                continue;
            }
            batchContext.setCurrentContext(currentContext);
            if (consumerMeta.shouldWithTransational()) {
                this.delegateMessageService.consumeMessageWithTransactional(consumer, consumerMeta, currentContext);
            } else {
                consumeMessage(consumer, consumerMeta, currentContext);
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("rmq-consumer[{}] consumed message. id: {}, topic: {}, tag: {}, body: {}", new Object[]{consumerMeta.getConsumerId(), messageId, message.getTopic(), message.getTags(), currentContext.getDeserializedBody()});
            } else if (this.logger.isInfoEnabled()) {
                this.logger.info("rmq-consumer[{}] consumed message. id: {}, topic: {}, tag: {}", new Object[]{consumerMeta.getConsumerId(), messageId, message.getTopic(), message.getTags()});
            }
        }
        if (batchMode) {
            consumeBatchMessages(consumer, consumerMeta, batchContext);
        }
        return batchContext;
    }

    /**
     * Creates the context for one message, deserializing the body when auto-deserialization is enabled.
     *
     * @param previousContext context built for the preceding message of the same call, or null for the first one
     */
    private ConsumContext buildContext(CustomONSConsumer consumer, ConsumerMeta consumerMeta, MessageExt message, String messageId, ConsumContext previousContext) {
        MessageDeserializer deserializer = getMessageDeserializer(message.getUserProperty(OnsMessage.TracableMessage.SERIALIZER_KEY));
        if (!consumerMeta.isAutoDeserialize()) {
            return ConsumContext.builder().messageId(messageId).message(message).messageDeserializer(deserializer).build();
        }
        // NOTE(review): the consumer is asked for the body class with the context of the
        // PREVIOUS message (null for the first one), not the message being processed.
        // Kept as-is because the intended argument cannot be determined here - confirm.
        Class<?> bodyClass = consumer.getMessageBodyClass(previousContext);
        if (bodyClass != null) {
            // Record the target type on the message before it is handed to the deserializer.
            message.putUserProperty(JsonMessageSerializer.PROP_BODY_TYPE, bodyClass.getName());
        }
        Object body = deserializeMessage(deserializer, message);
        return ConsumContext.builder().messageId(messageId).message(message).deserializedBody(body).messageDeserializer(deserializer).build();
    }

    /**
     * Consumes all collected contexts in one consumer call, notifying listeners around it.
     *
     * @throws ConsumeException wrapping any failure of the consumer or of an after-consume listener
     */
    private void consumeBatchMessages(CustomONSConsumer customONSConsumer, ConsumerMeta consumerMeta, BatchConsumContext batchConsumContext) {
        List<ConsumContext> contexts = batchConsumContext.getContexts();
        for (ConsumContext context : contexts) {
            this.consumerListenerComposite.beforeConsumeMessage(consumerMeta, context);
        }
        try {
            customONSConsumer.doConsumeBatch(contexts);
            for (ConsumContext context : contexts) {
                this.consumerListenerComposite.afterConsumeMessage(consumerMeta, context);
            }
            if (this.logger.isInfoEnabled()) {
                // Placeholders now match the three arguments; a batch has no single message id.
                this.logger.info("rmq-batch-consumer[{}] consumed message. topic: {}, tag: {}", new Object[]{consumerMeta.getConsumerId(), consumerMeta.getTopic(), consumerMeta.getSubExpression()});
            }
        } catch (Throwable th) {
            String errorMessage = "rmq-batch-consumer[" + consumerMeta.getConsumerId() + "] consumed message error. topic: " + consumerMeta.getTopic() + ", tags: " + consumerMeta.getSubExpression();
            // Prefer a message-specific description when a current context has been set on the batch.
            if (batchConsumContext.getCurrentContext() != null) {
                errorMessage = buildErrorMessage(consumerMeta, batchConsumContext.getCurrentContext());
            }
            this.consumerListenerComposite.onBatchConsumeMessageError(batchConsumContext, th);
            throw new ConsumeException(errorMessage, th);
        }
    }

    /**
     * Deserializes the message body with the given deserializer.
     *
     * @throws ImpossibleConsumeException if {@link LangUtils#getCauseException} finds a
     *         {@link JsonMappingException} for the failure
     * @throws DeserializeMessageException for any other checked failure; other runtime
     *         exceptions are rethrown unchanged
     */
    private Object deserializeMessage(MessageDeserializer messageDeserializer, MessageExt messageExt) {
        try {
            return messageDeserializer.deserialize(messageExt.getBody(), messageExt);
        } catch (Exception e) {
            String messageId = ONSUtils.getMessageId(messageExt);
            if (LangUtils.getCauseException(e, JsonMappingException.class) != null) {
                throw new ImpossibleConsumeException("deserialize message error, ignore consume. msgId: " + messageId + ", msg: " + e.getMessage(), e);
            }
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            throw new DeserializeMessageException("deserialize message error, msgId: " + messageId + ", msg: " + e.getMessage(), e);
        }
    }

    /**
     * Consumes a single message, notifying listeners before, after and on error.
     *
     * @throws ConsumeException wrapping any failure of the consumer or of the after-consume listener
     */
    private void consumeMessage(CustomONSConsumer customONSConsumer, ConsumerMeta consumerMeta, ConsumContext consumContext) {
        this.consumerListenerComposite.beforeConsumeMessage(consumerMeta, consumContext);
        try {
            customONSConsumer.doConsume(consumContext);
            this.consumerListenerComposite.afterConsumeMessage(consumerMeta, consumContext);
        } catch (Throwable th) {
            String errorMessage = buildErrorMessage(consumerMeta, consumContext);
            this.consumerListenerComposite.onConsumeMessageError(consumContext, th);
            // The skip flag is set only after the error listeners have run, as before.
            if (th instanceof MessageOnlyServiceException) {
                consumContext.markWillSkipConsume();
            }
            throw new ConsumeException(errorMessage, th);
        }
    }

    /**
     * Builds the error description for a failed consumption of one message.
     *
     * @return text containing the consumer id and the message id, key, topic and tags
     */
    public static String buildErrorMessage(ConsumerMeta consumerMeta, ConsumContext consumContext) {
        return "rmq-consumer[" + consumerMeta.getConsumerId() + "] consumed message error. id: " + ONSUtils.getMessageId(consumContext.getMessage()) + ", key: " + consumContext.getMessage().getKeys() + ", topic: " + consumContext.getTopic() + ", tags: " + consumContext.getTags();
    }

    /**
     * Same as the plain single-message consumption, but declared {@code @Transactional}.
     *
     * @throws ConsumeException wrapping any failure of the consumer or of the after-consume listener
     */
    @Transactional
    public void consumeMessageWithTransactional(CustomONSConsumer customONSConsumer, ConsumerMeta consumerMeta, ConsumContext consumContext) {
        consumeMessage(customONSConsumer, consumerMeta, consumContext);
    }
}
