package io.github.cocoa.framework.mq.redis.config;

import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import io.github.cocoa.framework.common.enums.DocumentEnum;
import io.github.cocoa.framework.mq.redis.core.RedisMqTemplate;
import io.github.cocoa.framework.mq.redis.core.job.RedisPendingMessageResendJob;
import io.github.cocoa.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import io.github.cocoa.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import io.github.cocoa.framework.redis.config.CocoaRedisAutoConfiguration;
import java.util.List;
import java.util.Properties;
import org.apache.catalina.Lifecycle;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.annotation.EnableScheduling;

/**
 * Auto-configuration for the consumer side of the Redis-based message queue.
 *
 * <p>Registers, when the corresponding listener beans exist:
 * <ul>
 *   <li>a {@link RedisMessageListenerContainer} for Pub/Sub channel listeners,</li>
 *   <li>a {@link StreamMessageListenerContainer} for Redis Stream listeners,</li>
 *   <li>a {@link RedisPendingMessageResendJob} built from the stream listeners.</li>
 * </ul>
 */
@EnableScheduling
@AutoConfiguration(after = {CocoaRedisAutoConfiguration.class})
public class CocoaRedisMQConsumerAutoConfiguration {
    private static final Logger log = LoggerFactory.getLogger(CocoaRedisMQConsumerAutoConfiguration.class);

    /**
     * Creates the Pub/Sub listener container and subscribes every channel listener to its channel.
     *
     * @param redisMqTemplate template handed to each listener and used to obtain the connection factory
     * @param list            all {@link AbstractRedisChannelMessageListener} beans in the context
     * @return the container with all listeners registered
     */
    @ConditionalOnBean({AbstractRedisChannelMessageListener.class})
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisMqTemplate redisMqTemplate, List<AbstractRedisChannelMessageListener<?>> list) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisMqTemplate.getRedisTemplate().getRequiredConnectionFactory());
        for (AbstractRedisChannelMessageListener<?> listener : list) {
            listener.setRedisMqTemplate(redisMqTemplate);
            container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
            log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]", listener.getChannel(), listener.getClass().getName());
        }
        return container;
    }

    /**
     * Creates the job object that is constructed from the stream listeners, the MQ template,
     * the application name and a Redisson client.
     *
     * @param list            all {@link AbstractRedisStreamMessageListener} beans in the context
     * @param redisMqTemplate MQ template passed to the job
     * @param str             value of the {@code spring.application.name} property
     * @param redissonClient  Redisson client passed to the job
     * @return the resend job bean
     */
    @ConditionalOnBean({AbstractRedisStreamMessageListener.class})
    @Bean
    public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> list, RedisMqTemplate redisMqTemplate, @Value("${spring.application.name}") String str, RedissonClient redissonClient) {
        return new RedisPendingMessageResendJob(list, redisMqTemplate, str, redissonClient);
    }

    /**
     * Creates the Redis Stream listener container and registers every stream listener with it.
     *
     * <p>The container's {@code start}/{@code stop} methods are wired as the bean's init and
     * destroy methods. The method names are spelled out as literals so this class does not
     * need a servlet-container constant class for them.
     *
     * @param redisMqTemplate template handed to each listener and used to obtain the Redis template
     * @param list            all {@link AbstractRedisStreamMessageListener} beans in the context
     * @return the container with all listeners registered
     * @throws IllegalStateException if the Redis server version is below 5 or cannot be determined
     */
    @ConditionalOnBean({AbstractRedisStreamMessageListener.class})
    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(RedisMqTemplate redisMqTemplate, List<AbstractRedisStreamMessageListener<?>> list) {
        RedisTemplate<String, ?> redisTemplate = redisMqTemplate.getRedisTemplate();
        checkRedisVersion(redisTemplate);

        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer.create(
                redisTemplate.getRequiredConnectionFactory(),
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                        .batchSize(10)
                        .targetType(String.class)
                        .build());

        // One consumer name per process (host address + PID), shared by all listeners.
        String consumerName = buildConsumerName();

        // NOTE(review): registration runs on a parallel stream; this relies on
        // container.register(...) tolerating concurrent calls — confirm.
        list.parallelStream().forEach(listener -> {
            log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]", listener.getStreamKey(), listener.getClass().getName());
            try {
                redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
            } catch (Exception e) {
                // Best-effort: creation is expected to fail when the group already exists,
                // so the failure must not abort registration. Keep a trace for diagnosis.
                log.debug("[redisStreamMessageListenerContainer][StreamKey({}) 的消费者分组({}) 创建失败，可能已存在：{}]", listener.getStreamKey(), listener.getGroup(), e.getMessage());
            }
            listener.setRedisMqTemplate(redisMqTemplate);
            container.register(
                    StreamMessageListenerContainer.StreamReadRequest
                            .builder(StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()))
                            .consumer(Consumer.from(listener.getGroup(), consumerName))
                            // Acknowledgement is left to the listener.
                            .autoAcknowledge(false)
                            // Never cancel the subscription because of an error.
                            .cancelOnError(error -> false)
                            .build(),
                    listener);
            log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]", listener.getStreamKey(), listener.getClass().getName());
        });
        return container;
    }

    /**
     * Builds the stream consumer name in the form {@code <host address>@<pid>}.
     *
     * @return the consumer name for the current process
     */
    private static String buildConsumerName() {
        return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
    }

    /**
     * Verifies that the connected Redis server reports a major version of at least 5.
     *
     * @param redisTemplate template used to run the {@code INFO} command
     * @throws IllegalStateException if the major version is below 5, or if the
     *                               {@code redis_version} entry is missing or not parseable
     */
    private static void checkRedisVersion(RedisTemplate<String, ?> redisTemplate) {
        Properties info = (Properties) redisTemplate.execute((v0) -> {
            return v0.info();
        });
        String version = MapUtil.getStr(info, "redis_version");

        int majorVersion;
        try {
            majorVersion = Integer.parseInt(StrUtil.subBefore(version, '.', false));
        } catch (NumberFormatException e) {
            // A missing or malformed version previously surfaced as a bare NumberFormatException.
            throw new IllegalStateException(StrUtil.format("无法解析 Redis 版本号({})，请确认 Redis 服务可正常访问。", version), e);
        }
        if (majorVersion < 5) {
            throw new IllegalStateException(StrUtil.format("您当前的 Redis 版本为 {}，小于最低要求的 5.0.0 版本！请参考 {} 文档进行安装。", version, DocumentEnum.REDIS_INSTALL.getUrl()));
        }
    }
}
