package org.onetwo.boot.mq.task;

import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.joda.time.LocalDateTime;
import org.onetwo.boot.core.config.BootJFishConfig;
import org.onetwo.boot.module.redis.RedisLockRunner;
import org.onetwo.boot.mq.MQProperties;
import org.onetwo.boot.mq.ProducerService;
import org.onetwo.boot.mq.SendMessageFlags;
import org.onetwo.boot.mq.entity.SendMessageEntity;
import org.onetwo.boot.mq.repository.SendMessageRepository;
import org.onetwo.boot.mq.serializer.MessageBodyStoreSerializer;
import org.onetwo.common.date.NiceDate;
import org.onetwo.common.exception.BaseException;
import org.onetwo.common.log.JFishLoggerFactory;
import org.onetwo.common.utils.LangUtils;
import org.onetwo.dbm.core.spi.DbmSessionFactory;
import org.slf4j.Logger;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.util.Assert;

/* loaded from: input_file:org/onetwo/boot/mq/task/DelayableSendMessageTask.class */
/**
 * Scheduled task that looks up unsent messages through {@link SendMessageRepository}
 * and hands them to the {@link ProducerService}.
 *
 * <p>Messages flagged as delayed are parked in an in-memory {@link DelayQueue} and sent by a
 * single background worker thread once their {@code deliverAt} time is reached; other messages
 * are sent directly from the scheduling thread.
 */
public class DelayableSendMessageTask implements InitializingBean, DisposableBean, SendMessageTask {
    public static final String LOCK_KEY = "mq:SendMessageTask";

    @Autowired(required = false)
    private RedisLockRegistry redisLockRegistry;

    @Autowired
    private MessageBodyStoreSerializer messageBodyStoreSerializer;

    @Autowired
    private SendMessageRepository sendMessageRepository;

    @Autowired
    private ProducerService<?, ?> producerService;
    private boolean useReidsLock;
    private String redisLockTimeout;

    @Autowired
    private MQProperties mqProperties;

    @Autowired
    private DbmSessionFactory sessionFactory;
    private ExecutorService executorService;
    protected final Logger log = JFishLoggerFactory.getLogger(getClass());
    /** Delayed messages waiting for their delivery time; drained by the worker thread. */
    private DelayQueue<DelayedMessage> delayedMessageQueue = new DelayQueue<>();
    /** Keys of the messages currently held in {@link #delayedMessageQueue}; used to avoid queueing a message twice. */
    private Set<String> delayedMessageIds = ConcurrentHashMap.newKeySet();

    /**
     * {@link Delayed} wrapper around a {@link SendMessageEntity}, ordered by the entity's
     * {@code deliverAt} time. Equality is based on the message key, so {@code compareTo}
     * is not consistent with {@code equals}.
     */
    public static class DelayedMessage implements Delayed {
        SendMessageEntity message;

        public DelayedMessage(SendMessageEntity sendMessageEntity) {
            this.message = sendMessageEntity;
        }

        /**
         * Orders by {@code deliverAt} when compared with another {@code DelayedMessage};
         * any other {@link Delayed} is compared by remaining delay instead of failing on a cast.
         */
        @Override
        public int compareTo(Delayed delayed) {
            if (delayed == this) {
                return 0;
            }
            if (delayed instanceof DelayedMessage) {
                long mine = getMessage().getDeliverAt().getTime();
                long theirs = ((DelayedMessage) delayed).getMessage().getDeliverAt().getTime();
                return Long.compare(mine, theirs);
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), delayed.getDelay(TimeUnit.MILLISECONDS));
        }

        /**
         * @return the time left until {@code deliverAt}, converted to {@code timeUnit};
         *         zero or negative once the delivery time has passed
         */
        @Override
        public long getDelay(TimeUnit timeUnit) {
            long remainingMillis = this.message.getDeliverAt().getTime() - System.currentTimeMillis();
            return timeUnit.convert(remainingMillis, TimeUnit.MILLISECONDS);
        }

        /**
         * Two wrappers are equal when both hold no message, or when their messages have equal keys.
         * Null messages and null keys are tolerated on either side.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            DelayedMessage other = (DelayedMessage) obj;
            if (this.message == null || other.message == null) {
                return this.message == other.message;
            }
            String thisKey = this.message.getKey();
            String otherKey = other.message.getKey();
            return thisKey == null ? otherKey == null : thisKey.equals(otherKey);
        }

        @Override
        public int hashCode() {
            String key = this.message == null ? null : this.message.getKey();
            return 31 + (key == null ? 0 : key.hashCode());
        }

        public SendMessageEntity getMessage() {
            return this.message;
        }

        public void setMessage(SendMessageEntity sendMessageEntity) {
            this.message = sendMessageEntity;
        }

        @Override
        public String toString() {
            return "DelayableSendMessageTask.DelayedMessage(message=" + getMessage() + ")";
        }
    }

    /**
     * Validates the configuration, optionally checks the message table schema, then creates the
     * single-thread executor and starts the delayed-message worker on it.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        if (this.useReidsLock) {
            Assert.notNull(this.redisLockRegistry, "redisLockRegistry not found!");
        }
        if (this.mqProperties.getTransactional().getSendTask().isCheckMessageTable()) {
            checkMessageTable();
        }
        this.executorService = Executors.newFixedThreadPool(1, newThreadFactory());
        startDelayedTask();
    }

    /** Creates the factory for the named, non-daemon worker thread. */
    private ThreadFactory newThreadFactory() {
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(false);
            thread.setName(BootJFishConfig.PREFIX.toUpperCase() + "-RMQ-DelayedTask");
            return thread;
        };
    }

    /**
     * Fails fast when table {@code data_mq_send} lacks the {@code is_delay} column.
     *
     * @throws BaseException if the column is missing
     */
    private void checkMessageTable() {
        if (!this.sessionFactory.getDatabaseMetaDialet().getTableMeta("data_mq_send").hasColumn("is_delay")) {
            throw new BaseException("the column[is_delay] not found on table[data_mq_send], please upgrade the lasted table schemal");
        }
    }

    public void setUseReidsLock(boolean z) {
        this.useReidsLock = z;
    }

    /**
     * Scheduled entry point: reads the batch size and the fixed delay from the send-task
     * properties and runs one check. The delay is converted from milliseconds to whole seconds
     * by integer division.
     */
    @Override
    @Scheduled(fixedDelayString = "${jfish.mq.transactional.sendTask.fixedDelayInMillis:60000}", initialDelay = 5000)
    public void scheduleCheckSendMessage() {
        MQProperties.SendTaskProps sendTask = this.mqProperties.getTransactional().getSendTask();
        int delayInSeconds = Integer.parseInt(sendTask.getFixedDelayInMillis()) / 1000;
        doCheckSendMessage(sendTask.getSendCountPerTask(), delayInSeconds);
    }

    /**
     * Runs one find-and-send pass, inside the redis lock when {@code useReidsLock} is set.
     *
     * @param i  maximum number of messages to fetch in this pass
     * @param i2 number of seconds added to "now" to build the cut-off date passed to the repository
     */
    protected void doCheckSendMessage(int i, int i2) {
        this.log.info("start to check unsend message...");
        if (this.useReidsLock) {
            getRedisLockRunner().tryLock(() -> {
                findAndProcessUnsendMessage(i, i2);
                return null;
            });
        } else {
            findAndProcessUnsendMessage(i, i2);
        }
        this.log.info("finish check unsend message...");
    }

    /**
     * Locks unsent messages for the configured locker, loads up to {@code i} of them, then either
     * queues each one for delayed delivery or sends it immediately.
     *
     * @param i  maximum number of messages to fetch
     * @param i2 number of seconds added to "now" to build the cut-off date passed to the repository
     */
    protected void findAndProcessUnsendMessage(int i, int i2) {
        Date cutOff = LocalDateTime.now().plusSeconds(i2).toDate();
        String locker = this.mqProperties.getTransactional().getSendTask().getLocker();
        int lockedCount = this.sendMessageRepository.lockSendMessage(locker, cutOff, SendMessageEntity.SendStates.UNSEND);
        this.log.info("lock [{}] mesage from database", lockedCount);

        List<SendMessageEntity> messages = this.sendMessageRepository.findLockerMessage(locker, cutOff, SendMessageEntity.SendStates.UNSEND, i);
        if (LangUtils.isEmpty(messages)) {
            this.log.info("no unsend mesage found from database");
            return;
        }
        this.log.info("find [{}] mesage from database to be sending", messages.size());

        // NOTE(review): the property is named "...InSeconds" but is applied through nextMinute();
        // confirm the intended unit against NiceDate's API before changing it.
        Date createdBefore = NiceDate.Now().nextMinute(-this.mqProperties.getTransactional().getSendTask().getIgnoreSendCreateAtRecentlyInSeconds()).getTime();
        for (SendMessageEntity message : messages) {
            if (Boolean.TRUE.equals(message.getDelay())) {
                enqueueDelayedMessage(message);
            } else if (message.getCreateAt().getTime() <= createdBefore.getTime()) {
                // Messages created more recently than the threshold are skipped in this pass.
                sendMessage(message);
            }
        }
    }

    /** Puts a delayed message on the queue unless its key is already registered as queued. */
    private void enqueueDelayedMessage(SendMessageEntity message) {
        String key = message.getKey();
        // Reserve the key atomically BEFORE offering: if the id were added after the offer, the
        // worker could take, send and remove the id first, leaving a stale id behind forever.
        if (!this.delayedMessageIds.add(key)) {
            this.log.info("message has in the delayed queue, key: {}", key);
            return;
        }
        if (!this.delayedMessageQueue.offer(new DelayedMessage(message))) {
            // Roll the reservation back so a later pass can queue the message again.
            this.delayedMessageIds.remove(key);
            this.log.info("message has in the delayed queue, key: {}", key);
            return;
        }
        this.log.info("add message to the delayed queue, key: {}", key);
    }

    /**
     * Submits the worker that drains the delayed queue. The worker runs until its thread is
     * interrupted, which {@link #destroy()} does through {@code shutdownNow()}.
     */
    public void startDelayedTask() throws InterruptedException {
        this.executorService.execute(this::consumeDelayedMessages);
    }

    /** Worker loop: takes due messages from the queue and sends them until interrupted. */
    private void consumeDelayedMessages() {
        while (!Thread.currentThread().isInterrupted()) {
            DelayedMessage delayedMessage;
            try {
                delayedMessage = this.delayedMessageQueue.take();
            } catch (InterruptedException e) {
                // Restore the flag and stop; looping again would make take() throw immediately
                // and spin forever.
                this.log.info("delayed message queue has interrupted");
                Thread.currentThread().interrupt();
                return;
            }
            SendMessageEntity message = delayedMessage.getMessage();
            try {
                sendMessage(message);
            } catch (Exception e) {
                this.log.error("send delay message error", e);
            } finally {
                // Always release the key, also on failure; otherwise the message would be
                // reported as "already queued" by every later pass and never be queued again.
                this.delayedMessageIds.remove(message.getKey());
            }
        }
    }

    /**
     * Deserializes the stored body, sends it with database transactions disabled, then marks
     * the entity as sent in the repository.
     */
    public void sendMessage(SendMessageEntity sendMessageEntity) {
        Object body = this.messageBodyStoreSerializer.deserialize(sendMessageEntity.getBody());
        this.producerService.send(body, SendMessageFlags.DisableDatabaseTransactional);
        this.sendMessageRepository.updateToSent(sendMessageEntity);
        this.log.info("sent message and mark status from database, key: {}", sendMessageEntity.getKey());
    }

    private RedisLockRunner getRedisLockRunner() {
        return RedisLockRunner.createLocker(this.redisLockRegistry, LOCK_KEY, this.redisLockTimeout);
    }

    public void setRedisLockTimeout(String str) {
        this.redisLockTimeout = str;
    }

    /**
     * Stops the worker. {@code shutdownNow()} is required because the worker is normally blocked
     * in {@code DelayQueue.take()}: a plain {@code shutdown()} never interrupts it, and the
     * non-daemon thread would then keep the JVM alive.
     */
    @Override
    public void destroy() throws Exception {
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }
}
