package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.oauth2.core.endpoint.OAuth2ParameterNames;
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.1.5.jar:io/rsocket/resume/InMemoryResumableFramesStore.class */
public class InMemoryResumableFramesStore extends Flux<ByteBuf> implements ResumableFramesStore, Subscription {
    // Installed by the lift function in saveFrames(Flux); drain(long) and clearAndFinalize(...)
    // dereference it, so it has to be set before either of them runs.
    private FramesSubscriber framesSubscriber;
    // Completed by clearAndFinalize(...) and exposed through onClose().
    final Sinks.Empty<Void> disposed = Sinks.empty();
    // Resumable frames kept so they can be replayed to a (re)connecting subscriber.
    final Queue<ByteBuf> cachedFrames = new ArrayDeque<>();
    // Labels that only appear in log messages.
    final String side;
    final String session;
    // Upper bound for cacheSize; Integer.MAX_VALUE switches size accounting off.
    final int cacheLimit;
    // Byte count of received resumable frames; the sign bit marks the counter as paused.
    volatile long impliedPosition;
    // Advanced by the number of bytes dropped from cachedFrames.
    volatile long firstAvailableFramePosition;
    // Last value accepted by releaseFrames(long).
    long remoteImpliedPosition;
    // Bytes currently cached; only maintained when cacheLimit != Integer.MAX_VALUE.
    int cacheSize;
    // Error stored before the store is marked terminated; null means normal completion.
    Throwable terminal;
    // Subscriber that is currently connected.
    CoreSubscriber<? super ByteBuf> actual;
    // Subscriber waiting to be connected by handlePendingConnection(...).
    CoreSubscriber<? super ByteBuf> pendingActual;
    // Packed state word: flag bits on top, work-in-progress counter in the low 56 bits.
    volatile long state;
    static final long FINALIZED_FLAG = Long.MIN_VALUE; // bit 63
    static final long DISPOSED_FLAG = 4611686018427387904L; // bit 62
    static final long TERMINATED_FLAG = 2305843009213693952L; // bit 61
    static final long CONNECTED_FLAG = 1152921504606846976L; // bit 60
    static final long PENDING_CONNECTION_FLAG = 576460752303423488L; // bit 59
    static final long REMOTE_IMPLIED_POSITION_CHANGED_FLAG = 288230376151711744L; // bit 58
    static final long HAS_FRAME_FLAG = 144115188075855872L; // bit 57
    static final long MAX_WORK_IN_PROGRESS = 72057594037927935L; // bits 0..55
    private static final Logger logger = LoggerFactory.getLogger(InMemoryResumableFramesStore.class);
    static final AtomicLongFieldUpdater<InMemoryResumableFramesStore> IMPLIED_POSITION = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "impliedPosition");
    static final AtomicLongFieldUpdater<InMemoryResumableFramesStore> FIRST_AVAILABLE_FRAME_POSITION = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "firstAvailableFramePosition");
    // The field name is spelled out literally: borrowing an equal-valued constant from an
    // unrelated library would make this class fail to load when that library is absent.
    static final AtomicLongFieldUpdater<InMemoryResumableFramesStore> STATE = AtomicLongFieldUpdater.newUpdater(InMemoryResumableFramesStore.class, "state");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.1.5.jar:io/rsocket/resume/InMemoryResumableFramesStore$FramesSubscriber.class */
    /**
     * Subscriber placed between the outgoing frame source and the store. It does not forward
     * frames downstream; every signal is folded into the parent's state word and, when this
     * thread wins the work-in-progress race, the parent's drain loop is run.
     */
    public static class FramesSubscriber implements CoreSubscriber<ByteBuf>, Fuseable.QueueSubscription<Void> {
        final CoreSubscriber<? super Void> actual;
        final InMemoryResumableFramesStore parent;
        // Upstream queue the parent polls frames from while draining.
        Fuseable.QueueSubscription<ByteBuf> qs;
        // Set once a terminal signal or cancel() has been processed.
        boolean done;

        FramesSubscriber(CoreSubscriber<? super Void> coreSubscriber, InMemoryResumableFramesStore inMemoryResumableFramesStore) {
            this.parent = inMemoryResumableFramesStore;
            this.actual = coreSubscriber;
        }

        /** Requires the upstream to grant ASYNC fusion; otherwise cancels it and fails downstream. */
        @Override
        public void onSubscribe(Subscription subscription) {
            if (!Operators.validate(this.qs, subscription)) {
                return;
            }
            @SuppressWarnings("unchecked")
            final Fuseable.QueueSubscription<ByteBuf> upstream = (Fuseable.QueueSubscription<ByteBuf>) subscription;
            this.qs = upstream;

            final boolean asyncFused = upstream.requestFusion(Fuseable.ANY) == Fuseable.ASYNC;
            if (!asyncFused) {
                subscription.cancel();
            }
            this.actual.onSubscribe(this);
            if (!asyncFused) {
                this.actual.onError(new IllegalStateException("Source has to be ASYNC fuseable"));
            }
        }

        /** Records that a frame is available and drains if a subscriber is connected or pending. */
        @Override
        public void onNext(ByteBuf byteBuf) {
            final InMemoryResumableFramesStore store = this.parent;
            final long previous = markFrameAdded(store);

            if (isFinalized(previous)) {
                this.qs.clear();
                return;
            }
            if (isWorkInProgress(previous)) {
                // Whoever is draining will pick the frame up.
                return;
            }
            if (isConnected(previous) || hasPendingConnection(previous)) {
                store.drain((previous + 1) | HAS_FRAME_FLAG);
            }
        }

        /** Stores the error on the parent, marks it terminated and drains unless work is in progress. */
        @Override
        public void onError(Throwable th) {
            if (this.done) {
                Operators.onErrorDropped(th, this.actual.currentContext());
                return;
            }

            final InMemoryResumableFramesStore store = this.parent;
            store.terminal = th;
            this.done = true;

            final long previous = markTerminated(store);
            if (isFinalized(previous)) {
                Operators.onErrorDropped(th, this.actual.currentContext());
                return;
            }
            if (!isWorkInProgress(previous)) {
                store.drain((previous + 1) | TERMINATED_FLAG);
            }
        }

        /** Marks the parent terminated (no error) and drains unless work is in progress. */
        @Override
        public void onComplete() {
            if (this.done) {
                return;
            }

            final InMemoryResumableFramesStore store = this.parent;
            this.done = true;

            final long previous = markTerminated(store);
            if (!isFinalized(previous) && !isWorkInProgress(previous)) {
                store.drain((previous + 1) | TERMINATED_FLAG);
            }
        }

        /** Treats downstream cancellation as termination of the store. */
        @Override
        public void cancel() {
            if (this.done) {
                return;
            }
            this.done = true;

            final long previous = markTerminated(this.parent);
            if (!isFinalized(previous) && !isWorkInProgress(previous)) {
                // NOTE(review): unlike onError/onComplete the value handed to drain is not
                // incremented here; kept as found.
                this.parent.drain(previous | TERMINATED_FLAG);
            }
        }

        /** Demand is ignored. */
        @Override
        public void request(long j) {
        }

        /** Always declines fusion with the downstream. */
        @Override
        public int requestFusion(int i) {
            return Fuseable.NONE;
        }

        /** Never yields an element. */
        @Override
        public Void poll() {
            return null;
        }

        @Override
        public int size() {
            return 0;
        }

        // NOTE(review): reports non-empty although size() is 0; kept as found.
        @Override
        public boolean isEmpty() {
            return false;
        }

        @Override
        public void clear() {
        }
    }

    public InMemoryResumableFramesStore(String str, ByteBuf byteBuf, int i) {
        this.side = str;
        this.session = byteBuf.toString(CharsetUtil.UTF_8);
        this.cacheLimit = i;
    }

    /**
     * Routes the given frames through a {@link FramesSubscriber} bound to this store.
     *
     * @param flux source of outgoing frames
     * @return a {@code Mono} obtained from {@code then()} on the lifted flux
     */
    @Override
    public Mono<Void> saveFrames(Flux<ByteBuf> flux) {
        return flux
                .transform(
                        Operators.<ByteBuf, Void>lift(
                                // Remember the subscriber so drain/finalize can reach its queue.
                                (scannable, coreSubscriber) -> this.framesSubscriber = new FramesSubscriber(coreSubscriber, this)))
                .then();
    }

    /**
     * Records a new remote implied position and schedules the cache to be trimmed accordingly.
     *
     * @param j remote implied position; must not be lower than the previously recorded one
     * @throws IllegalStateException if {@code j} is behind the last recorded position
     */
    @Override
    public void releaseFrames(long j) {
        if (j < this.remoteImpliedPosition) {
            throw new IllegalStateException("Given Remote Implied Position is behind the last received Remote Implied Position");
        }
        this.remoteImpliedPosition = j;

        final long previous = markRemoteImpliedPositionChanged(this);
        if (!isFinalized(previous) && !isWorkInProgress(previous)) {
            drain((previous + 1) | REMOTE_IMPLIED_POSITION_CHANGED_FLAG);
        }
    }

    /**
     * Serialized work loop. Callers in this file invoke it only after observing a zero
     * work-in-progress counter in the state they replaced, passing the state they expect to be
     * current. Each pass handles, in order: remote position changes, a pending connection, then
     * termination, disposal or queued frames for the connected subscriber. The loop repeats while
     * {@code markWorkDone} reports that more work arrived in the meantime.
     *
     * @param j the state value the caller expects the store to be in
     */
    void drain(long j) {
        // Read up front; fails with NullPointerException if saveFrames has not installed a subscriber.
        final Fuseable.QueueSubscription<ByteBuf> source = this.framesSubscriber.qs;
        final Queue<ByteBuf> cache = this.cachedFrames;
        long expected = j;

        for (;;) {
            if (hasRemoteImpliedPositionChanged(expected)) {
                expected = handlePendingRemoteImpliedPositionChanges(expected, cache);
            }

            if (hasPendingConnection(expected)) {
                expected = handlePendingConnection(expected, cache);
            }

            if (isConnected(expected)) {
                if (isTerminated(expected)) {
                    handleTerminated(source, this.terminal);
                } else if (isDisposed()) {
                    handleDisposed();
                } else if (hasFrames(expected)) {
                    handlePendingFrames(source);
                }
            }

            if (isDisposed(expected) || isTerminated(expected)) {
                clearAndFinalize(this);
                return;
            }

            expected = markWorkDone(this, expected);
            if (isFinalized(expected) || !isWorkInProgress(expected)) {
                return;
            }
        }
    }

    /**
     * Drops cached frames up to the recorded remote implied position. If the dropped byte count
     * does not match exactly, an {@link IllegalStateException} is stored as the terminal error
     * and the store is marked terminated.
     *
     * @param j state value expected by the drain loop
     * @param queue cache to drop frames from
     * @return {@code j}, or the pre-termination state with the terminated flag set on disagreement
     */
    long handlePendingRemoteImpliedPositionChanges(long j, Queue<ByteBuf> queue) {
        final long remotePosition = this.remoteImpliedPosition;
        final long localPosition = this.firstAvailableFramePosition;
        final long toRelease = remotePosition - localPosition;
        if (toRelease <= 0) {
            return j;
        }

        long result = j;
        final int released = dropFramesFromCache(toRelease, queue);
        if (toRelease > released) {
            this.terminal = new IllegalStateException(String.format("Local and remote state disagreement: need to remove additional %d bytes, but cache is empty", toRelease));
            result = markTerminated(this) | TERMINATED_FLAG;
        } else if (toRelease < released) {
            this.terminal = new IllegalStateException("Local and remote state disagreement: local and remote frame sizes are not equal");
            result = markTerminated(this) | TERMINATED_FLAG;
        }

        FIRST_AVAILABLE_FRAME_POSITION.lazySet(this, localPosition + released);

        if (this.cacheLimit == Integer.MAX_VALUE) {
            // Size accounting is disabled for an unbounded cache.
            return result;
        }
        this.cacheSize -= released;
        if (logger.isDebugEnabled()) {
            logger.debug("Side[{}]|Session[{}]. Removed frames from cache to position[{}]. CacheSize[{}]", this.side, this.session, this.remoteImpliedPosition, this.cacheSize);
        }
        return result;
    }

    /**
     * Polls frames from the source queue and handles them one by one until the queue is empty or
     * the store is observed as no longer connected.
     *
     * @param queueSubscription source queue of outgoing frames
     */
    void handlePendingFrames(Fuseable.QueueSubscription<ByteBuf> queueSubscription) {
        for (ByteBuf frame = queueSubscription.poll(); frame != null; frame = queueSubscription.poll()) {
            handleFrame(frame);
            // Re-read the live state: a disconnect may have happened while delivering.
            if (!isConnected(this.state)) {
                return;
            }
        }
    }

    /**
     * Replays the cached frames to the pending subscriber and promotes it to the connected
     * subscriber.
     *
     * <p>If the state changed while replaying, the transition is retried with the fresh state for
     * as long as a connection is still pending; the cache is replayed again only when a different
     * pending subscriber has shown up. The method returns as soon as the store is connected.
     * Without that exit the loop would replay the cache again and toggle the pending flag back on
     * indefinitely.
     *
     * @param j state value expected by the drain loop
     * @param queue cached frames to replay (each is passed on as a retained slice)
     * @return the state after the connection attempt
     */
    long handlePendingConnection(long j, Queue<ByteBuf> queue) {
        CoreSubscriber<? super ByteBuf> lastReplayed = null;
        long expected = j;
        for (;;) {
            final CoreSubscriber<? super ByteBuf> candidate = this.pendingActual;
            if (candidate != lastReplayed) {
                for (final ByteBuf frame : queue) {
                    candidate.onNext(frame.retainedSlice());
                }
            }

            expected = markConnected(this, expected);
            if (isConnected(expected)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Side[{}]|Session[{}]. Connected at Position[{}] and ImpliedPosition[{}]", this.side, this.session, Long.valueOf(this.firstAvailableFramePosition), Long.valueOf(this.impliedPosition));
                }
                this.actual = candidate;
                // Connected: stop here instead of looping again.
                return expected;
            }

            if (!hasPendingConnection(expected)) {
                // The pending connection went away (e.g. disconnected or finalized).
                return expected;
            }

            lastReplayed = candidate;
        }
    }

    /**
     * Removes and releases frames from the head of the queue until at least {@code j} bytes have
     * been dropped or the queue is empty. Whole frames are dropped, so the result may exceed
     * {@code j}.
     *
     * @param j number of bytes that should be dropped
     * @param queue cache to drop frames from
     * @return number of bytes actually dropped
     */
    static int dropFramesFromCache(long j, Queue<ByteBuf> queue) {
        int dropped = 0;
        while (j > dropped && queue.size() > 0) {
            final ByteBuf frame = queue.poll();
            // Measure before releasing the buffer.
            dropped += frame.readableBytes();
            frame.release();
        }
        return dropped;
    }

    /**
     * Returns this store itself; the store is the {@code Flux<ByteBuf>} whose
     * {@code subscribe(CoreSubscriber)} registers a pending connection.
     */
    @Override // io.rsocket.resume.ResumableFramesStore
    public Flux<ByteBuf> resumeStream() {
        return this;
    }

    /** Returns the current value of {@code firstAvailableFramePosition}. */
    @Override
    public long framePosition() {
        final long position = this.firstAvailableFramePosition;
        return position;
    }

    /** Returns the implied position with the sign bit (the paused marker) masked off. */
    @Override
    public long frameImpliedPosition() {
        final long raw = this.impliedPosition;
        return raw & Long.MAX_VALUE;
    }

    /**
     * Adds the frame's readable byte count to the implied position.
     *
     * @param byteBuf received frame
     * @return {@code false} if the position is currently paused (negative), {@code true} once the
     *     position has been advanced
     */
    @Override
    public boolean resumableFrameReceived(ByteBuf byteBuf) {
        final int length = byteBuf.readableBytes();
        for (;;) {
            final long current = this.impliedPosition;
            if (current < 0) {
                return false;
            }
            if (IMPLIED_POSITION.compareAndSet(this, current, current + length)) {
                return true;
            }
        }
    }

    /** Sets the sign bit of the implied position, which makes resumableFrameReceived return false. */
    void pauseImplied() {
        long previous;
        for (;;) {
            previous = this.impliedPosition;
            if (IMPLIED_POSITION.compareAndSet(this, previous, previous | Long.MIN_VALUE)) {
                break;
            }
        }
        logger.debug("Side[{}]|Session[{}]. Paused at position[{}]", this.side, this.session, previous);
    }

    /** Clears the sign bit of the implied position so that it can be advanced again. */
    void resumeImplied() {
        long resumed;
        for (;;) {
            final long current = this.impliedPosition;
            resumed = current & Long.MAX_VALUE;
            if (IMPLIED_POSITION.compareAndSet(this, current, resumed)) {
                break;
            }
        }
        logger.debug("Side[{}]|Session[{}]. Resumed at position[{}]", this.side, this.session, resumed);
    }

    /** Returns the {@code Mono} view of the {@code disposed} sink, which clearAndFinalize completes. */
    @Override
    public Mono<Void> onClose() {
        final Mono<Void> closed = this.disposed.asMono();
        return closed;
    }

    /**
     * Marks the store disposed and runs the drain loop, unless the store was already finalized or
     * disposed, or another thread is currently draining.
     */
    @Override
    public void dispose() {
        final long previous = markDisposed(this);
        final boolean alreadyClosed = isFinalized(previous) || isDisposed(previous);
        if (!alreadyClosed && !isWorkInProgress(previous)) {
            drain((previous + 1) | DISPOSED_FLAG);
        }
    }

    /** Resets the cache size and releases every frame still held in the cache. */
    void clearCache() {
        this.cacheSize = 0;
        final Queue<ByteBuf> cache = this.cachedFrames;
        for (ByteBuf frame = cache.poll(); frame != null; frame = cache.poll()) {
            frame.release();
        }
    }

    /** Reports whether the disposed flag is set in the current state. */
    @Override
    public boolean isDisposed() {
        final long current = this.state;
        return isDisposed(current);
    }

    /**
     * Dispatches a frame: resumable frames go through the cache path, all others are passed
     * straight to the connected subscriber.
     *
     * @param byteBuf frame polled from the source queue
     */
    void handleFrame(ByteBuf byteBuf) {
        if (!ResumableDuplexConnection.isResumableFrame(byteBuf)) {
            handleConnectionFrame(byteBuf);
            return;
        }
        handleResumableFrame(byteBuf);
    }

    /**
     * Handles every frame still queued, then signals the connected subscriber with the error, or
     * with completion when there is none.
     *
     * @param queueSubscription source queue to empty
     * @param th terminal error, or {@code null} for normal completion
     */
    void handleTerminated(Fuseable.QueueSubscription<ByteBuf> queueSubscription, @Nullable Throwable th) {
        for (ByteBuf frame = queueSubscription.poll(); frame != null; frame = queueSubscription.poll()) {
            handleFrame(frame);
        }

        if (th == null) {
            this.actual.onComplete();
            return;
        }
        this.actual.onError(th);
    }

    /** Signals the connected subscriber with a {@link CancellationException}. */
    void handleDisposed() {
        final CancellationException reason = new CancellationException("Disposed");
        this.actual.onError(reason);
    }

    /**
     * Passes the frame to the connected subscriber as is, without caching it.
     *
     * @param byteBuf frame to deliver
     */
    void handleConnectionFrame(ByteBuf byteBuf) {
        final CoreSubscriber<? super ByteBuf> downstream = this.actual;
        downstream.onNext(byteBuf);
    }

    /**
     * Caches a resumable frame, making room first when the cache is bounded, and delivers it to
     * the connected subscriber.
     *
     * <p>When enough room can be made, the frame is stored and a retained slice is delivered. When
     * it cannot, the frame is not stored, the first available position is moved past it and the
     * frame itself is delivered.
     *
     * @param byteBuf resumable frame polled from the source queue
     */
    void handleResumableFrame(ByteBuf byteBuf) {
        final Queue<ByteBuf> cache = this.cachedFrames;
        final int frameLength = byteBuf.readableBytes();
        final int limit = this.cacheLimit;
        final boolean bounded = limit != Integer.MAX_VALUE;
        int size = this.cacheSize;
        boolean cacheable = true;

        if (bounded) {
            final long available = limit - size;
            if (available < frameLength) {
                final long firstPosition = this.firstAvailableFramePosition;
                final long toRemove = frameLength - available;
                final int removed = dropFramesFromCache(toRemove, cache);
                size -= removed;
                cacheable = removed >= toRemove;

                long newFirstPosition = firstPosition + removed;
                if (!cacheable) {
                    // The frame does not fit even after dropping: skip over it.
                    this.cacheSize = size;
                    newFirstPosition += frameLength;
                }
                FIRST_AVAILABLE_FRAME_POSITION.lazySet(this, newFirstPosition);
            }
        }

        if (!cacheable) {
            this.actual.onNext(byteBuf);
            return;
        }

        cache.offer(byteBuf);
        if (bounded) {
            this.cacheSize = size + frameLength;
        }
        // The cache keeps the original; the subscriber gets its own retained view.
        this.actual.onNext(byteBuf.retainedSlice());
    }

    /**
     * Intentionally does nothing: the requested amount is ignored by this subscription.
     *
     * @param j requested amount (unused)
     */
    @Override // org.reactivestreams.Subscription
    public void request(long j) {
    }

    /**
     * Disconnects the current subscriber: pauses the implied position and clears the connected
     * and pending-connection flags. Cached frames are left untouched.
     */
    @Override
    public void cancel() {
        pauseImplied();
        markDisconnected(this);

        if (!logger.isDebugEnabled()) {
            return;
        }
        logger.debug("Side[{}]|Session[{}]. Disconnected at Position[{}] and ImpliedPosition[{}]", this.side, this.session, this.firstAvailableFramePosition, frameImpliedPosition());
    }

    /**
     * Registers the subscriber as the pending connection and drains unless another thread is
     * already draining. A store that is already disposed or terminated fails the subscriber with
     * a {@link CancellationException}.
     *
     * @param coreSubscriber subscriber that wants to receive the resume stream
     */
    @Override
    public void subscribe(CoreSubscriber<? super ByteBuf> coreSubscriber) {
        resumeImplied();
        coreSubscriber.onSubscribe(this);
        this.pendingActual = coreSubscriber;

        final long previous = markPendingConnection(this);
        if (isDisposed(previous) || isTerminated(previous)) {
            coreSubscriber.onError(new CancellationException("Disposed"));
            return;
        }

        if (!isWorkInProgress(previous)) {
            drain((previous + 1) | PENDING_CONNECTION_FLAG);
        }
    }

    /**
     * Sets the has-frame flag. The work-in-progress counter is bumped (unless saturated) only
     * when the store is connected, has a pending connection or is already being drained.
     *
     * @param inMemoryResumableFramesStore store whose state is updated
     * @return the state observed before the update, or the current state if it is finalized
     */
    static long markFrameAdded(InMemoryResumableFramesStore inMemoryResumableFramesStore) {
        final InMemoryResumableFramesStore store = inMemoryResumableFramesStore;
        for (;;) {
            final long current = store.state;
            if (isFinalized(current)) {
                return current;
            }

            final boolean drainable = isConnected(current) || hasPendingConnection(current) || isWorkInProgress(current);
            final boolean saturated = (current & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS;
            final long counted = drainable && !saturated ? current + 1 : current;

            if (STATE.compareAndSet(store, current, counted | HAS_FRAME_FLAG)) {
                return current;
            }
        }
    }

    /**
     * Sets the pending-connection flag and bumps the work-in-progress counter (unless saturated).
     * Nothing is changed when the store is finalized, disposed, terminated or already connected.
     *
     * @param inMemoryResumableFramesStore store whose state is updated
     * @return the state observed before the update, or the unchanged current state
     */
    static long markPendingConnection(InMemoryResumableFramesStore inMemoryResumableFramesStore) {
        final InMemoryResumableFramesStore store = inMemoryResumableFramesStore;
        for (;;) {
            final long current = store.state;
            if (isFinalized(current) || isDisposed(current) || isTerminated(current) || isConnected(current)) {
                return current;
            }

            final boolean saturated = (current & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS;
            final long counted = saturated ? current : current + 1;
            if (STATE.compareAndSet(store, current, counted | PENDING_CONNECTION_FLAG)) {
                return current;
            }
        }
    }

    /**
     * Sets the remote-implied-position-changed flag and bumps the work-in-progress counter
     * (unless saturated).
     *
     * @param inMemoryResumableFramesStore store whose state is updated
     * @return the state observed before the update, or the current state if it is finalized
     */
    static long markRemoteImpliedPositionChanged(InMemoryResumableFramesStore inMemoryResumableFramesStore) {
        final InMemoryResumableFramesStore store = inMemoryResumableFramesStore;
        for (;;) {
            final long current = store.state;
            if (isFinalized(current)) {
                return current;
            }

            final boolean saturated = (current & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS;
            final long counted = saturated ? current : current + 1;
            if (STATE.compareAndSet(store, current, counted | REMOTE_IMPLIED_POSITION_CHANGED_FLAG)) {
                return current;
            }
        }
    }

    /**
     * Clears the connected and pending-connection flags; the work-in-progress counter is left
     * untouched.
     *
     * @param inMemoryResumableFramesStore store whose state is updated
     * @return the state observed before the update, or the current state if it is finalized
     */
    static long markDisconnected(InMemoryResumableFramesStore inMemoryResumableFramesStore) {
        final InMemoryResumableFramesStore store = inMemoryResumableFramesStore;
        final long keepMask = ~CONNECTED_FLAG & ~PENDING_CONNECTION_FLAG;
        for (;;) {
            final long current = store.state;
            if (isFinalized(current)) {
                return current;
            }
            if (STATE.compareAndSet(store, current, current & keepMask)) {
                return current;
            }
        }
    }

    /**
     * Ends a drain pass: if the store is still in the expected state, resets the
     * work-in-progress counter to zero and clears the remote-implied-position-changed flag.
     *
     * <p>The compare-and-set is actually attempted here; a body that returned the observed state
     * unconditionally would never release the counter and would not compile.
     *
     * @param inMemoryResumableFramesStore store whose state is updated
     * @param j state the drain loop expects to be current
     * @return the new state when the update succeeded; otherwise the current state, which either
     *     differs from {@code j} (more work arrived) or is finalized
     */
    static long markWorkDone(InMemoryResumableFramesStore inMemoryResumableFramesStore, long j) {
        final InMemoryResumableFramesStore store = inMemoryResumableFramesStore;
        for (;;) {
            final long current = store.state;
            if (current != j || isFinalized(current)) {
                return current;
            }

            final long next = current & ~MAX_WORK_IN_PROGRESS & ~REMOTE_IMPLIED_POSITION_CHANGED_FLAG;
            if (STATE.compareAndSet(store, current, next)) {
                return next;
            }
            // Lost the race: re-read; the state no longer matches j and is returned above.
        }
    }

    /**
     * Turns a pending connection into an established one: if the store is still in the expected
     * state, toggles the pending-connection flag and sets the connected flag.
     *
     * <p>The compare-and-set is actually attempted here; a body that returned the observed state
     * unconditionally would never set the connected flag and would not compile.
     *
     * @param inMemoryResumableFramesStore store whose state is updated
     * @param j state the caller expects to be current (expected to carry the pending flag)
     * @return the new state when the update succeeded; otherwise the current state, which either
     *     differs from {@code j} or is finalized
     */
    static long markConnected(InMemoryResumableFramesStore inMemoryResumableFramesStore, long j) {
        final InMemoryResumableFramesStore store = inMemoryResumableFramesStore;
        for (;;) {
            final long current = store.state;
            if (current != j || isFinalized(current)) {
                return current;
            }

            final long next = (current ^ PENDING_CONNECTION_FLAG) | CONNECTED_FLAG;
            if (STATE.compareAndSet(store, current, next)) {
                return next;
            }
            // Lost the race: re-read; the state no longer matches j and is returned above.
        }
    }

    /**
     * Sets the terminated flag and bumps the work-in-progress counter (unless saturated).
     *
     * @param inMemoryResumableFramesStore store whose state is updated
     * @return the state observed before the update, or the current state if it is finalized
     */
    static long markTerminated(InMemoryResumableFramesStore inMemoryResumableFramesStore) {
        final InMemoryResumableFramesStore store = inMemoryResumableFramesStore;
        for (;;) {
            final long current = store.state;
            if (isFinalized(current)) {
                return current;
            }

            final boolean saturated = (current & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS;
            final long counted = saturated ? current : current + 1;
            if (STATE.compareAndSet(store, current, counted | TERMINATED_FLAG)) {
                return current;
            }
        }
    }

    /**
     * Sets the disposed flag and bumps the work-in-progress counter (unless saturated).
     *
     * @param inMemoryResumableFramesStore store whose state is updated
     * @return the state observed before the update, or the current state if it is finalized
     */
    static long markDisposed(InMemoryResumableFramesStore inMemoryResumableFramesStore) {
        final InMemoryResumableFramesStore store = inMemoryResumableFramesStore;
        for (;;) {
            final long current = store.state;
            if (isFinalized(current)) {
                return current;
            }

            final boolean saturated = (current & MAX_WORK_IN_PROGRESS) == MAX_WORK_IN_PROGRESS;
            final long counted = saturated ? current : current + 1;
            if (STATE.compareAndSet(store, current, counted | DISPOSED_FLAG)) {
                return current;
            }
        }
    }

    /**
     * Clears the source queue and the frame cache and sets the finalized flag. The thread that
     * sets the flag also completes the {@code disposed} sink and calls {@code onComplete()} on
     * the frames subscriber. Clearing is repeated on every retry of the state update.
     *
     * @param inMemoryResumableFramesStore store to finalize
     */
    static void clearAndFinalize(InMemoryResumableFramesStore inMemoryResumableFramesStore) {
        final InMemoryResumableFramesStore store = inMemoryResumableFramesStore;
        final Fuseable.QueueSubscription<ByteBuf> source = store.framesSubscriber.qs;

        for (;;) {
            final long current = store.state;

            source.clear();
            store.clearCache();

            if (isFinalized(current)) {
                // Someone else finalized already; the signals below are theirs to send.
                return;
            }
            if (STATE.compareAndSet(store, current, current | Long.MIN_VALUE)) {
                break;
            }
        }

        store.disposed.tryEmitEmpty();
        store.framesSubscriber.onComplete();
    }

    /** Tests the connected flag of the given state value. */
    static boolean isConnected(long j) {
        final long masked = j & CONNECTED_FLAG;
        return masked != 0L;
    }

    /** Tests the remote-implied-position-changed flag of the given state value. */
    static boolean hasRemoteImpliedPositionChanged(long j) {
        final long masked = j & REMOTE_IMPLIED_POSITION_CHANGED_FLAG;
        return masked != 0L;
    }

    /** Tests the pending-connection flag of the given state value. */
    static boolean hasPendingConnection(long j) {
        final long masked = j & PENDING_CONNECTION_FLAG;
        return masked != 0L;
    }

    /** Tests the has-frame flag of the given state value. */
    static boolean hasFrames(long j) {
        final long masked = j & HAS_FRAME_FLAG;
        return masked != 0L;
    }

    /** Tests the terminated flag of the given state value. */
    static boolean isTerminated(long j) {
        final long masked = j & TERMINATED_FLAG;
        return masked != 0L;
    }

    /** Tests the disposed flag of the given state value. */
    static boolean isDisposed(long j) {
        final long masked = j & DISPOSED_FLAG;
        return masked != 0L;
    }

    /** Tests the finalized flag (the sign bit) of the given state value. */
    static boolean isFinalized(long j) {
        final long masked = j & FINALIZED_FLAG;
        return masked != 0L;
    }

    /** Reports whether the work-in-progress counter in the low bits of the state is non-zero. */
    static boolean isWorkInProgress(long j) {
        final long counter = j & MAX_WORK_IN_PROGRESS;
        // The mask leaves the top byte clear, so the counter can never be negative.
        return counter != 0L;
    }
}
