package org.apache.cassandra.streaming;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.ExecutorFactory;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.messages.OutgoingStreamMessage;
import org.apache.cassandra.utils.ExecutorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/streaming/StreamTransferTask.class */
public class StreamTransferTask extends StreamTask {
    private static final Logger logger;
    private static final ScheduledExecutorPlus timeoutExecutor;
    private final AtomicInteger sequenceNumber;
    private boolean aborted;

    @VisibleForTesting
    protected final Map<Integer, OutgoingStreamMessage> streams;
    private final Map<Integer, ScheduledFuture<?>> timeoutTasks;
    private long totalSize;
    private int totalFiles;
    static final /* synthetic */ boolean $assertionsDisabled;

    public StreamTransferTask(StreamSession streamSession, TableId tableId) {
        super(streamSession, tableId);
        this.sequenceNumber = new AtomicInteger(0);
        this.aborted = false;
        this.streams = new HashMap();
        this.timeoutTasks = new HashMap();
        this.totalSize = 0L;
        this.totalFiles = 0;
    }

    public synchronized void addTransferStream(OutgoingStream outgoingStream) {
        Preconditions.checkArgument(this.tableId.equals(outgoingStream.getTableId()));
        OutgoingStreamMessage reportOutgoingStream = StreamHook.instance.reportOutgoingStream(this.session, outgoingStream, new OutgoingStreamMessage(this.tableId, this.session, outgoingStream, this.sequenceNumber.getAndIncrement()));
        this.streams.put(Integer.valueOf(reportOutgoingStream.header.sequenceNumber), reportOutgoingStream);
        this.totalSize += reportOutgoingStream.stream.getEstimatedSize();
        this.totalFiles += reportOutgoingStream.stream.getNumFiles();
    }

    public void complete(int i) {
        boolean isEmpty;
        synchronized (this) {
            ScheduledFuture<?> remove = this.timeoutTasks.remove(Integer.valueOf(i));
            if (remove != null) {
                remove.cancel(false);
            }
            OutgoingStreamMessage remove2 = this.streams.remove(Integer.valueOf(i));
            if (remove2 != null) {
                remove2.complete();
            }
            logger.debug("received sequenceNumber {}, remaining files {}", Integer.valueOf(i), this.streams.keySet());
            isEmpty = this.streams.isEmpty();
        }
        if (isEmpty) {
            this.session.taskCompleted(this);
        }
    }

    public void timeout(int i) {
        synchronized (this) {
            this.timeoutTasks.remove(Integer.valueOf(i));
            OutgoingStreamMessage remove = this.streams.remove(Integer.valueOf(i));
            if (remove == null) {
                return;
            }
            remove.complete();
            logger.debug("timeout sequenceNumber {}, remaining files {}", Integer.valueOf(i), this.streams.keySet());
            this.session.sessionTimeout();
        }
    }

    @Override // org.apache.cassandra.streaming.StreamTask
    public synchronized void abort() {
        if (this.aborted) {
            return;
        }
        this.aborted = true;
        Iterator<ScheduledFuture<?>> it = this.timeoutTasks.values().iterator();
        while (it.hasNext()) {
            it.next().cancel(false);
        }
        this.timeoutTasks.clear();
        Throwable th = null;
        Iterator<OutgoingStreamMessage> it2 = this.streams.values().iterator();
        while (it2.hasNext()) {
            try {
                it2.next().complete();
            } catch (Throwable th2) {
                if (th == null) {
                    th = th2;
                } else {
                    th.addSuppressed(th2);
                }
            }
        }
        this.streams.clear();
        if (th != null) {
            Throwables.propagate(th);
        }
    }

    @Override // org.apache.cassandra.streaming.StreamTask
    public synchronized int getTotalNumberOfFiles() {
        return this.totalFiles;
    }

    @Override // org.apache.cassandra.streaming.StreamTask
    public long getTotalSize() {
        return this.totalSize;
    }

    public synchronized Collection<OutgoingStreamMessage> getFileMessages() {
        return new ArrayList(this.streams.values());
    }

    public synchronized OutgoingStreamMessage createMessageForRetry(int i) {
        ScheduledFuture<?> remove = this.timeoutTasks.remove(Integer.valueOf(i));
        if (remove != null) {
            remove.cancel(false);
        }
        return this.streams.get(Integer.valueOf(i));
    }

    public synchronized ScheduledFuture<?> scheduleTimeout(int i, long j, TimeUnit timeUnit) {
        if (!this.streams.containsKey(Integer.valueOf(i))) {
            return null;
        }
        ScheduledFuture<?> scheduleTimeoutWithDelay = timeoutExecutor.scheduleTimeoutWithDelay(() -> {
            timeout(i);
        }, j, timeUnit);
        ScheduledFuture<?> put = this.timeoutTasks.put(Integer.valueOf(i), scheduleTimeoutWithDelay);
        if ($assertionsDisabled || put == null) {
            return scheduleTimeoutWithDelay;
        }
        throw new AssertionError();
    }

    @VisibleForTesting
    public static void shutdownAndWait(long j, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
        ExecutorUtils.shutdownAndWait(j, timeUnit, timeoutExecutor);
    }

    static {
        $assertionsDisabled = !StreamTransferTask.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger((Class<?>) StreamTransferTask.class);
        timeoutExecutor = ExecutorFactory.Global.executorFactory().scheduled(false, "StreamingTransferTaskTimeouts");
    }
}
