package org.redisson.reactive;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RReliableTopic;
import reactor.core.publisher.Flux;

/* loaded from: input_file:BOOT-INF/lib/redisson-3.18.0.jar:org/redisson/reactive/RedissonReliableTopicReactive.class */
/**
 * Reactive adapter that exposes the messages of an {@link RReliableTopic} as a {@link Flux}.
 */
public class RedissonReliableTopicReactive {
    private final RReliableTopic topic;

    /**
     * Creates the adapter.
     *
     * @param rReliableTopic topic on which message listeners are registered and removed
     */
    public RedissonReliableTopicReactive(RReliableTopic rReliableTopic) {
        this.topic = rReliableTopic;
    }

    /**
     * Returns a stream of messages of the given type published to the topic.
     *
     * <p>Every {@code request(n)} signal from the subscriber registers a new listener on the
     * topic. That listener forwards each message to the subscriber and, once {@code n} messages
     * have been forwarded, removes itself and completes the stream. The listener is also removed
     * when the sink is disposed. A failure to register the listener is propagated as an error
     * signal.
     *
     * @param cls class of the messages to listen for
     * @param <M> message type
     * @return stream of messages received by the listener
     */
    public <M> Flux<M> getMessages(Class<M> cls) {
        return Flux.create(sink -> {
            sink.onRequest(requested -> {
                // Number of messages still owed to the subscriber for this request.
                AtomicLong remaining = new AtomicLong(requested);
                // Id of the registered listener; stays null until registration has completed.
                AtomicReference<String> listenerId = new AtomicReference<>();
                this.topic.addListenerAsync(cls, (channel, message) -> {
                    sink.next(message);
                    if (remaining.decrementAndGet() == 0) {
                        String id = listenerId.get();
                        // The id may not be published yet if messages arrive before the
                        // registration callback has run; never pass a null id to the topic.
                        // The registration callback below removes the listener in that case.
                        if (id != null) {
                            this.topic.removeListenerAsync(id);
                        }
                        sink.complete();
                    }
                }).whenComplete((id, error) -> {
                    if (error != null) {
                        sink.error(error);
                        return;
                    }
                    listenerId.set(id);
                    // The budget was exhausted before the id became visible to the listener,
                    // so the listener could not remove itself: do it here instead.
                    if (remaining.get() <= 0) {
                        this.topic.removeListenerAsync(id);
                    }
                    sink.onDispose(() -> {
                        this.topic.removeListenerAsync(id);
                    });
                });
            });
        });
    }
}
