package org.apache.cassandra.db.streaming;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.View;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.IncomingStream;
import org.apache.cassandra.streaming.OutgoingStream;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.StreamReceiver;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.TableStreamManager;
import org.apache.cassandra.streaming.messages.StreamMessageHeader;
import org.apache.cassandra.utils.TimeUUID;
import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.Refs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/db/streaming/CassandraStreamManager.class */
public class CassandraStreamManager implements TableStreamManager {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) CassandraStreamManager.class);
    private final ColumnFamilyStore cfs;

    public CassandraStreamManager(ColumnFamilyStore columnFamilyStore) {
        this.cfs = columnFamilyStore;
    }

    @Override // org.apache.cassandra.streaming.TableStreamManager
    public IncomingStream prepareIncomingStream(StreamSession streamSession, StreamMessageHeader streamMessageHeader) {
        return new CassandraIncomingFile(this.cfs, streamSession, streamMessageHeader);
    }

    @Override // org.apache.cassandra.streaming.TableStreamManager
    public StreamReceiver createStreamReceiver(StreamSession streamSession, int i) {
        return new CassandraStreamReceiver(this.cfs, streamSession, i);
    }

    @Override // org.apache.cassandra.streaming.TableStreamManager
    public Collection<OutgoingStream> createOutgoingStreams(StreamSession streamSession, RangesAtEndpoint rangesAtEndpoint, TimeUUID timeUUID, PreviewKind previewKind) {
        Refs<SSTableReader> refs = new Refs<>();
        try {
            ArrayList arrayList = new ArrayList(rangesAtEndpoint.size());
            Iterator<Replica> it = rangesAtEndpoint.iterator();
            while (it.hasNext()) {
                arrayList.add(Range.makeRowRange(it.next().range()));
            }
            refs.addAll(this.cfs.selectAndReference(view -> {
                HashSet newHashSet = Sets.newHashSet();
                SSTableIntervalTree build = SSTableIntervalTree.build(view.select(SSTableSet.CANONICAL));
                Predicate<SSTableReader> predicate = previewKind.isPreview() ? previewKind.predicate() : timeUUID == ActiveRepairService.NO_PENDING_REPAIR ? Predicates.alwaysTrue() : sSTableReader -> {
                    return sSTableReader.isPendingRepair() && sSTableReader.getSSTableMetadata().pendingRepair.equals(timeUUID);
                };
                Iterator it2 = arrayList.iterator();
                while (it2.hasNext()) {
                    Range range = (Range) it2.next();
                    Iterator it3 = Iterables.filter(View.sstablesInBounds((PartitionPosition) range.left, (PartitionPosition) range.right, build), predicate).iterator();
                    while (it3.hasNext()) {
                        newHashSet.add((SSTableReader) it3.next());
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("ViewFilter for {}/{} sstables", Integer.valueOf(newHashSet.size()), Integer.valueOf(Iterables.size(view.select(SSTableSet.CANONICAL))));
                }
                return newHashSet;
            }).refs);
            this.cfs.writeAndAddMemtableRanges(streamSession.getPendingRepair(), () -> {
                return Range.normalize(arrayList);
            }, refs);
            List normalize = Range.normalize(rangesAtEndpoint.onlyFull().ranges());
            List normalize2 = Range.normalize(rangesAtEndpoint.ranges());
            ArrayList arrayList2 = new ArrayList(refs.size());
            Iterator<SSTableReader> it2 = refs.iterator();
            while (it2.hasNext()) {
                SSTableReader next = it2.next();
                List list = next.isRepaired() ? normalize : normalize2;
                List<SSTableReader.PartitionPositionBounds> positionsForRanges = next.getPositionsForRanges(list);
                Ref<SSTableReader> ref = refs.get(next);
                if (positionsForRanges.isEmpty()) {
                    ref.release();
                } else {
                    arrayList2.add(new CassandraOutgoingFile(streamSession.getStreamOperation(), ref, positionsForRanges, list, next.estimatedKeysForRanges(list)));
                }
            }
            return arrayList2;
        } catch (Throwable th) {
            refs.release();
            throw th;
        }
    }
}
