package cc.chensoul.rose.redis.mq.config;

import cc.chensoul.rose.core.util.FormatUtils;
import cc.chensoul.rose.core.util.NetUtils;
import cc.chensoul.rose.redis.config.RedisCacheConfig;
import cc.chensoul.rose.redis.mq.RedisMQTemplate;
import cc.chensoul.rose.redis.mq.job.RedisPendingMessageResendJob;
import cc.chensoul.rose.redis.mq.pubsub.AbstractRedisChannelMessageListener;
import cc.chensoul.rose.redis.mq.stream.AbstractRedisStreamMessageListener;
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Auto-configuration for the consumer side of the Redis-backed message queue.
 *
 * <p>Depending on which listener beans are present it registers:
 * <ul>
 *   <li>a {@link RedisMessageListenerContainer} wired with every
 *       {@link AbstractRedisChannelMessageListener} (Pub/Sub channels);</li>
 *   <li>a {@link StreamMessageListenerContainer} wired with every
 *       {@link AbstractRedisStreamMessageListener} (Redis Streams consumer groups);</li>
 *   <li>a {@link RedisPendingMessageResendJob} for the stream listeners.</li>
 * </ul>
 */
@EnableScheduling
@AutoConfiguration(after = {RedisCacheConfig.class})
public class EnjoyRedisMQConsumerAutoConfiguration {
    private static final Logger log = LoggerFactory.getLogger(EnjoyRedisMQConsumerAutoConfiguration.class);

    /** Lowest Redis major version accepted by {@link #checkRedisVersion(RedisTemplate)}. */
    private static final int MIN_REDIS_MAJOR_VERSION = 5;

    /**
     * Builds the consumer name used when joining a stream consumer group, in the form
     * {@code <local host>@<pid>}.
     *
     * <p>The pid is taken from the part of the runtime MXBean name that precedes {@code '@'};
     * this assumes the JVM reports its name as {@code pid@host}.
     *
     * @return the consumer name for this process
     * @throws NumberFormatException if the leading part of the runtime name is not a number
     */
    private static String buildConsumerName() {
        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
        long pid = Long.parseLong(runtimeName.split("@")[0]);
        return String.format("%s@%d", NetUtils.getLocalhostStr(), pid);
    }

    /**
     * Verifies that the connected Redis server reports a major version of at least
     * {@value #MIN_REDIS_MAJOR_VERSION}.
     *
     * @param redisTemplate template used to query the server's {@code INFO} properties
     * @throws IllegalStateException if the server info cannot be read, the {@code redis_version}
     *         property cannot be parsed, or the version is too old
     */
    private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
        // The explicit RedisCallback cast selects the intended execute(...) overload and gives a typed result.
        Properties info = redisTemplate.execute((RedisCallback<Properties>) connection -> connection.info());
        if (info == null) {
            // Explicit check instead of an assert, so the failure is meaningful even when assertions are disabled.
            throw new IllegalStateException("Unable to read the Redis server info; cannot verify the Redis version");
        }
        String version = info.getProperty("redis_version");
        int majorVersion;
        try {
            majorVersion = Integer.parseInt(StringUtils.substringBefore(version, "."));
        } catch (NumberFormatException e) {
            throw new IllegalStateException("Unable to parse the Redis version: " + version, e);
        }
        if (majorVersion < MIN_REDIS_MAJOR_VERSION) {
            throw new IllegalStateException(FormatUtils.format("您当前的 Redis 版本为 {}，小于最低要求的 5.0.0 版本！", new Object[]{version}));
        }
    }

    /**
     * Creates the Pub/Sub listener container and subscribes every channel listener to its channel.
     *
     * @param redisMQTemplate template injected into each listener; its connection factory backs the container
     * @param list all channel listeners found in the application context
     * @return the configured container
     */
    @ConditionalOnBean({AbstractRedisChannelMessageListener.class})
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> list) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory());
        list.forEach(listener -> {
            listener.setRedisMQTemplate(redisMQTemplate);
            container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
            log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", listener.getChannel(), listener.getClass().getName());
        });
        return container;
    }

    /**
     * Creates the job that handles pending stream messages for the registered stream listeners.
     *
     * @param list all stream listeners found in the application context
     * @param redisMQTemplate template passed through to the job
     * @param str value of {@code spring.application.name}
     * @param redissonClient Redisson client passed through to the job
     * @return the resend job
     */
    @ConditionalOnBean({AbstractRedisStreamMessageListener.class})
    @Bean
    public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> list, RedisMQTemplate redisMQTemplate, @Value("${spring.application.name}") String str, RedissonClient redissonClient) {
        return new RedisPendingMessageResendJob(list, redisMQTemplate, str, redissonClient);
    }

    /**
     * Creates the stream listener container and registers every stream listener as a consumer of
     * its stream key within its consumer group. Messages are not auto-acknowledged and a listener
     * error does not cancel the subscription.
     *
     * @param redisMQTemplate template injected into each listener; its connection factory backs the container
     * @param list all stream listeners found in the application context
     * @return the configured container (started and stopped by Spring via the bean's init/destroy methods)
     * @throws IllegalStateException if the Redis server version check fails
     */
    @ConditionalOnBean({AbstractRedisStreamMessageListener.class})
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMQTemplate redisMQTemplate, List<AbstractRedisStreamMessageListener<?>> list) {
        RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
        checkRedisVersion(redisTemplate);

        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer.create(
                redisTemplate.getRequiredConnectionFactory(),
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(10)
                        .targetType(String.class)
                        .build());

        String consumerName = buildConsumerName();
        // NOTE(review): listeners are registered on a parallel stream, presumably to shorten startup;
        // confirm that concurrent register(...) calls are safe for the container in use.
        list.parallelStream().forEach(listener -> {
            String streamKey = listener.getStreamKey();
            String group = listener.getGroup();
            String listenerClass = listener.getClass().getName();
            log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]", streamKey, listenerClass);
            try {
                redisTemplate.opsForStream().createGroup(streamKey, group);
            } catch (Exception e) {
                // Best effort: creation is expected to fail when the group already exists, so keep going,
                // but leave a trace instead of silently discarding the exception.
                log.debug("[redisStreamMessageListenerContainer][StreamKey({}) group({}) was not created]", streamKey, group, e);
            }
            listener.setRedisMQTemplate(redisMQTemplate);
            container.register(
                    StreamMessageListenerContainer.StreamReadRequest.builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
                            .consumer(Consumer.from(group, consumerName))
                            .autoAcknowledge(false)
                            .cancelOnError(throwable -> false)
                            .build(),
                    listener);
            log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]", streamKey, listenerClass);
        });
        return container;
    }
}
