package io.helidon.messaging;

import io.helidon.common.configurable.ThreadPoolSupplier;
import io.helidon.common.reactive.Multi;
import io.helidon.config.Config;
import io.helidon.config.ConfigSources;
import io.helidon.config.ConfigValue;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/helidon/messaging/MessagingImpl.class */
public class MessagingImpl implements Messaging {
    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
    private Config config;
    private ThreadPoolSupplier threadPoolSupplier;
    private final Set<Emitter<?>> emitters = new HashSet();
    private final Map<String, Channel<?>> channelMap = new HashMap();
    private final Map<String, IncomingConnectorFactory> incomingConnectors = new HashMap();
    private final Map<String, OutgoingConnectorFactory> outgoingConnectors = new HashMap();
    private State state = State.INIT;
    private final int instanceNumber = INSTANCE_COUNTER.incrementAndGet();

    /* loaded from: input_file:io/helidon/messaging/MessagingImpl$State.class */
    enum State {
        INIT { // from class: io.helidon.messaging.MessagingImpl.State.1
            @Override // io.helidon.messaging.MessagingImpl.State
            void start(MessagingImpl messagingImpl) {
                messagingImpl.state = STARTED;
            }

            @Override // io.helidon.messaging.MessagingImpl.State
            void stop(MessagingImpl messagingImpl) {
                throw new MessagingException("Messaging is not started yet!");
            }
        },
        STARTED { // from class: io.helidon.messaging.MessagingImpl.State.2
            @Override // io.helidon.messaging.MessagingImpl.State
            void start(MessagingImpl messagingImpl) {
                throw new MessagingException("Messaging has been started already!");
            }

            @Override // io.helidon.messaging.MessagingImpl.State
            void stop(MessagingImpl messagingImpl) {
                messagingImpl.state = STOPPED;
            }
        },
        STOPPED { // from class: io.helidon.messaging.MessagingImpl.State.3
            @Override // io.helidon.messaging.MessagingImpl.State
            void start(MessagingImpl messagingImpl) {
                throw new MessagingException("Messaging has been stopped already!");
            }

            @Override // io.helidon.messaging.MessagingImpl.State
            void stop(MessagingImpl messagingImpl) {
                start(messagingImpl);
            }
        };

        abstract void start(MessagingImpl messagingImpl);

        abstract void stop(MessagingImpl messagingImpl);
    }

    @Override // io.helidon.messaging.Messaging
    public Messaging start() {
        this.state.start(this);
        if (!this.emitters.isEmpty()) {
            this.threadPoolSupplier = ThreadPoolSupplier.builder().threadNamePrefix("helidon-messaging-" + this.instanceNumber + "-").build();
            this.emitters.forEach(emitter -> {
                emitter.init(this.threadPoolSupplier.get(), Flow.defaultBufferSize());
            });
        }
        this.channelMap.values().forEach(this::findConnectors);
        this.channelMap.values().forEach((v0) -> {
            v0.connect();
        });
        return this;
    }

    @Override // io.helidon.messaging.Messaging
    public void stop() {
        this.state.stop(this);
        Multi create = Multi.create(this.incomingConnectors.values());
        Class<Object> cls = Object.class;
        Objects.requireNonNull(Object.class);
        Multi map = create.map((v1) -> {
            return r1.cast(v1);
        });
        Multi create2 = Multi.create(this.outgoingConnectors.values());
        Class<Object> cls2 = Object.class;
        Objects.requireNonNull(Object.class);
        Multi distinct = Multi.concat(map, create2.map((v1) -> {
            return r2.cast(v1);
        })).distinct();
        Class<Stoppable> cls3 = Stoppable.class;
        Objects.requireNonNull(Stoppable.class);
        Multi filter = distinct.filter(cls3::isInstance);
        Class<Stoppable> cls4 = Stoppable.class;
        Objects.requireNonNull(Stoppable.class);
        filter.map(cls4::cast).forEach((v0) -> {
            v0.stop();
        });
        if (this.emitters.isEmpty()) {
            return;
        }
        this.emitters.forEach((v0) -> {
            v0.complete();
        });
        this.threadPoolSupplier.get().shutdown();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setConfig(Config config) {
        this.config = config;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Config getConfig() {
        return this.config;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addIncomingConnector(IncomingConnectorFactory incomingConnectorFactory) {
        this.incomingConnectors.put(getConnectorName(incomingConnectorFactory.getClass()), incomingConnectorFactory);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addOutgoingConnector(OutgoingConnectorFactory outgoingConnectorFactory) {
        this.outgoingConnectors.put(getConnectorName(outgoingConnectorFactory.getClass()), outgoingConnectorFactory);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addEmitter(Emitter<?> emitter) {
        this.emitters.add(emitter);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void registerChannel(Channel<?> channel) {
        if (this.channelMap.get(channel.name()) == null) {
            this.channelMap.put(channel.name(), channel);
        }
    }

    private String getConnectorName(Class<?> cls) {
        Connector annotation = cls.getAnnotation(Connector.class);
        if (annotation == null) {
            throw new MessagingException("Missing @Connector annotation in provided " + cls.getSimpleName());
        }
        return annotation.value();
    }

    private void findConnectors(Channel<?> channel) {
        Config.Builder disableEnvironmentVariablesSource = Config.builder().disableSystemPropertiesSource().disableEnvironmentVariablesSource();
        if (this.config != null) {
            disableEnvironmentVariablesSource.addSource(ConfigSources.create(this.config));
        }
        if (channel.getPublisherConfig() != null) {
            disableEnvironmentVariablesSource.addSource(ConnectorConfigHelper.prefixedConfigSource("mp.messaging.incoming." + channel.name(), channel.getPublisherConfig()));
        }
        if (channel.getSubscriberConfig() != null) {
            disableEnvironmentVariablesSource.addSource(ConnectorConfigHelper.prefixedConfigSource("mp.messaging.outgoing." + channel.name(), channel.getSubscriberConfig()));
        }
        Config build = disableEnvironmentVariablesSource.build();
        ConfigValue<String> incomingConnectorName = ConnectorConfigHelper.getIncomingConnectorName(build, channel.name());
        ConfigValue<String> outgoingConnectorName = ConnectorConfigHelper.getOutgoingConnectorName(build, channel.name());
        if (incomingConnectorName.isPresent()) {
            String str = (String) incomingConnectorName.get();
            channel.setPublisher(((IncomingConnectorFactory) Optional.ofNullable(this.incomingConnectors.get(str)).orElseThrow(() -> {
                return new MessagingException("Unknown incoming connector " + str);
            })).getPublisherBuilder(ConnectorConfigHelper.getConnectorConfig(channel.name(), str, build)).buildRs());
        }
        if (outgoingConnectorName.isPresent()) {
            String str2 = (String) outgoingConnectorName.get();
            channel.setSubscriber(((OutgoingConnectorFactory) Optional.ofNullable(this.outgoingConnectors.get(str2)).orElseThrow(() -> {
                return new MessagingException("Unknown outgoing connector " + str2);
            })).getSubscriberBuilder(ConnectorConfigHelper.getConnectorConfig(channel.name(), str2, build)).build());
        }
    }
}
