package fs2.interop.flow;

import cats.effect.kernel.Async;
import cats.effect.kernel.Resource;
import cats.effect.kernel.Resource$ExitCase$;
import cats.effect.kernel.Resource$ExitCase$Canceled$;
import cats.effect.kernel.Resource$ExitCase$Errored$;
import cats.effect.kernel.Resource$ExitCase$Succeeded$;
import cats.syntax.EitherObjectOps$;
import cats.syntax.package$all$;
import fs2.Chunk;
import fs2.Pull;
import fs2.Pull$;
import fs2.Pull$StreamPullOps$;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$InvariantOps$;
import fs2.Stream$ToPull$;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Some;
import scala.Some$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.util.Either;

/* compiled from: StreamSubscription.scala */
/* loaded from: input_file:fs2/interop/flow/StreamSubscription.class */
public final class StreamSubscription<F, A> implements Flow.Subscription {
    private final Stream<F, A> stream;
    private final Flow.Subscriber<A> subscriber;
    private final AtomicLong requests;
    private final AtomicReference<Function0<BoxedUnit>> resume;
    private final AtomicReference<Function0<BoxedUnit>> canceled;
    private final Async<F> F;

    public static <F, A> StreamSubscription<F, A> apply(Stream<F, A> stream, Flow.Subscriber<A> subscriber, Async<F> async) {
        return StreamSubscription$.MODULE$.apply(stream, subscriber, async);
    }

    public static <F, A> Stream<F, Nothing$> subscribe(Stream<F, A> stream, Flow.Subscriber<A> subscriber, Async<F> async) {
        return StreamSubscription$.MODULE$.subscribe(stream, subscriber, async);
    }

    public StreamSubscription(Stream<F, A> stream, Flow.Subscriber<A> subscriber, AtomicLong atomicLong, AtomicReference<Function0<BoxedUnit>> atomicReference, AtomicReference<Function0<BoxedUnit>> atomicReference2, Async<F> async) {
        this.stream = stream;
        this.subscriber = subscriber;
        this.requests = atomicLong;
        this.resume = atomicReference;
        this.canceled = atomicReference2;
        this.F = async;
    }

    private void onError(Throwable th) {
        cancel();
        this.subscriber.onError(th);
    }

    private void onComplete() {
        cancel();
        this.subscriber.onComplete();
    }

    public Stream<F, Nothing$> run() {
        return this.stream.through(stream -> {
            return Pull$StreamPullOps$.MODULE$.stream$extension(Pull$.MODULE$.StreamPullOps(go$1(stream)));
        }).chunks().foreach(chunk -> {
            return this.F.delay(() -> {
                $anonfun$2$$anonfun$1(chunk);
                return BoxedUnit.UNIT;
            });
        }).onFinalizeCase(exitCase -> {
            cats.effect.package$.MODULE$.Resource();
            Resource$ExitCase$ resource$ExitCase$ = Resource$ExitCase$.MODULE$;
            if (Resource$ExitCase$Succeeded$.MODULE$.equals(exitCase)) {
                return this.F.delay(() -> {
                    $anonfun$3$$anonfun$1();
                    return BoxedUnit.UNIT;
                });
            }
            if (exitCase instanceof Resource.ExitCase.Errored) {
                cats.effect.package$.MODULE$.Resource();
                Resource$ExitCase$ resource$ExitCase$2 = Resource$ExitCase$.MODULE$;
                Throwable _1 = Resource$ExitCase$Errored$.MODULE$.unapply((Resource.ExitCase.Errored) exitCase)._1();
                return this.F.delay(() -> {
                    $anonfun$3$$anonfun$2(_1);
                    return BoxedUnit.UNIT;
                });
            }
            cats.effect.package$.MODULE$.Resource();
            Resource$ExitCase$ resource$ExitCase$3 = Resource$ExitCase$.MODULE$;
            if (Resource$ExitCase$Canceled$.MODULE$.equals(exitCase)) {
                return this.F.unit();
            }
            throw new MatchError(exitCase);
        }, this.F).mask().mergeHaltBoth(Stream$.MODULE$.exec(this.F.asyncCheckAttempt(function1 -> {
            return this.F.delay(() -> {
                return r1.$anonfun$4$$anonfun$1(r2);
            });
        })), this.F).onFinalizeCase(exitCase2 -> {
            cats.effect.package$.MODULE$.Resource();
            Resource$ExitCase$ resource$ExitCase$ = Resource$ExitCase$.MODULE$;
            return Resource$ExitCase$Canceled$.MODULE$.equals(exitCase2) ? this.F.delay(() -> {
                run$$anonfun$1$$anonfun$1();
                return BoxedUnit.UNIT;
            }) : this.F.unit();
        }, this.F);
    }

    @Override // java.util.concurrent.Flow.Subscription
    public final void cancel() {
        Function0<BoxedUnit> andSet = this.canceled.getAndSet(null);
        if (andSet != null) {
            andSet.apply$mcV$sp();
        }
    }

    @Override // java.util.concurrent.Flow.Subscription
    public final void request(long j) {
        if (this.canceled.get() != null) {
            if (j <= 0) {
                onError(new IllegalArgumentException(new StringBuilder(29).append("Invalid number of elements [").append(j).append("]").toString()));
            } else {
                this.requests.updateAndGet(j2 -> {
                    long j2 = j2 + j;
                    if (j2 < 0) {
                        return Long.MAX_VALUE;
                    }
                    return j2;
                });
                this.resume.getAndSet(StreamSubscription$.fs2$interop$flow$StreamSubscription$$$Sentinel).apply$mcV$sp();
            }
        }
    }

    private final long go$1$$anonfun$1() {
        return this.requests.getAndSet(0L);
    }

    private final Either go$1$$anonfun$2$$anonfun$1$$anonfun$1(Function1 function1) {
        this.resume.set(() -> {
            function1.apply(EitherObjectOps$.MODULE$.unit$extension(package$all$.MODULE$.catsSyntaxEitherObject(scala.package$.MODULE$.Either())));
        });
        if (this.requests.get() <= 0) {
            return scala.package$.MODULE$.Left().apply(Some$.MODULE$.apply(this.F.unit()));
        }
        this.resume.set(StreamSubscription$.fs2$interop$flow$StreamSubscription$$$Sentinel);
        return EitherObjectOps$.MODULE$.unit$extension(package$all$.MODULE$.catsSyntaxEitherObject(scala.package$.MODULE$.Either()));
    }

    private final Pull go$1$$anonfun$2$$anonfun$2(Stream stream) {
        return go$1(stream);
    }

    private final /* synthetic */ Pull go$1$$anonfun$2(Stream stream, long j) {
        if (j == Long.MAX_VALUE) {
            return Stream$ToPull$.MODULE$.echo$extension(new Stream.ToPull(Stream$InvariantOps$.MODULE$.pull$extension(Stream$.MODULE$.InvariantOps(stream))).fs2$Stream$ToPull$$self());
        }
        if (j == 0) {
            return Pull$.MODULE$.eval(this.F.asyncCheckAttempt(function1 -> {
                return this.F.delay(() -> {
                    return r1.go$1$$anonfun$2$$anonfun$1$$anonfun$1(r2);
                });
            })).$greater$greater(() -> {
                return r1.go$1$$anonfun$2$$anonfun$2(r2);
            });
        }
        return Stream$ToPull$.MODULE$.take$extension(new Stream.ToPull(Stream$InvariantOps$.MODULE$.pull$extension(Stream$.MODULE$.InvariantOps(stream))).fs2$Stream$ToPull$$self(), j).flatMap(option -> {
            if (None$.MODULE$.equals(option)) {
                return Pull$.MODULE$.done();
            }
            if (option instanceof Some) {
                return go$1((Stream) ((Some) option).value());
            }
            throw new MatchError(option);
        });
    }

    private final Pull go$1(Stream stream) {
        return Pull$.MODULE$.eval(this.F.delay(this::go$1$$anonfun$1)).flatMap(obj -> {
            return go$1$$anonfun$2(stream, BoxesRunTime.unboxToLong(obj));
        });
    }

    private final void $anonfun$2$$anonfun$1(Chunk chunk) {
        chunk.foreach(obj -> {
            this.subscriber.onNext(obj);
        });
    }

    private final void $anonfun$3$$anonfun$1() {
        onComplete();
    }

    private final void $anonfun$3$$anonfun$2(Throwable th) {
        onError(th);
    }

    private final Either $anonfun$4$$anonfun$1(Function1 function1) {
        return !this.canceled.compareAndSet(StreamSubscription$.fs2$interop$flow$StreamSubscription$$$Sentinel, () -> {
            function1.apply(EitherObjectOps$.MODULE$.unit$extension(package$all$.MODULE$.catsSyntaxEitherObject(scala.package$.MODULE$.Either())));
        }) ? EitherObjectOps$.MODULE$.unit$extension(package$all$.MODULE$.catsSyntaxEitherObject(scala.package$.MODULE$.Either())) : scala.package$.MODULE$.Left().apply(Some$.MODULE$.apply(this.F.unit()));
    }

    private final void run$$anonfun$1$$anonfun$1() {
        onError(new CancellationException("StreamSubscription.run was canceled"));
    }
}
