package dev.profunktor.redis4cats.pubsub.internals;

import cats.Applicative;
import cats.effect.kernel.Async;
import cats.effect.kernel.MonadCancel;
import cats.effect.kernel.Sync$;
import cats.effect.kernel.implicits$;
import cats.effect.kernel.syntax.MonadCancelOps_$;
import cats.effect.std.AtomicCell;
import cats.effect.std.Dispatcher;
import cats.effect.std.Dispatcher$;
import cats.syntax.ApplicativeIdOps$;
import cats.syntax.ApplyOps$;
import cats.syntax.package$all$;
import dev.profunktor.redis4cats.effect.Log;
import dev.profunktor.redis4cats.effect.Log$;
import fs2.Stream;
import fs2.Stream$;
import fs2.concurrent.Topic;
import fs2.concurrent.Topic$;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.immutable.Map;

/* compiled from: Subscriber.scala */
/* loaded from: input_file:dev/profunktor/redis4cats/pubsub/internals/Subscriber$.class */
public final class Subscriber$ {
    public static Subscriber$ MODULE$;

    static {
        new Subscriber$();
    }

    private <F, K, V> F onStreamTermination(AtomicCell<F, Map<K, Redis4CatsSubscription<F, V>>> atomicCell, K k, Applicative<F> applicative, Log<F> log) {
        return (F) atomicCell.evalUpdate(map -> {
            Some some = map.get(k);
            if (None$.MODULE$.equals(some)) {
                return package$all$.MODULE$.toFunctorOps(Log$.MODULE$.apply(log).error(() -> {
                    return new StringBuilder(109).append("We were notified about stream termination for ").append(k).append(" but we don't have a subscription, ").append("this is a bug in redis4cats!").toString();
                }), applicative).as(map);
            }
            if (!(some instanceof Some)) {
                throw new MatchError(some);
            }
            Redis4CatsSubscription redis4CatsSubscription = (Redis4CatsSubscription) some.value();
            return !redis4CatsSubscription.isLastSubscriber() ? ApplicativeIdOps$.MODULE$.pure$extension(package$all$.MODULE$.catsSyntaxApplicativeId(map.updated(k, redis4CatsSubscription.removeSubscriber())), applicative) : package$all$.MODULE$.toFunctorOps(redis4CatsSubscription.cleanup(), applicative).as(map.$minus(k));
        });
    }

    public <F, K, V> F dev$profunktor$redis4cats$pubsub$internals$Subscriber$$unsubscribeFrom(K k, AtomicCell<F, Map<K, Redis4CatsSubscription<F, V>>> atomicCell, MonadCancel<F, Throwable> monadCancel, Log<F> log) {
        return (F) atomicCell.evalUpdate(map -> {
            Some some = map.get(k);
            if (None$.MODULE$.equals(some)) {
                return package$all$.MODULE$.toFunctorOps(Log$.MODULE$.apply(log).debug(() -> {
                    return new StringBuilder(60).append("Not unsubscribing from ").append(k).append(" because we don't have a subscription").toString();
                }), monadCancel).as(map);
            }
            if (!(some instanceof Some)) {
                throw new MatchError(some);
            }
            Redis4CatsSubscription redis4CatsSubscription = (Redis4CatsSubscription) some.value();
            return package$all$.MODULE$.toFunctorOps(MonadCancelOps_$.MODULE$.uncancelable$extension(implicits$.MODULE$.monadCancelOps_(ApplyOps$.MODULE$.$times$greater$extension(package$all$.MODULE$.catsSyntaxApplyOps(Log$.MODULE$.apply(log).info(() -> {
                return new StringBuilder(37).append("Unsubscribing from ").append(k).append(" with ").append(redis4CatsSubscription.subscribers()).append(" subscribers").toString();
            })), redis4CatsSubscription.topic().publish1(None$.MODULE$), monadCancel)), monadCancel), monadCancel).as(map);
        });
    }

    public <F, TypedKey, SubValue, K, V> Stream<F, SubValue> dev$profunktor$redis4cats$pubsub$internals$Subscriber$$subscribe(TypedKey typedkey, AtomicCell<F, Map<TypedKey, Redis4CatsSubscription<F, SubValue>>> atomicCell, StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, F f, F f2, Function2<Dispatcher<F>, Topic<F, Option<SubValue>>, RedisPubSubListener<K, V>> function2, Async<F> async, Log<F> log) {
        return Stream$.MODULE$.eval(atomicCell.evalModify(map -> {
            Some some = map.get(typedkey);
            if (some instanceof Some) {
                Redis4CatsSubscription redis4CatsSubscription = (Redis4CatsSubscription) some.value();
                Redis4CatsSubscription addSubscriber = redis4CatsSubscription.addSubscriber();
                return package$all$.MODULE$.toFunctorOps(Log$.MODULE$.apply(log).debug(() -> {
                    return new StringBuilder(55).append("Returning existing subscription for ").append(typedkey).append(", ").append("subscribers: ").append(redis4CatsSubscription.subscribers()).append(" -> ").append(addSubscriber.subscribers()).toString();
                }), async).as(new Tuple2(map.updated(typedkey, addSubscriber), stream$1(addSubscriber, atomicCell, typedkey, async, log)));
            }
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            return MonadCancelOps_$.MODULE$.uncancelable$extension(implicits$.MODULE$.monadCancelOps_(package$all$.MODULE$.toFlatMapOps(Log$.MODULE$.apply(log).info(() -> {
                return new StringBuilder(26).append("Creating subscription for ").append(typedkey).toString();
            }), async).flatMap(boxedUnit -> {
                return package$all$.MODULE$.toFlatMapOps(package$all$.MODULE$.toFunctorOps(Dispatcher$.MODULE$.parallel(async).allocated(async), async).map(tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    Tuple3 tuple3 = new Tuple3(tuple2, (Dispatcher) tuple2._1(), tuple2._2());
                    Tuple2 tuple2 = (Tuple2) tuple3._1();
                    tuple3._3();
                    return new Tuple2(tuple2, tuple2);
                }), async).flatMap(tuple22 -> {
                    Tuple2 tuple22;
                    if (tuple22 == null || (tuple22 = (Tuple2) tuple22._2()) == null) {
                        throw new MatchError(tuple22);
                    }
                    Dispatcher dispatcher = (Dispatcher) tuple22._1();
                    Object _2 = tuple22._2();
                    return package$all$.MODULE$.toFlatMapOps(package$all$.MODULE$.toFunctorOps(Topic$.MODULE$.apply(async), async).map(topic -> {
                        RedisPubSubListener redisPubSubListener = (RedisPubSubListener) function2.apply(dispatcher, topic);
                        Object delay = Sync$.MODULE$.apply(async).delay(() -> {
                            statefulRedisPubSubConnection.removeListener(redisPubSubListener);
                        });
                        return new Tuple4(topic, redisPubSubListener, delay, MonadCancelOps_$.MODULE$.uncancelable$extension(implicits$.MODULE$.monadCancelOps_(ApplyOps$.MODULE$.$times$greater$extension(package$all$.MODULE$.catsSyntaxApplyOps(ApplyOps$.MODULE$.$times$greater$extension(package$all$.MODULE$.catsSyntaxApplyOps(ApplyOps$.MODULE$.$times$greater$extension(package$all$.MODULE$.catsSyntaxApplyOps(ApplyOps$.MODULE$.$times$greater$extension(package$all$.MODULE$.catsSyntaxApplyOps(Log$.MODULE$.apply(log).debug(() -> {
                            return new StringBuilder(39).append("Cleaning up resources for ").append(typedkey).append(" subscription").toString();
                        })), f2, async)), delay, async)), _2, async)), Log$.MODULE$.apply(log).debug(() -> {
                            return new StringBuilder(38).append("Cleaned up resources for ").append(typedkey).append(" subscription").toString();
                        }), async)), async));
                    }), async).flatMap(tuple4 -> {
                        if (tuple4 == null) {
                            throw new MatchError(tuple4);
                        }
                        Topic topic2 = (Topic) tuple4._1();
                        RedisPubSubListener redisPubSubListener = (RedisPubSubListener) tuple4._2();
                        Object _4 = tuple4._4();
                        return package$all$.MODULE$.toFlatMapOps(Sync$.MODULE$.apply(async).delay(() -> {
                            statefulRedisPubSubConnection.addListener(redisPubSubListener);
                        }), async).flatMap(boxedUnit -> {
                            return package$all$.MODULE$.toFlatMapOps(package$all$.MODULE$.toFunctorOps(f, async).map(boxedUnit -> {
                                Redis4CatsSubscription redis4CatsSubscription2 = new Redis4CatsSubscription(topic2, 1L, _4);
                                return new Tuple3(boxedUnit, redis4CatsSubscription2, map.updated(typedkey, redis4CatsSubscription2));
                            }), async).flatMap(tuple3 -> {
                                if (tuple3 == null) {
                                    throw new MatchError(tuple3);
                                }
                                Redis4CatsSubscription redis4CatsSubscription2 = (Redis4CatsSubscription) tuple3._2();
                                Map map = (Map) tuple3._3();
                                return package$all$.MODULE$.toFunctorOps(Log$.MODULE$.apply(log).debug(() -> {
                                    return new StringBuilder(25).append("Created subscription for ").append(typedkey).toString();
                                }), async).map(boxedUnit2 -> {
                                    return new Tuple2(map, stream$1(redis4CatsSubscription2, atomicCell, typedkey, async, log));
                                });
                            });
                        });
                    });
                });
            })), async);
        })).flatten(Predef$.MODULE$.$conforms());
    }

    private static final Stream stream$1(Redis4CatsSubscription redis4CatsSubscription, AtomicCell atomicCell, Object obj, Async async, Log log) {
        return redis4CatsSubscription.stream(MODULE$.onStreamTermination(atomicCell, obj, async, log), async);
    }

    private Subscriber$() {
        MODULE$ = this;
    }
}
