package io.nats.vertx.impl;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamOptions;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import io.nats.client.PushSubscribeOptions;
import io.nats.client.api.AckPolicy;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.client.api.DeliverPolicy;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/nats/vertx/impl/NatsImpl.class */
/**
 * Shared state and helpers for NATS clients that run on top of Vert.x.
 *
 * <p>Holds the NATS {@link Connection}, the JetStream contexts derived from it, the
 * request timeout, and the handler that is notified about failures.
 */
public class NatsImpl {
    /** Vert.x instance used to obtain the context that blocking work runs on. */
    public final Vertx vertx;
    /** Underlying NATS connection. */
    public final Connection conn;
    /**
     * Request timeout: taken from the supplied JetStream options when they define one,
     * otherwise the connection's configured connection timeout.
     */
    public final Duration timeout;
    /** JetStream options rebuilt from the supplied ones with {@link #timeout} applied. */
    public final JetStreamOptions jso;
    /** JetStream management context created from {@link #conn} and {@link #jso}. */
    public final JetStreamManagement jsm;
    /** JetStream context obtained from {@link #jsm}. */
    public final JetStream js;
    // Not used inside this class; presumably maintained by subclasses - verify against callers.
    protected final ConcurrentHashMap<String, Dispatcher> dispatcherMap = new ConcurrentHashMap<>();
    // Not used inside this class; presumably maintained by subclasses - verify against callers.
    protected final ConcurrentHashMap<String, JetStreamSubscription> subscriptionMap = new ConcurrentHashMap<>();
    /** Handler notified about failures; the contained reference may be {@code null}. */
    protected final AtomicReference<Handler<Throwable>> exceptionHandler = new AtomicReference<>();

    /**
     * Creates the JetStream contexts for the given connection.
     *
     * @param connection       NATS connection to build the JetStream contexts from
     * @param vertx            Vert.x instance used to obtain execution contexts
     * @param handler          failure handler, may be {@code null}
     * @param jetStreamOptions JetStream options to start from, may be {@code null}
     * @throws RuntimeException wrapping the {@link IOException} raised while creating the
     *                          JetStream management context; the handler (if any) is notified first
     */
    public NatsImpl(Connection connection, Vertx vertx, Handler<Throwable> handler, JetStreamOptions jetStreamOptions) {
        this.conn = connection;
        this.vertx = vertx;
        this.exceptionHandler.set(handler);

        Duration requestTimeout = jetStreamOptions == null ? null : jetStreamOptions.getRequestTimeout();
        this.timeout = requestTimeout != null ? requestTimeout : connection.getOptions().getConnectionTimeout();
        this.jso = JetStreamOptions.builder(jetStreamOptions).requestTimeout(this.timeout).build();
        try {
            this.jsm = connection.jetStreamManagement(this.jso);
            this.js = this.jsm.jetStream();
        } catch (IOException e) {
            reportException(e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the Vert.x context obtained from {@link Vertx#getOrCreateContext()}.
     *
     * @return the context, viewed as a {@link ContextInternal}
     */
    public ContextInternal context() {
        // Vertx#getOrCreateContext() is declared to return Context, so the internal view needs a cast.
        return (ContextInternal) this.vertx.getOrCreateContext();
    }

    /**
     * Invokes {@code handler} with the future of a freshly created promise.
     *
     * <p>NOTE(review): the promise is never completed in this method, so the handler receives a
     * result that is neither succeeded nor failed - confirm whether a completed result was intended.
     *
     * @param handler handler to invoke
     */
    public void endImpl(Handler<AsyncResult<Void>> handler) {
        // Explicit type witness: without it the chained call infers Future<Object>.
        handler.handle(context().<Void>promise().future());
    }

    /**
     * Runs {@code callable} through {@code executeBlocking} on {@link #context()} with
     * {@code ordered} set to {@code false}.
     *
     * <p>If the callable throws, the exception handler (when one is set) is notified and the
     * exception is rethrown from the blocking task.
     *
     * @param callable work to execute
     * @param <T>      result type
     * @return the future returned by {@code executeBlocking}
     */
    public <T> Future<T> executeUnorderedBlocking(Callable<T> callable) {
        return context().executeBlocking(() -> {
            try {
                return callable.call();
            } catch (Exception e) {
                reportException(e);
                throw e;
            }
        }, false);
    }

    /**
     * Single-subject convenience overload of
     * {@link #visitSubject(String, List, DeliverPolicy, boolean, boolean, MessageHandler)}.
     *
     * @param stream         stream name passed to the push subscribe options
     * @param subject        the only filter subject
     * @param deliverPolicy  deliver policy for the consumer configuration
     * @param headersOnly    value for the consumer's headers-only setting
     * @param ordered        value for the subscription's ordered setting
     * @param messageHandler callback invoked for every received message
     * @throws IOException           propagated from the delegate
     * @throws JetStreamApiException propagated from the delegate
     */
    public void visitSubject(String stream, String subject, DeliverPolicy deliverPolicy, boolean headersOnly, boolean ordered, MessageHandler messageHandler) throws IOException, JetStreamApiException {
        visitSubject(stream, Collections.singletonList(subject), deliverPolicy, headersOnly, ordered, messageHandler);
    }

    /**
     * Subscribes to {@code stream} with a push consumer ({@link AckPolicy#None}) filtered by
     * {@code subjects} and feeds the pending messages to {@code messageHandler}.
     *
     * <p>The number of messages to read is the consumer's calculated pending count at the time
     * the loop starts. Reading stops once that many messages were handled, or when
     * {@code nextMessage} returns {@code null} twice in a row. Exceptions raised while reading are
     * passed to the exception handler (when one is set) and are not rethrown. The subscription is
     * unsubscribed exactly once when the method finishes, whichever way it exits.
     *
     * @param stream         stream name passed to the push subscribe options
     * @param subjects       filter subjects for the consumer configuration
     * @param deliverPolicy  deliver policy for the consumer configuration
     * @param headersOnly    value for the consumer's headers-only setting
     * @param ordered        value for the subscription's ordered setting
     * @param messageHandler callback invoked for every received message
     * @throws IOException           if creating the subscription fails
     * @throws JetStreamApiException if creating the subscription fails
     */
    public void visitSubject(String stream, List<String> subjects, DeliverPolicy deliverPolicy, boolean headersOnly, boolean ordered, MessageHandler messageHandler) throws IOException, JetStreamApiException {
        ConsumerConfiguration consumerConfig = ConsumerConfiguration.builder()
                .ackPolicy(AckPolicy.None)
                .deliverPolicy(deliverPolicy)
                .headersOnly(headersOnly)
                .filterSubjects(subjects)
                .build();
        PushSubscribeOptions subscribeOptions = PushSubscribeOptions.builder()
                .stream(stream)
                .ordered(ordered)
                .configuration(consumerConfig)
                .build();

        JetStreamSubscription subscription = this.js.subscribe((String) null, subscribeOptions);
        try {
            long remaining = subscription.getConsumerInfo().getCalculatedPending();
            boolean previousReadTimedOut = false;
            while (remaining > 0) {
                Message message = subscription.nextMessage(this.timeout);
                if (message != null) {
                    messageHandler.onMessage(message);
                    remaining--;
                    previousReadTimedOut = false;
                } else if (previousReadTimedOut) {
                    // Second consecutive empty read: give up instead of waiting for the rest.
                    break;
                } else {
                    previousReadTimedOut = true;
                }
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the calling thread before reporting it.
            Thread.currentThread().interrupt();
            reportException(e);
        } catch (Exception e) {
            reportException(e);
        } finally {
            // Single cleanup point: unsubscribing an already inactive subscription would throw.
            subscription.unsubscribe();
        }
    }

    /** Passes {@code error} to the configured exception handler, if one is set. */
    private void reportException(Throwable error) {
        Handler<Throwable> handler = this.exceptionHandler.get();
        if (handler != null) {
            handler.handle(error);
        }
    }
}
