package org.springframework.messaging.simp.broker;

import ch.qos.logback.core.spi.AbstractComponentTracker;
import java.security.Principal;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.MessageHeaderInitializer;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.MultiValueMap;
import org.springframework.util.PathMatcher;

/* loaded from: input_file:BOOT-INF/lib/spring-messaging-6.2.5.jar:org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.class */
public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
    private static final byte[] EMPTY_PAYLOAD = new byte[0];

    @Nullable
    private PathMatcher pathMatcher;

    @Nullable
    private Integer cacheLimit;

    @Nullable
    private String selectorHeaderName;

    @Nullable
    private TaskScheduler taskScheduler;

    @Nullable
    private long[] heartbeatValue;

    @Nullable
    private MessageHeaderInitializer headerInitializer;
    private SubscriptionRegistry subscriptionRegistry;
    private final Map<String, SessionInfo> sessions;

    @Nullable
    private ScheduledFuture<?> heartbeatFuture;

    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-6.2.5.jar:org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler$HeartbeatTask.class */
    private class HeartbeatTask implements Runnable {
        private HeartbeatTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            long currentTimeMillis = System.currentTimeMillis();
            for (SessionInfo sessionInfo : SimpleBrokerMessageHandler.this.sessions.values()) {
                if (sessionInfo.getReadInterval() > 0 && currentTimeMillis - sessionInfo.getLastReadTime() > sessionInfo.getReadInterval()) {
                    SimpleBrokerMessageHandler.this.handleDisconnect(sessionInfo.getSessionId(), sessionInfo.getUser(), null);
                }
                if (sessionInfo.getWriteInterval() > 0 && currentTimeMillis - sessionInfo.getLastWriteTime() > sessionInfo.getWriteInterval()) {
                    SimpMessageHeaderAccessor create = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT);
                    create.setSessionId(sessionInfo.getSessionId());
                    Principal user = sessionInfo.getUser();
                    if (user != null) {
                        create.setUser(user);
                    }
                    SimpleBrokerMessageHandler.this.initHeaders(create);
                    create.setLeaveMutable(true);
                    sessionInfo.getClientOutboundChannel().send(MessageBuilder.createMessage(SimpleBrokerMessageHandler.EMPTY_PAYLOAD, create.getMessageHeaders()));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-messaging-6.2.5.jar:org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler$SessionInfo.class */
    public static class SessionInfo {
        private static final long HEARTBEAT_MULTIPLIER = 3;
        private final String sessionId;

        @Nullable
        private final Principal user;
        private final MessageChannel clientOutboundChannel;
        private final long readInterval;
        private final long writeInterval;
        private volatile long lastReadTime;
        private volatile long lastWriteTime;

        public SessionInfo(String str, @Nullable Principal principal, MessageChannel messageChannel, @Nullable long[] jArr, @Nullable long[] jArr2) {
            this.sessionId = str;
            this.user = principal;
            this.clientOutboundChannel = messageChannel;
            if (jArr == null || jArr2 == null) {
                this.readInterval = 0L;
                this.writeInterval = 0L;
            } else {
                this.readInterval = (jArr[0] <= 0 || jArr2[1] <= 0) ? 0L : Math.max(jArr[0], jArr2[1]) * 3;
                this.writeInterval = (jArr[1] <= 0 || jArr2[0] <= 0) ? 0L : Math.max(jArr[1], jArr2[0]);
            }
            long currentTimeMillis = System.currentTimeMillis();
            this.lastWriteTime = currentTimeMillis;
            this.lastReadTime = currentTimeMillis;
        }

        public String getSessionId() {
            return this.sessionId;
        }

        @Nullable
        public Principal getUser() {
            return this.user;
        }

        public MessageChannel getClientOutboundChannel() {
            return this.clientOutboundChannel;
        }

        public long getReadInterval() {
            return this.readInterval;
        }

        public long getWriteInterval() {
            return this.writeInterval;
        }

        public long getLastReadTime() {
            return this.lastReadTime;
        }

        public void setLastReadTime(long j) {
            this.lastReadTime = j;
        }

        public long getLastWriteTime() {
            return this.lastWriteTime;
        }

        public void setLastWriteTime(long j) {
            this.lastWriteTime = j;
        }
    }

    public SimpleBrokerMessageHandler(SubscribableChannel subscribableChannel, MessageChannel messageChannel, SubscribableChannel subscribableChannel2, Collection<String> collection) {
        super(subscribableChannel, messageChannel, subscribableChannel2, collection);
        this.sessions = new ConcurrentHashMap();
        this.subscriptionRegistry = new DefaultSubscriptionRegistry();
    }

    public void setSubscriptionRegistry(SubscriptionRegistry subscriptionRegistry) {
        Assert.notNull(subscriptionRegistry, "SubscriptionRegistry must not be null");
        this.subscriptionRegistry = subscriptionRegistry;
        initPathMatcherToUse();
        initCacheLimitToUse();
        initSelectorHeaderNameToUse();
    }

    public SubscriptionRegistry getSubscriptionRegistry() {
        return this.subscriptionRegistry;
    }

    public void setPathMatcher(@Nullable PathMatcher pathMatcher) {
        this.pathMatcher = pathMatcher;
        initPathMatcherToUse();
    }

    private void initPathMatcherToUse() {
        if (this.pathMatcher != null) {
            SubscriptionRegistry subscriptionRegistry = this.subscriptionRegistry;
            if (subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
                ((DefaultSubscriptionRegistry) subscriptionRegistry).setPathMatcher(this.pathMatcher);
            }
        }
    }

    public void setCacheLimit(@Nullable Integer num) {
        this.cacheLimit = num;
        initCacheLimitToUse();
    }

    private void initCacheLimitToUse() {
        if (this.cacheLimit != null) {
            SubscriptionRegistry subscriptionRegistry = this.subscriptionRegistry;
            if (subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
                ((DefaultSubscriptionRegistry) subscriptionRegistry).setCacheLimit(this.cacheLimit.intValue());
            }
        }
    }

    public void setSelectorHeaderName(@Nullable String str) {
        this.selectorHeaderName = str;
        initSelectorHeaderNameToUse();
    }

    private void initSelectorHeaderNameToUse() {
        SubscriptionRegistry subscriptionRegistry = this.subscriptionRegistry;
        if (subscriptionRegistry instanceof DefaultSubscriptionRegistry) {
            ((DefaultSubscriptionRegistry) subscriptionRegistry).setSelectorHeaderName(this.selectorHeaderName);
        }
    }

    public void setTaskScheduler(@Nullable TaskScheduler taskScheduler) {
        this.taskScheduler = taskScheduler;
        if (taskScheduler == null || this.heartbeatValue != null) {
            return;
        }
        this.heartbeatValue = new long[]{AbstractComponentTracker.LINGERING_TIMEOUT, AbstractComponentTracker.LINGERING_TIMEOUT};
    }

    @Nullable
    public TaskScheduler getTaskScheduler() {
        return this.taskScheduler;
    }

    public void setHeartbeatValue(@Nullable long[] jArr) {
        if (jArr != null && (jArr.length != 2 || jArr[0] < 0 || jArr[1] < 0)) {
            throw new IllegalArgumentException("Invalid heart-beat: " + Arrays.toString(jArr));
        }
        this.heartbeatValue = jArr;
    }

    @Nullable
    public long[] getHeartbeatValue() {
        return this.heartbeatValue;
    }

    public void setHeaderInitializer(@Nullable MessageHeaderInitializer messageHeaderInitializer) {
        this.headerInitializer = messageHeaderInitializer;
    }

    @Nullable
    public MessageHeaderInitializer getHeaderInitializer() {
        return this.headerInitializer;
    }

    @Override // org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler
    public void startInternal() {
        publishBrokerAvailableEvent();
        if (this.taskScheduler == null) {
            Assert.isTrue(getHeartbeatValue() == null || (getHeartbeatValue()[0] == 0 && getHeartbeatValue()[1] == 0), "Heartbeat values configured but no TaskScheduler provided");
            return;
        }
        Duration initHeartbeatTaskDelay = initHeartbeatTaskDelay();
        if (initHeartbeatTaskDelay.toMillis() > 0) {
            this.heartbeatFuture = this.taskScheduler.scheduleWithFixedDelay(new HeartbeatTask(), initHeartbeatTaskDelay);
        }
    }

    private Duration initHeartbeatTaskDelay() {
        if (getHeartbeatValue() == null) {
            return Duration.ZERO;
        }
        if (getHeartbeatValue()[0] <= 0 || getHeartbeatValue()[1] <= 0) {
            return Duration.ofMillis(getHeartbeatValue()[0] > 0 ? getHeartbeatValue()[0] : getHeartbeatValue()[1]);
        }
        return Duration.ofMillis(Math.min(getHeartbeatValue()[0], getHeartbeatValue()[1]));
    }

    @Override // org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler
    public void stopInternal() {
        publishBrokerUnavailableEvent();
        if (this.heartbeatFuture != null) {
            this.heartbeatFuture.cancel(true);
        }
    }

    @Override // org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler
    protected void handleMessageInternal(Message<?> message) {
        MessageHeaders headers = message.getHeaders();
        String destination = SimpMessageHeaderAccessor.getDestination(headers);
        String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
        updateSessionReadTime(sessionId);
        if (checkDestinationPrefix(destination)) {
            SimpMessageType messageType = SimpMessageHeaderAccessor.getMessageType(headers);
            if (SimpMessageType.MESSAGE.equals(messageType)) {
                logMessage(message);
                sendMessageToSubscribers(destination, message);
                return;
            }
            if (!SimpMessageType.CONNECT.equals(messageType)) {
                if (SimpMessageType.DISCONNECT.equals(messageType)) {
                    logMessage(message);
                    if (sessionId != null) {
                        handleDisconnect(sessionId, SimpMessageHeaderAccessor.getUser(headers), message);
                        return;
                    }
                    return;
                }
                if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
                    logMessage(message);
                    this.subscriptionRegistry.registerSubscription(message);
                    return;
                } else {
                    if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
                        logMessage(message);
                        this.subscriptionRegistry.unregisterSubscription(message);
                        return;
                    }
                    return;
                }
            }
            logMessage(message);
            if (sessionId != null) {
                if (this.sessions.get(sessionId) != null) {
                    if (this.logger.isWarnEnabled()) {
                        this.logger.warn("Ignoring CONNECT in session " + sessionId + ". Already connected.");
                        return;
                    }
                    return;
                }
                long[] heartbeat = SimpMessageHeaderAccessor.getHeartbeat(headers);
                long[] heartbeatValue = getHeartbeatValue();
                Principal user = SimpMessageHeaderAccessor.getUser(headers);
                this.sessions.put(sessionId, new SessionInfo(sessionId, user, getClientOutboundChannelForSession(sessionId), heartbeat, heartbeatValue));
                SimpMessageHeaderAccessor create = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
                initHeaders(create);
                create.setSessionId(sessionId);
                if (user != null) {
                    create.setUser(user);
                }
                create.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
                create.setHeader(SimpMessageHeaderAccessor.HEART_BEAT_HEADER, heartbeatValue);
                getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, create.getMessageHeaders()));
            }
        }
    }

    private void updateSessionReadTime(@Nullable String str) {
        SessionInfo sessionInfo;
        if (str == null || (sessionInfo = this.sessions.get(str)) == null) {
            return;
        }
        sessionInfo.setLastReadTime(System.currentTimeMillis());
    }

    private void logMessage(Message<?> message) {
        if (this.logger.isDebugEnabled()) {
            SimpMessageHeaderAccessor simpMessageHeaderAccessor = (SimpMessageHeaderAccessor) MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
            this.logger.debug("Processing " + (simpMessageHeaderAccessor != null ? simpMessageHeaderAccessor : SimpMessageHeaderAccessor.wrap(message)).getShortLogMessage(message.getPayload()));
        }
    }

    private void initHeaders(SimpMessageHeaderAccessor simpMessageHeaderAccessor) {
        if (getHeaderInitializer() != null) {
            getHeaderInitializer().initHeaders(simpMessageHeaderAccessor);
        }
    }

    private void handleDisconnect(String str, @Nullable Principal principal, @Nullable Message<?> message) {
        this.sessions.remove(str);
        this.subscriptionRegistry.unregisterAllSubscriptions(str);
        SimpMessageHeaderAccessor create = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
        create.setSessionId(str);
        if (principal != null) {
            create.setUser(principal);
        }
        if (message != null) {
            create.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, message);
        }
        initHeaders(create);
        getClientOutboundChannel().send(MessageBuilder.createMessage(EMPTY_PAYLOAD, create.getMessageHeaders()));
    }

    protected void sendMessageToSubscribers(@Nullable String str, Message<?> message) {
        MultiValueMap<String, String> findSubscriptions = this.subscriptionRegistry.findSubscriptions(message);
        if (!findSubscriptions.isEmpty() && this.logger.isDebugEnabled()) {
            this.logger.debug("Broadcasting to " + findSubscriptions.size() + " sessions.");
        }
        long currentTimeMillis = System.currentTimeMillis();
        findSubscriptions.forEach((str2, list) -> {
            Iterator it = list.iterator();
            while (it.hasNext()) {
                String str2 = (String) it.next();
                SimpMessageHeaderAccessor create = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
                initHeaders(create);
                create.setSessionId(str2);
                create.setSubscriptionId(str2);
                create.copyHeadersIfAbsent(message.getHeaders());
                create.setLeaveMutable(true);
                Message<?> createMessage = MessageBuilder.createMessage(message.getPayload(), create.getMessageHeaders());
                SessionInfo sessionInfo = this.sessions.get(str2);
                if (sessionInfo != null) {
                    try {
                        try {
                            sessionInfo.getClientOutboundChannel().send(createMessage);
                            sessionInfo.setLastWriteTime(currentTimeMillis);
                        } catch (Throwable th) {
                            if (this.logger.isErrorEnabled()) {
                                this.logger.error("Failed to send " + String.valueOf(message), th);
                            }
                            sessionInfo.setLastWriteTime(currentTimeMillis);
                        }
                    } catch (Throwable th2) {
                        sessionInfo.setLastWriteTime(currentTimeMillis);
                        throw th2;
                    }
                }
            }
        });
    }

    public String toString() {
        return "SimpleBrokerMessageHandler [" + String.valueOf(this.subscriptionRegistry) + "]";
    }
}
