package org.drasyl.handler.pubsub;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseNotifier;
import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.drasyl.channel.OverlayAddressedMessage;
import org.drasyl.identity.DrasylAddress;
import org.drasyl.util.Pair;
import org.drasyl.util.Preconditions;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;

/* loaded from: input_file:org/drasyl/handler/pubsub/PubSubSubscribeHandler.class */
public class PubSubSubscribeHandler extends ChannelDuplexHandler {
    public static final Duration DEFAULT_SUBSCRIBE_TIMEOUT = Duration.ofMillis(5000);
    private static final Logger LOG = LoggerFactory.getLogger(PubSubSubscribeHandler.class);
    private final Duration subscribeTimeout;
    private final Map<UUID, Pair<Promise<Void>, String>> requests;
    private final DrasylAddress broker;
    private final Set<String> subscriptions;

    PubSubSubscribeHandler(Duration duration, Map<UUID, Pair<Promise<Void>, String>> map, DrasylAddress drasylAddress, Set<String> set) {
        this.subscribeTimeout = Preconditions.requireNonNegative(duration);
        this.requests = (Map) Objects.requireNonNull(map);
        this.broker = (DrasylAddress) Objects.requireNonNull(drasylAddress);
        this.subscriptions = (Set) Objects.requireNonNull(set);
    }

    public PubSubSubscribeHandler(Duration duration, DrasylAddress drasylAddress) {
        this(duration, new HashMap(), drasylAddress, new HashSet());
    }

    public PubSubSubscribeHandler(DrasylAddress drasylAddress) {
        this(DEFAULT_SUBSCRIBE_TIMEOUT, drasylAddress);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        unsubscribeFromAll(channelHandlerContext);
        channelHandlerContext.fireChannelInactive();
    }

    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) {
        if (obj instanceof PubSubSubscribe) {
            doSubscribe(channelHandlerContext, (PubSubSubscribe) obj, channelPromise);
        } else if (obj instanceof PubSubUnsubscribe) {
            doUnsubscribe(channelHandlerContext, (PubSubUnsubscribe) obj, channelPromise);
        } else {
            channelHandlerContext.write(obj, channelPromise);
        }
    }

    private void doSubscribe(ChannelHandlerContext channelHandlerContext, PubSubSubscribe pubSubSubscribe, ChannelPromise channelPromise) {
        LOG.trace("Send `{}` to broker `{}`.", pubSubSubscribe, this.broker);
        channelHandlerContext.write(new OverlayAddressedMessage(pubSubSubscribe, this.broker)).addListener(future -> {
            if (this.subscribeTimeout.isZero() || !future.isSuccess()) {
                PromiseNotifier.cascade(future, channelPromise);
                return;
            }
            this.requests.put(pubSubSubscribe.getId(), Pair.of(channelPromise, pubSubSubscribe.getTopic()));
            channelPromise.addListener(future -> {
                this.requests.remove(pubSubSubscribe.getId());
            });
            channelHandlerContext.executor().schedule(() -> {
                return Boolean.valueOf(channelPromise.tryFailure(new Exception("Got no confirmation from broker within " + this.subscribeTimeout.toMillis() + "ms.")));
            }, this.subscribeTimeout.toMillis(), TimeUnit.MILLISECONDS);
        });
    }

    private void doUnsubscribe(ChannelHandlerContext channelHandlerContext, PubSubUnsubscribe pubSubUnsubscribe, ChannelPromise channelPromise) {
        LOG.trace("Send `{}` to broker `{}`.", pubSubUnsubscribe, this.broker);
        channelHandlerContext.write(new OverlayAddressedMessage(pubSubUnsubscribe, this.broker)).addListener(future -> {
            if (this.subscribeTimeout.isZero() || !future.isSuccess()) {
                PromiseNotifier.cascade(future, channelPromise);
                return;
            }
            this.requests.put(pubSubUnsubscribe.getId(), Pair.of(channelPromise, pubSubUnsubscribe.getTopic()));
            channelPromise.addListener(future -> {
                this.requests.remove(pubSubUnsubscribe.getId());
            });
            channelHandlerContext.executor().schedule(() -> {
                return Boolean.valueOf(channelPromise.tryFailure(new Exception("Got no confirmation from broker within " + this.subscribeTimeout.toMillis() + "ms.")));
            }, this.subscribeTimeout.toMillis(), TimeUnit.MILLISECONDS);
        });
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if ((obj instanceof OverlayAddressedMessage) && (((OverlayAddressedMessage) obj).content() instanceof PubSubSubscribed) && this.broker.equals(((OverlayAddressedMessage) obj).sender())) {
            handleSubscribed((PubSubSubscribed) ((OverlayAddressedMessage) obj).content());
            return;
        }
        if ((obj instanceof OverlayAddressedMessage) && (((OverlayAddressedMessage) obj).content() instanceof PubSubUnsubscribed) && this.broker.equals(((OverlayAddressedMessage) obj).sender())) {
            handleUnsubscribed((PubSubUnsubscribed) ((OverlayAddressedMessage) obj).content());
            return;
        }
        if ((obj instanceof OverlayAddressedMessage) && (((OverlayAddressedMessage) obj).content() instanceof PubSubPublish) && this.broker.equals(((OverlayAddressedMessage) obj).sender())) {
            handlePublish(channelHandlerContext, (PubSubPublish) ((OverlayAddressedMessage) obj).content());
        } else if ((obj instanceof OverlayAddressedMessage) && (((OverlayAddressedMessage) obj).content() instanceof PubSubUnsubscribe) && this.broker.equals(((OverlayAddressedMessage) obj).sender())) {
            handleUnsubscribe((PubSubUnsubscribe) ((OverlayAddressedMessage) obj).content());
        } else {
            channelHandlerContext.fireChannelRead(obj);
        }
    }

    private void handleSubscribed(PubSubSubscribed pubSubSubscribed) {
        LOG.trace("Got `{}` from broker `{}`.", pubSubSubscribed, this.broker);
        Pair<Promise<Void>, String> remove = this.requests.remove(pubSubSubscribed.getId());
        if (remove != null) {
            Promise promise = (Promise) remove.first();
            String str = (String) remove.second();
            if (this.subscriptions.add(str)) {
                LOG.debug("Subscribed to topic `{}` at broker `{}`.", str, this.broker);
            }
            promise.trySuccess((Object) null);
        }
    }

    private void handleUnsubscribed(PubSubUnsubscribed pubSubUnsubscribed) {
        LOG.trace("Got `{}` from broker `{}`.", pubSubUnsubscribed, this.broker);
        Pair<Promise<Void>, String> remove = this.requests.remove(pubSubUnsubscribed.getId());
        if (remove != null) {
            Promise promise = (Promise) remove.first();
            String str = (String) remove.second();
            if (this.subscriptions.remove(str)) {
                LOG.debug("Unsubscribed from topic `{}` from broker `{}`.", str, this.broker);
            }
            promise.trySuccess((Object) null);
        }
    }

    private void handlePublish(ChannelHandlerContext channelHandlerContext, PubSubPublish pubSubPublish) {
        LOG.trace("Got `{}` from broker `{}`.", pubSubPublish, this.broker);
        String topic = pubSubPublish.getTopic();
        if (this.subscriptions.contains(topic)) {
            LOG.trace("Got publication for topic `{}` from broker `{}`: {}", topic, this.broker, pubSubPublish.getContent());
            channelHandlerContext.fireChannelRead(pubSubPublish);
        } else {
            LOG.trace("Got publication for topic `{}` from broker `{}` we're not subscribed to. Discard publication: {}", topic, this.broker, pubSubPublish.getContent());
            pubSubPublish.release();
        }
    }

    private void handleUnsubscribe(PubSubUnsubscribe pubSubUnsubscribe) {
        LOG.trace("Got `{}` from broker `{}`.", pubSubUnsubscribe, this.broker);
        String topic = pubSubUnsubscribe.getTopic();
        if (this.subscriptions.remove(topic)) {
            LOG.debug("Unsubscribed from topic `{}` as broker `{}` is shutting down.", topic, this.broker);
        }
    }

    private void unsubscribeFromAll(ChannelHandlerContext channelHandlerContext) {
        for (String str : this.subscriptions) {
            LOG.trace("Channel is closing. Unsubscribe from topic `{}` at broker `{}`.", str, this.broker);
            channelHandlerContext.write(new OverlayAddressedMessage(PubSubUnsubscribe.of(str), this.broker));
        }
        if (this.subscriptions.isEmpty()) {
            return;
        }
        channelHandlerContext.flush();
        this.subscriptions.clear();
    }
}
