package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.CharsetUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.exceptions.Exceptions;
import io.rsocket.exceptions.RejectedResumeException;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.frame.ResumeFrameCodec;
import io.rsocket.frame.ResumeOkFrameCodec;
import io.rsocket.keepalive.KeepAliveSupport;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.function.Tuple2;
import reactor.util.retry.Retry;

/* loaded from: input_file:BOOT-INF/lib/rsocket-core-1.1.5.jar:io/rsocket/resume/ClientRSocketSession.class */
/**
 * Client side of a resumable session.
 *
 * <p>Whenever the active connection of the wrapped {@link ResumableDuplexConnection} is reported
 * closed, this session obtains a new {@link DuplexConnection} from the connection factory, sends a
 * RESUME frame carrying the local frame positions, and validates the first frame received in
 * response (see {@link #tryReestablishSession(Tuple2)}) before handing the new connection to the
 * resumable connection.
 *
 * <p>The session subscribes itself to every reconnect pipeline. The {@link Subscription} kept in
 * {@link #s} doubles as the session state: once it holds {@link Operators#cancelledSubscription()}
 * the session is treated as closed and no further reconnects or resumptions are accepted.
 */
public class ClientRSocketSession implements RSocketSession, ResumeStateHolder, CoreSubscriber<Tuple2<ByteBuf, DuplexConnection>> {
    private static final Logger logger = LoggerFactory.getLogger(ClientRSocketSession.class);

    /** Atomic accessor for {@link #s}; used to set, clear and terminate the reconnect subscription. */
    static final AtomicReferenceFieldUpdater<ClientRSocketSession, Subscription> S =
            AtomicReferenceFieldUpdater.newUpdater(ClientRSocketSession.class, Subscription.class, "s");

    final ResumableDuplexConnection resumableConnection;
    final Mono<Tuple2<ByteBuf, DuplexConnection>> connectionFactory;
    final ResumableFramesStore resumableFramesStore;
    final ByteBufAllocator allocator;
    final Duration resumeSessionDuration;
    final Retry retry;
    final boolean cleanupStoreOnKeepAlive;
    final ByteBuf resumeToken;
    /** UTF-8 text of the resume token; only used to tag log messages. */
    final String session;
    final Disposable reconnectDisposable;

    /** Subscription of the reconnect pipeline currently in flight, or the cancelled marker once closed. */
    volatile Subscription s;

    // NOTE(review): assigned only through setKeepAliveSupport and dereferenced without a null check
    // in reconnect(int) and on successful resumption - presumably always set before the first
    // connection loss; confirm against the caller.
    KeepAliveSupport keepAliveSupport;

    /**
     * Creates the session and starts listening for connection loss on the given resumable
     * connection.
     *
     * @param byteBuf resume token; retained every time a RESUME frame is encoded and released in
     *     {@link #dispose()} if it is still referenced
     * @param resumableDuplexConnection connection whose active transport is replaced on resumption;
     *     closing it disposes this session
     * @param mono source of fresh {@link DuplexConnection}s used for each reconnect attempt
     * @param function applied to a fresh connection right after the RESUME frame has been sent;
     *     yields the frame to validate together with the connection it arrived on
     * @param resumableFramesStore store queried for the local positions and asked to release frames
     * @param duration timeout applied to each reconnect pipeline (including its retries)
     * @param retry retry strategy applied to each reconnect pipeline
     * @param z whether {@link #onImpliedPosition(long)} releases frames from the store
     */
    public ClientRSocketSession(ByteBuf byteBuf, ResumableDuplexConnection resumableDuplexConnection, Mono<DuplexConnection> mono, Function<DuplexConnection, Mono<Tuple2<ByteBuf, DuplexConnection>>> function, ResumableFramesStore resumableFramesStore, Duration duration, Retry retry, boolean z) {
        this.resumeToken = byteBuf;
        this.session = byteBuf.toString(CharsetUtil.UTF_8);
        this.connectionFactory = mono
                // A connection that is discarded before being used is closed explicitly.
                .doOnDiscard(DuplexConnection.class, discarded ->
                        closeWithError(discarded, new ConnectionErrorException("resumption_server=[Session Expired]")))
                .flatMap(connection -> {
                    final long impliedPosition = resumableFramesStore.frameImpliedPosition();
                    final long position = resumableFramesStore.framePosition();
                    connection.sendFrame(0, ResumeFrameCodec.encode(connection.alloc(), byteBuf.retain(), impliedPosition, position));
                    if (logger.isDebugEnabled()) {
                        logger.debug("Side[client]|Session[{}]. ResumeFrame[impliedPosition[{}], position[{}]] has been sent.", this.session, impliedPosition, position);
                    }
                    return function.apply(connection);
                })
                // A discarded response still carries a live connection, so it goes through the
                // same validation/teardown path as a delivered one.
                .doOnDiscard(Tuple2.class, this::tryReestablishSession);
        this.resumableFramesStore = resumableFramesStore;
        this.allocator = resumableDuplexConnection.alloc();
        this.resumeSessionDuration = duration;
        this.retry = retry;
        this.cleanupStoreOnKeepAlive = z;
        this.resumableConnection = resumableDuplexConnection;
        resumableDuplexConnection.onClose().doFinally(signalType -> dispose()).subscribe();
        this.reconnectDisposable = resumableDuplexConnection.onActiveConnectionClosed().subscribe(this::reconnect);
    }

    /**
     * Reacts to the loss of the active connection by starting a new reconnect pipeline, unless the
     * session has already been closed.
     *
     * @param i identifier of the lost connection as reported by the resumable connection; only used
     *     for logging
     */
    void reconnect(int i) {
        if (this.s == Operators.cancelledSubscription()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting rejected since session is closed", this.session, i);
            }
            return;
        }

        this.keepAliveSupport.stop();
        if (logger.isDebugEnabled()) {
            logger.debug("Side[client]|Session[{}]. Connection[{}] is lost. Reconnecting to resume...", this.session, i);
        }
        // tryReestablishSession sits upstream of retryWhen, so an exception thrown from it is
        // subject to the retry strategy; the timeout bounds the whole attempt sequence.
        this.connectionFactory
                .doOnNext(this::tryReestablishSession)
                .retryWhen(this.retry)
                .timeout(this.resumeSessionDuration)
                .subscribe(this);
    }

    /** Returns the implied position currently reported by the frames store. */
    @Override // io.rsocket.resume.ResumeStateHolder
    public long impliedPosition() {
        return this.resumableFramesStore.frameImpliedPosition();
    }

    /**
     * Releases stored frames up to the given position when cleanup on keep-alive is enabled. Any
     * failure while releasing closes the resumable connection with a connection error.
     *
     * @param j position passed on to {@code ResumableFramesStore.releaseFrames}
     */
    @Override // io.rsocket.resume.ResumeStateHolder
    public void onImpliedPosition(long j) {
        if (!this.cleanupStoreOnKeepAlive) {
            return;
        }
        try {
            this.resumableFramesStore.releaseFrames(j);
        } catch (Throwable th) {
            this.resumableConnection.sendErrorAndClose(new ConnectionErrorException(th.getMessage(), th));
        }
    }

    /**
     * Closes the session: marks the subscription slot as terminated, stops listening for connection
     * loss, disposes the resumable connection and releases the resume token if it is still
     * referenced.
     */
    @Override // reactor.core.Disposable
    public void dispose() {
        if (logger.isDebugEnabled()) {
            logger.debug("Side[client]|Session[{}]. Disposing", this.session);
        }
        final boolean terminated = Operators.terminate(S, this);
        if (logger.isDebugEnabled()) {
            logger.debug("Side[client]|Session[{}]. Sessions[isDisposed={}]", this.session, terminated);
        }
        this.reconnectDisposable.dispose();
        this.resumableConnection.dispose();
        if (this.resumeToken.refCnt() > 0) {
            this.resumeToken.release();
        }
    }

    /** Reports the disposal state of the underlying resumable connection. */
    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return this.resumableConnection.isDisposed();
    }

    /**
     * Validates the response to a RESUME frame and, if it is acceptable, resumes the session on the
     * connection it arrived on.
     *
     * <ul>
     *   <li>Frame on a non-zero stream: the session is disposed with a connection error and the
     *       error is thrown.
     *   <li>RESUME_OK: positions are checked, stored frames are released up to the remote position
     *       and the connection is handed to the resumable connection.
     *   <li>ERROR: a {@link RejectedResumeException} disposes the session; any other error closes
     *       only the received connection and is rethrown.
     *   <li>Any other frame type: the session is disposed with a connection error (nothing thrown).
     * </ul>
     *
     * @param tuple2 the frame to validate (T1) and the connection it was received on (T2)
     */
    void tryReestablishSession(Tuple2<ByteBuf, DuplexConnection> tuple2) {
        if (logger.isDebugEnabled()) {
            logger.debug("Active subscription is canceled {}", this.s == Operators.cancelledSubscription());
        }
        final ByteBuf firstFrame = tuple2.getT1();
        final DuplexConnection nextConnection = tuple2.getT2();

        if (FrameHeaderCodec.streamId(firstFrame) != 0) {
            // Thrown (unlike the wrong-frame-type case below) so the caller's pipeline sees an error.
            throw rejectIllegalFirstFrame(nextConnection);
        }

        final FrameType frameType = FrameHeaderCodec.nativeFrameType(firstFrame);
        if (frameType == FrameType.RESUME_OK) {
            onResumeOk(firstFrame, nextConnection);
        } else if (frameType == FrameType.ERROR) {
            onResumeError(firstFrame, nextConnection);
        } else {
            rejectIllegalFirstFrame(nextConnection);
        }
    }

    /** Handles a RESUME_OK frame: verifies positions, releases acknowledged frames and connects. */
    private void onResumeOk(ByteBuf resumeOkFrame, DuplexConnection nextConnection) {
        final long remoteImpliedPosition = ResumeOkFrameCodec.lastReceivedClientPos(resumeOkFrame);
        final long position = this.resumableFramesStore.framePosition();
        final long impliedPosition = this.resumableFramesStore.frameImpliedPosition();
        if (logger.isDebugEnabled()) {
            logger.debug("Side[client]|Session[{}]. ResumeOK FRAME received. ServerResumeState[remoteImpliedPosition[{}]]. ClientResumeState[impliedPosition[{}], position[{}]]", this.session, remoteImpliedPosition, impliedPosition, position);
        }

        if (position > remoteImpliedPosition) {
            // The local store starts after what the server acknowledges having received.
            if (logger.isDebugEnabled()) {
                logger.debug("Side[client]|Session[{}]. Mismatching remote and local state. Expected RemoteImpliedPosition[{}] to be greater or equal to the LocalPosition[{}]. Terminating received connection", this.session, remoteImpliedPosition, position);
            }
            disposeSessionAndClose(nextConnection, new ConnectionErrorException("resumption_server_pos=[" + remoteImpliedPosition + "]"));
            return;
        }

        if (position != remoteImpliedPosition) {
            try {
                this.resumableFramesStore.releaseFrames(remoteImpliedPosition);
            } catch (IllegalStateException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Side[client]|Session[{}]. Exception occurred while releasing frames in the frameStore", this.session, e);
                }
                disposeSessionAndClose(nextConnection, new ConnectionErrorException(e.getMessage(), e));
                return;
            }
        }

        if (!tryCancelSessionTimeout()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Side[client]|Session[{}]. Session has already been expired. Terminating received connection", this.session);
            }
            closeWithError(nextConnection, new ConnectionErrorException("resumption_server=[Session Expired]"));
            return;
        }

        this.keepAliveSupport.start();
        if (logger.isDebugEnabled()) {
            logger.debug("Side[client]|Session[{}]. Session has been resumed successfully", this.session);
        }
        if (this.resumableConnection.connect(nextConnection)) {
            return;
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Side[client]|Session[{}]. Session has already been expired. Terminating received connection", this.session);
        }
        // NOTE(review): this message uses the key "resumption_server_pos" while the other
        // session-expired paths use "resumption_server"; kept unchanged - confirm whether intended.
        closeWithError(nextConnection, new ConnectionErrorException("resumption_server_pos=[Session Expired]"));
    }

    /**
     * Handles an ERROR frame received instead of RESUME_OK. A {@link RejectedResumeException}
     * disposes the session with that cause; any other error only closes the received connection and
     * is rethrown to the caller.
     */
    private void onResumeError(ByteBuf errorFrame, DuplexConnection nextConnection) {
        final RuntimeException remoteError = Exceptions.from(0, errorFrame);
        if (logger.isDebugEnabled()) {
            logger.debug("Side[client]|Session[{}]. Received error frame. Terminating received connection", this.session, remoteError);
        }
        if (remoteError instanceof RejectedResumeException) {
            this.resumableConnection.dispose(nextConnection, remoteError);
            nextConnection.dispose();
            nextConnection.receive().subscribe();
            return;
        }
        nextConnection.dispose();
        nextConnection.receive().subscribe();
        throw remoteError;
    }

    /**
     * Disposes the session because the first frame was not a valid RESUME_OK and closes the received
     * connection.
     *
     * @return the error the session was disposed with, so the caller can decide whether to throw it
     */
    private ConnectionErrorException rejectIllegalFirstFrame(DuplexConnection nextConnection) {
        if (logger.isDebugEnabled()) {
            logger.debug("Side[client]|Session[{}]. Illegal first frame received. RESUME_OK frame must be received before any others. Terminating received connection", this.session);
        }
        final ConnectionErrorException cause = new ConnectionErrorException("RESUME_OK frame must be received before any others");
        disposeSessionAndClose(nextConnection, cause);
        return cause;
    }

    /** Passes the failure to the resumable connection, then closes the received connection with it. */
    private void disposeSessionAndClose(DuplexConnection nextConnection, ConnectionErrorException cause) {
        this.resumableConnection.dispose(nextConnection, cause);
        closeWithError(nextConnection, cause);
    }

    /**
     * Sends the error to the peer, closes the connection and subscribes to its inbound stream
     * (presumably so that any frames still arriving are consumed rather than left pending).
     */
    private static void closeWithError(DuplexConnection connection, ConnectionErrorException cause) {
        connection.sendErrorAndClose(cause);
        connection.receive().subscribe();
    }

    /**
     * Atomically clears the in-flight reconnect subscription and cancels it, which also cancels the
     * timeout that is part of that pipeline.
     *
     * @return {@code false} if the session has already been terminated, {@code true} otherwise
     */
    boolean tryCancelSessionTimeout() {
        for (;;) {
            final Subscription current = this.s;
            if (current == Operators.cancelledSubscription()) {
                return false;
            }
            if (S.compareAndSet(this, current, null)) {
                current.cancel();
                return true;
            }
        }
    }

    /** Stores the reconnect subscription (if none is set) and requests unbounded demand. */
    @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
    public void onSubscribe(Subscription subscription) {
        if (Operators.setOnce(S, this, subscription)) {
            subscription.request(Long.MAX_VALUE);
        }
    }

    /** No-op: the resumption itself happens in the {@code doOnNext} step of the pipeline. */
    @Override // org.reactivestreams.Subscriber
    public void onNext(Tuple2<ByteBuf, DuplexConnection> tuple2) {
    }

    /**
     * Terminates the session when the reconnect pipeline fails (for example on timeout or exhausted
     * retries) and disposes the resumable connection. If the session was already terminated the
     * error is reported as dropped.
     */
    @Override // org.reactivestreams.Subscriber
    public void onError(Throwable th) {
        if (!Operators.terminate(S, this)) {
            Operators.onErrorDropped(th, currentContext());
        }
        this.resumableConnection.dispose();
    }

    /** No-op. */
    @Override // org.reactivestreams.Subscriber
    public void onComplete() {
    }

    /** Sets the keep-alive support that is stopped on connection loss and restarted on resumption. */
    @Override // io.rsocket.resume.RSocketSession
    public void setKeepAliveSupport(KeepAliveSupport keepAliveSupport) {
        this.keepAliveSupport = keepAliveSupport;
    }
}
