package cc.chensoul.rose.redis.mq.job;

import cc.chensoul.rose.redis.mq.RedisMQTemplate;
import cc.chensoul.rose.redis.mq.stream.AbstractRedisStreamMessageListener;
import java.util.List;
import java.util.Objects;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Range;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.Record;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:BOOT-INF/lib/rose-spring-boot-redis-0.0.1-SNAPSHOT.jar:cc/chensoul/rose/redis/mq/job/RedisPendingMessageResendJob.class */
public class RedisPendingMessageResendJob {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RedisPendingMessageResendJob.class);
    private static final String LOCK_KEY = "mq:pending:msg:lock";
    private static final int EXPIRE_TIME = 300;
    private final List<AbstractRedisStreamMessageListener<?>> listeners;
    private final RedisMQTemplate redisTemplate;
    private final String groupName;
    private final RedissonClient redissonClient;

    @Scheduled(cron = "35 * * * * ?")
    public void messageResend() {
        RLock lock = this.redissonClient.getLock(LOCK_KEY);
        try {
        } catch (Exception e) {
            log.error("[messageResend][执行异常]", (Throwable) e);
        } finally {
            lock.unlock();
        }
        if (lock.tryLock()) {
            execute();
        }
    }

    private void execute() {
        StreamOperations<String, HK, HV> opsForStream = this.redisTemplate.getRedisTemplate().opsForStream();
        this.listeners.forEach(abstractRedisStreamMessageListener -> {
            ((PendingMessagesSummary) Objects.requireNonNull(opsForStream.pending((StreamOperations) abstractRedisStreamMessageListener.getStreamKey(), this.groupName))).getPendingMessagesPerConsumer().forEach((str, l) -> {
                log.info("[processPendingMessage][消费者({}) 消息数量({})]", str, l);
                PendingMessages pending = opsForStream.pending((StreamOperations) abstractRedisStreamMessageListener.getStreamKey(), Consumer.from(this.groupName, str), Range.unbounded(), l.longValue());
                if (pending.isEmpty()) {
                    return;
                }
                pending.forEach(pendingMessage -> {
                    if (pendingMessage.getElapsedTimeSinceLastDelivery().getSeconds() < 300) {
                        return;
                    }
                    List range = opsForStream.range(abstractRedisStreamMessageListener.getStreamKey(), Range.of(Range.Bound.inclusive(pendingMessage.getIdAsString()), Range.Bound.inclusive(pendingMessage.getIdAsString())));
                    if (CollectionUtils.isEmpty(range)) {
                        return;
                    }
                    this.redisTemplate.getRedisTemplate().opsForStream().add(StreamRecords.newRecord().ofObject(((MapRecord) range.get(0)).getValue()).withStreamKey((Record) abstractRedisStreamMessageListener.getStreamKey()));
                    this.redisTemplate.getRedisTemplate().opsForStream().acknowledge(this.groupName, (Record) range.get(0));
                    log.info("[processPendingMessage][消息({})重新投递成功]", ((MapRecord) range.get(0)).getId());
                });
            });
        });
    }

    public RedisPendingMessageResendJob(List<AbstractRedisStreamMessageListener<?>> list, RedisMQTemplate redisMQTemplate, String str, RedissonClient redissonClient) {
        this.listeners = list;
        this.redisTemplate = redisMQTemplate;
        this.groupName = str;
        this.redissonClient = redissonClient;
    }
}
