package org.apache.cassandra.streaming.async;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.Future;
import java.io.IOError;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.ExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.streaming.StreamDeserializingTask;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamingChannel;
import org.apache.cassandra.streaming.StreamingDataOutputPlus;
import org.apache.cassandra.streaming.messages.KeepAliveMessage;
import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.streaming.messages.StreamMessage;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.concurrent.BlockingQueues;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.apache.cassandra.utils.concurrent.Semaphore;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/streaming/async/StreamingMultiplexedChannel.class */
public class StreamingMultiplexedChannel {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) StreamingMultiplexedChannel.class);
    private static final int DEFAULT_MAX_PARALLEL_TRANSFERS = FBUtilities.getAvailableProcessors();
    private static final int MAX_PARALLEL_TRANSFERS = Integer.parseInt(System.getProperty("cassandra.streaming.session.parallelTransfers", Integer.toString(DEFAULT_MAX_PARALLEL_TRANSFERS)));
    private static final Semaphore fileTransferSemaphore = Semaphore.newFairSemaphore(DEFAULT_MAX_PARALLEL_TRANSFERS);
    private final StreamingChannel.Factory factory;
    private final InetAddressAndPort to;
    private final StreamSession session;
    private final int messagingVersion;
    private volatile boolean closed;
    private volatile StreamingChannel controlChannel;
    private final ExecutorPlus fileTransferExecutor;
    private final Collection<ScheduledFuture<?>> channelKeepAlives = BlockingQueues.newBlockingQueue();
    private final ConcurrentMap<Thread, StreamingChannel> threadToChannelMap = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/cassandra/streaming/async/StreamingMultiplexedChannel$FileStreamTask.class */
    public class FileStreamTask implements Runnable {
        private static final int SEMAPHORE_UNAVAILABLE_LOG_INTERVAL = 3;
        private final StreamMessage msg;
        private final InetAddressAndPort connectTo;

        FileStreamTask(OutgoingStreamMessage outgoingStreamMessage, InetAddressAndPort inetAddressAndPort) {
            this.msg = outgoingStreamMessage;
            this.connectTo = inetAddressAndPort;
        }

        FileStreamTask(StreamMessage streamMessage) {
            this.msg = streamMessage;
            this.connectTo = null;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (acquirePermit(3)) {
                try {
                    try {
                        StreamingDataOutputPlus acquireOut = getOrCreateFileChannel(this.connectTo).acquireOut();
                        Throwable th = null;
                        try {
                            StreamMessage.serialize(this.msg, acquireOut, StreamingMultiplexedChannel.this.messagingVersion, StreamingMultiplexedChannel.this.session);
                            if (acquireOut != null) {
                                if (0 != 0) {
                                    try {
                                        acquireOut.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    acquireOut.close();
                                }
                            }
                            StreamingMultiplexedChannel.fileTransferSemaphore.release(1);
                        } catch (Throwable th3) {
                            if (acquireOut != null) {
                                if (0 != 0) {
                                    try {
                                        acquireOut.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    acquireOut.close();
                                }
                            }
                            throw th3;
                        }
                    } catch (Throwable th5) {
                        StreamingMultiplexedChannel.fileTransferSemaphore.release(1);
                        throw th5;
                    }
                } catch (Exception e) {
                    StreamingMultiplexedChannel.this.session.onError(e);
                    StreamingMultiplexedChannel.fileTransferSemaphore.release(1);
                } catch (Throwable th6) {
                    if (StreamingMultiplexedChannel.this.closed && (Throwables.getRootCause(th6) instanceof ClosedByInterruptException) && StreamingMultiplexedChannel.this.fileTransferExecutor.isShutdown()) {
                        StreamingMultiplexedChannel.logger.debug("{} Streaming channel was closed due to the executor pool being shutdown", StreamSession.createLogTag(StreamingMultiplexedChannel.this.session, (StreamingChannel) null));
                    } else {
                        JVMStabilityInspector.inspectThrowable(th6);
                        if (!StreamingMultiplexedChannel.this.session.state().isFinalState()) {
                            StreamingMultiplexedChannel.this.session.onError(th6);
                        }
                    }
                    StreamingMultiplexedChannel.fileTransferSemaphore.release(1);
                }
            }
        }

        boolean acquirePermit(int i) {
            long nanos = TimeUnit.MINUTES.toNanos(i);
            long nanoTime = Clock.Global.nanoTime();
            while (!StreamingMultiplexedChannel.this.closed) {
                try {
                    if (StreamingMultiplexedChannel.fileTransferSemaphore.tryAcquire(1, 1L, TimeUnit.SECONDS)) {
                        return true;
                    }
                    long nanoTime2 = Clock.Global.nanoTime();
                    if (nanoTime2 - nanoTime > nanos) {
                        nanoTime = nanoTime2;
                        OutgoingStreamMessage outgoingStreamMessage = (OutgoingStreamMessage) this.msg;
                        if (StreamingMultiplexedChannel.logger.isInfoEnabled()) {
                            StreamingMultiplexedChannel.logger.info("{} waiting to acquire a permit to begin streaming {}. This message logs every {} minutes", StreamSession.createLogTag(StreamingMultiplexedChannel.this.session), outgoingStreamMessage.getName(), Integer.valueOf(i));
                        }
                    }
                } catch (InterruptedException e) {
                    throw new UncheckedInterruptedException(e);
                }
            }
            return false;
        }

        private StreamingChannel getOrCreateFileChannel(InetAddressAndPort inetAddressAndPort) {
            Thread currentThread = Thread.currentThread();
            try {
                StreamingChannel streamingChannel = (StreamingChannel) StreamingMultiplexedChannel.this.threadToChannelMap.get(currentThread);
                if (streamingChannel != null) {
                    return streamingChannel;
                }
                StreamingChannel createFileChannel = StreamingMultiplexedChannel.this.createFileChannel(inetAddressAndPort);
                StreamingMultiplexedChannel.this.threadToChannelMap.put(currentThread, createFileChannel);
                return createFileChannel;
            } catch (Exception e) {
                throw new IOError(e);
            }
        }

        void injectChannel(StreamingChannel streamingChannel) {
            Thread currentThread = Thread.currentThread();
            if (StreamingMultiplexedChannel.this.threadToChannelMap.get(currentThread) != null) {
                throw new IllegalStateException("previous channel already set");
            }
            StreamingMultiplexedChannel.this.threadToChannelMap.put(currentThread, streamingChannel);
        }

        void unsetChannel() {
            StreamingMultiplexedChannel.this.threadToChannelMap.remove(Thread.currentThread());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/cassandra/streaming/async/StreamingMultiplexedChannel$KeepAliveTask.class */
    public class KeepAliveTask implements Runnable {
        private final StreamingChannel channel;
        ScheduledFuture<?> future;

        KeepAliveTask(StreamingChannel streamingChannel) {
            this.channel = streamingChannel;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (!this.channel.connected() || StreamingMultiplexedChannel.this.closed) {
                if (null != this.future) {
                    this.future.cancel(false);
                }
            } else {
                if (StreamingMultiplexedChannel.logger.isTraceEnabled()) {
                    StreamingMultiplexedChannel.logger.trace("{} Sending keep-alive to {}.", StreamSession.createLogTag(StreamingMultiplexedChannel.this.session, this.channel), StreamingMultiplexedChannel.this.session.peer);
                }
                StreamingMultiplexedChannel.this.sendControlMessage(new KeepAliveMessage()).addListener2(future -> {
                    if (future.isSuccess() || future.isCancelled() || !StreamingMultiplexedChannel.logger.isDebugEnabled()) {
                        return;
                    }
                    StreamingMultiplexedChannel.logger.debug("{} Could not send keep-alive message (perhaps stream session is finished?).", StreamSession.createLogTag(StreamingMultiplexedChannel.this.session, this.channel), future.cause());
                });
            }
        }
    }

    public StreamingMultiplexedChannel(StreamSession streamSession, StreamingChannel.Factory factory, InetAddressAndPort inetAddressAndPort, @Nullable StreamingChannel streamingChannel, int i) {
        this.session = streamSession;
        this.factory = factory;
        this.to = inetAddressAndPort;
        this.messagingVersion = i;
        this.controlChannel = streamingChannel;
        this.fileTransferExecutor = (ExecutorPlus) ExecutorFactory.Global.executorFactory().configurePooled("NettyStreaming-Outbound-" + streamSession.peer.toString().replace(':', '.'), MAX_PARALLEL_TRANSFERS).withKeepAlive(1L, TimeUnit.SECONDS).build();
    }

    public InetAddressAndPort peer() {
        return this.to;
    }

    public InetSocketAddress connectedTo() {
        return this.controlChannel == null ? this.to : this.controlChannel.connectedTo();
    }

    private void setupControlMessageChannel() throws IOException {
        if (this.controlChannel == null) {
            this.controlChannel = createControlChannel();
        }
    }

    private StreamingChannel createControlChannel() throws IOException {
        logger.debug("Creating stream session to {} as {}", this.to, this.session.isFollower() ? "follower" : "initiator");
        StreamingChannel create = this.factory.create(this.to, this.messagingVersion, StreamingChannel.Kind.CONTROL);
        ExecutorFactory.Global.executorFactory().startThread(String.format("Stream-Deserializer-%s-%s", this.to.toString(), create.id()), new StreamDeserializingTask(this.session, create, this.messagingVersion));
        this.session.attachInbound(create);
        this.session.attachOutbound(create);
        scheduleKeepAliveTask(create);
        logger.debug("Creating control {}", create.description());
        return create;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public StreamingChannel createFileChannel(InetAddressAndPort inetAddressAndPort) throws IOException {
        logger.debug("Creating stream session to {} as {}", this.to, this.session.isFollower() ? "follower" : "initiator");
        StreamingChannel create = this.factory.create(this.to, inetAddressAndPort, this.messagingVersion, StreamingChannel.Kind.FILE);
        this.session.attachOutbound(create);
        logger.debug("Creating file {}", create.description());
        return create;
    }

    public Future<?> sendControlMessage(StreamMessage streamMessage) {
        try {
            setupControlMessageChannel();
            return sendMessage(this.controlChannel, streamMessage);
        } catch (Exception e) {
            close();
            this.session.onError(e);
            return ImmediateFuture.failure(e);
        }
    }

    public Future<?> sendMessage(StreamingChannel streamingChannel, StreamMessage streamMessage) {
        if (this.closed) {
            throw new RuntimeException("stream has been closed, cannot send " + streamMessage);
        }
        if (streamMessage instanceof OutgoingStreamMessage) {
            if (this.session.isPreview()) {
                throw new RuntimeException("Cannot send stream data messages for preview streaming sessions");
            }
            if (logger.isDebugEnabled()) {
                logger.debug("{} Sending {}", StreamSession.createLogTag(this.session), streamMessage);
            }
            return this.fileTransferExecutor.submit((Runnable) new FileStreamTask((OutgoingStreamMessage) streamMessage, this.factory.supportsPreferredIp() ? SystemKeyspace.getPreferredIP(this.to) : this.to));
        }
        try {
            Future<?> send = streamingChannel.send(intFunction -> {
                long serializedSize = StreamMessage.serializedSize(streamMessage, this.messagingVersion);
                if (serializedSize > 1073741824) {
                    throw new IllegalStateException(String.format("%s something is seriously wrong with the calculated stream control message's size: %d bytes, type is %s", StreamSession.createLogTag(this.session, this.controlChannel.id()), Long.valueOf(serializedSize), streamMessage.type));
                }
                StreamingDataOutputPlus streamingDataOutputPlus = (StreamingDataOutputPlus) intFunction.apply((int) serializedSize);
                Throwable th = null;
                try {
                    try {
                        StreamMessage.serialize(streamMessage, streamingDataOutputPlus, this.messagingVersion, this.session);
                        if (streamingDataOutputPlus != null) {
                            if (0 == 0) {
                                streamingDataOutputPlus.close();
                                return;
                            }
                            try {
                                streamingDataOutputPlus.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (streamingDataOutputPlus != null) {
                        if (th != null) {
                            try {
                                streamingDataOutputPlus.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            streamingDataOutputPlus.close();
                        }
                    }
                    throw th4;
                }
            });
            send.addListener2(future -> {
                onMessageComplete(future, streamMessage);
            });
            return send;
        } catch (Exception e) {
            close();
            this.session.onError(e);
            return ImmediateFuture.failure(e);
        }
    }

    Future<?> onMessageComplete(Future<?> future, StreamMessage streamMessage) {
        Throwable cause = future.cause();
        if (cause == null) {
            return null;
        }
        logger.error("{} failed to send a stream message/data to peer {}: msg = {}", StreamSession.createLogTag(this.session, future instanceof ChannelFuture ? ((ChannelFuture) future).channel() : null), this.to, streamMessage, future.cause());
        return this.session.onError(cause);
    }

    private void scheduleKeepAliveTask(StreamingChannel streamingChannel) {
        int streamingKeepAlivePeriod;
        if ((streamingChannel instanceof NettyStreamingChannel) && (streamingKeepAlivePeriod = DatabaseDescriptor.getStreamingKeepAlivePeriod()) > 0) {
            if (logger.isDebugEnabled()) {
                logger.debug("{} Scheduling keep-alive task with {}s period.", StreamSession.createLogTag(this.session, streamingChannel), Integer.valueOf(streamingKeepAlivePeriod));
            }
            KeepAliveTask keepAliveTask = new KeepAliveTask(streamingChannel);
            io.netty.util.concurrent.ScheduledFuture<?> scheduleAtFixedRate = ((NettyStreamingChannel) streamingChannel).channel.eventLoop().scheduleAtFixedRate((Runnable) keepAliveTask, streamingKeepAlivePeriod, streamingKeepAlivePeriod, TimeUnit.SECONDS);
            keepAliveTask.future = scheduleAtFixedRate;
            this.channelKeepAlives.add(scheduleAtFixedRate);
        }
    }

    public void setClosed() {
        this.closed = true;
    }

    void setControlChannel(NettyStreamingChannel nettyStreamingChannel) {
        this.controlChannel = nettyStreamingChannel;
    }

    int semaphoreAvailablePermits() {
        return fileTransferSemaphore.permits();
    }

    public boolean connected() {
        return !this.closed && (this.controlChannel == null || this.controlChannel.connected());
    }

    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        if (logger.isDebugEnabled()) {
            logger.debug("{} Closing stream connection channels on {}", StreamSession.createLogTag(this.session), this.to);
        }
        Iterator<ScheduledFuture<?>> it = this.channelKeepAlives.iterator();
        while (it.hasNext()) {
            it.next().cancel(false);
        }
        this.channelKeepAlives.clear();
        this.threadToChannelMap.values().forEach((v0) -> {
            v0.close();
        });
        this.threadToChannelMap.clear();
        this.fileTransferExecutor.shutdownNow();
    }

    @VisibleForTesting
    public void unsafeCloseControlChannel() {
        logger.warn("Unsafe close of control channel");
        this.controlChannel.close().mo8003awaitUninterruptibly();
    }
}
