package com.github.eupedroosouza.channels.channel;

import com.github.eupedroosouza.channels.listener.OneChannelRedisPubSubListener;
import com.github.eupedroosouza.channels.listener.ReceiveListener;
import com.github.eupedroosouza.channels.util.Builder;
import io.lettuce.core.RedisClient;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/eupedroosouza/channels/channel/SubChannel.class */
public class SubChannel<T> implements Closeable {
    private final RedisClient client;
    private final RedisCodec<String, T> codec;

    @Nullable
    private final Executor executor;
    private StatefulRedisPubSubConnection<String, T> connection;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Logger logger = LoggerFactory.getLogger(SubChannel.class);
    private final HashMap<String, ReceiveListener<T>> channels = new HashMap<>();

    /* loaded from: input_file:com/github/eupedroosouza/channels/channel/SubChannel$SubChannelBuilder.class */
    public static class SubChannelBuilder<T> implements Builder<SubChannel<T>> {
        private RedisClient client;
        private RedisCodec<?, T> codec;

        @Nullable
        private Executor executor;
        static final /* synthetic */ boolean $assertionsDisabled;

        public SubChannelBuilder<T> client(@NotNull RedisClient redisClient) {
            this.client = redisClient;
            return this;
        }

        public RedisClient client() {
            return this.client;
        }

        public SubChannelBuilder<T> codec(@NotNull RedisCodec<?, T> redisCodec) {
            this.codec = redisCodec;
            return this;
        }

        public RedisCodec<?, T> codec() {
            return this.codec;
        }

        public SubChannelBuilder<T> executor(@NotNull Executor executor) {
            this.executor = executor;
            return this;
        }

        @Nullable
        public Executor executor() {
            return this.executor;
        }

        @Override // com.github.eupedroosouza.channels.util.Builder
        public SubChannel<T> build() {
            if (!$assertionsDisabled && this.client == null) {
                throw new AssertionError("client is null");
            }
            if (this.codec == null) {
                this.codec = ByteArrayCodec.INSTANCE;
            }
            return new SubChannel<>(this);
        }

        static {
            $assertionsDisabled = !SubChannel.class.desiredAssertionStatus();
        }
    }

    public static <E> SubChannelBuilder<E> builder() {
        return new SubChannelBuilder<>();
    }

    public SubChannel(SubChannelBuilder<T> subChannelBuilder) {
        this.client = subChannelBuilder.client();
        this.codec = RedisCodec.of(StringCodec.UTF8, subChannelBuilder.codec());
        this.executor = subChannelBuilder.executor();
    }

    public void connect() {
        this.connection = this.client.connectPubSub(this.codec);
        this.connection.addListener(new OneChannelRedisPubSubListener<String, T>() { // from class: com.github.eupedroosouza.channels.channel.SubChannel.1
            public void message(String str, T t) {
                if (!SubChannel.this.channels.containsKey(str)) {
                    SubChannel.this.logger.warn("Received unknown channel: {}", str);
                    return;
                }
                try {
                    SubChannel.this.runAsync(() -> {
                        ((ReceiveListener) SubChannel.this.channels.get(str)).onReceive(str, t);
                    });
                } catch (Exception e) {
                    SubChannel.this.logger.error("Error while receiving channel [{}]", str, e);
                }
            }

            public void subscribed(String str, long j) {
                SubChannel.this.logger.debug("Subscribed to channel [{}] with [{}] subscribed channels ", str, Long.valueOf(j));
            }

            public void unsubscribed(String str, long j) {
                SubChannel.this.logger.debug("Unsubscribe to channel [{}] with [{}] subscribed channels ", str, Long.valueOf(j));
            }

            public /* bridge */ /* synthetic */ void message(Object obj, Object obj2) {
                message((String) obj, (String) obj2);
            }
        });
    }

    private void unsubscribe() {
        if (!$assertionsDisabled && this.connection == null) {
            throw new AssertionError("connection is null");
        }
        String[] strArr = (String[]) this.channels.keySet().toArray(new String[0]);
        if (strArr.length > 0) {
            this.logger.debug("Unsubscribing channels...");
            this.connection.sync().unsubscribe(strArr);
        }
    }

    private void subscribe() {
        if (!$assertionsDisabled && this.connection == null) {
            throw new AssertionError("connection is null");
        }
        String[] strArr = (String[]) this.channels.keySet().toArray(new String[0]);
        if (strArr.length > 0) {
            this.logger.debug("Subscribing channels...");
            this.connection.sync().subscribe(strArr);
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        unsubscribe();
        this.channels.clear();
        if (this.connection != null) {
            this.connection.close();
        }
    }

    public void sub(String str, ReceiveListener<T> receiveListener) {
        if (!$assertionsDisabled && this.channels.containsKey(str)) {
            throw new AssertionError("channel " + str + " already exists");
        }
        unsubscribe();
        this.channels.put(str, receiveListener);
        subscribe();
        this.logger.info("Subscribed channel [{}].", str);
    }

    public void unsub(String str) {
        if (this.channels.containsKey(str)) {
            unsubscribe();
            this.channels.remove(str);
            subscribe();
            this.logger.info("Unsubscribed channel [{}].", str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CompletableFuture<Void> runAsync(Runnable runnable) {
        return this.executor != null ? CompletableFuture.runAsync(runnable, this.executor) : CompletableFuture.runAsync(runnable);
    }

    static {
        $assertionsDisabled = !SubChannel.class.desiredAssertionStatus();
    }
}
