package io.nats.vertx.impl;

import io.nats.client.Connection;
import io.nats.client.ConnectionListener;
import io.nats.client.JetStreamOptions;
import io.nats.client.KeyValueOptions;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.Subscription;
import io.nats.client.impl.Headers;
import io.nats.client.impl.VertxDispatcherFactory;
import io.nats.vertx.NatsClient;
import io.nats.vertx.NatsOptions;
import io.nats.vertx.NatsStream;
import io.nats.vertx.NatsVertxKeyValue;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/* loaded from: input_file:io/nats/vertx/impl/NatsClientImpl.class */
public class NatsClientImpl implements NatsClient {
    /** One-nanosecond timeout handed to {@code Subscription.nextMessage} when polling in {@code drainSubscription}. */
    private static final Duration NO_WAIT = Duration.ofNanos(1);
    /** Vert.x instance taken from {@code NatsOptions}; every context used by this client is obtained from it. */
    private final Vertx vertx;
    /** Whether {@code connect()} starts the repeating flush timer. */
    private final boolean periodicFlush;
    /** NATS options built in the constructor after the connect listener has been installed. */
    private final Options options;
    /** Promise completed by the connection listener on the CONNECTED event; its future is returned by {@code connect()}. */
    private Promise<Void> connectFuture;
    /** Current error sink; replaceable at runtime through {@code exceptionHandler(Handler)}. */
    private final AtomicReference<Handler<Throwable>> exceptionHandler;
    /** Delay passed to {@code setTimer} between periodic flush attempts. */
    private final long periodicFlushInterval;
    /** The NATS connection; holds {@code null} until the blocking connect in {@code connect()} has succeeded. */
    private final AtomicReference<Connection> connection = new AtomicReference<>();
    /** Active subscriptions keyed by subject; presence of a key keeps that subject's drain loop re-arming. */
    private final ConcurrentHashMap<String, Subscription> subscriptionMap = new ConcurrentHashMap<>();

    public NatsClientImpl(Options.Builder builder, NatsOptions natsOptions) {
        this.vertx = natsOptions.getVertx();
        this.periodicFlush = natsOptions.isPeriodicFlush();
        this.periodicFlushInterval = natsOptions.getPeriodicFlushInterval();
        builder.dispatcherFactory(new VertxDispatcherFactory(this.vertx));
        this.options = wireConnectListener(builder, context());
        if (natsOptions.getExceptionHandler() == null) {
            this.exceptionHandler = new AtomicReference<>((v0) -> {
                v0.printStackTrace();
            });
        } else {
            this.exceptionHandler = new AtomicReference<>(natsOptions.getExceptionHandler());
        }
    }

    /**
     * Returns the context to schedule work on, as obtained from {@code Vertx.getOrCreateContext()}.
     *
     * @return the context, viewed through the internal {@link ContextInternal} API
     */
    private ContextInternal context() {
        // Vertx#getOrCreateContext is declared to return Context, so the narrowing must be explicit.
        return (ContextInternal) this.vertx.getOrCreateContext();
    }

    /**
     * Installs a connection listener that completes the connect promise on the CONNECTED event
     * and then forwards every event to a listener already configured on the builder, if any.
     *
     * <p>Side effect: stores the new promise in {@code connectFuture}.
     *
     * @param builder         options builder to install the listener on
     * @param contextInternal context that creates the connect promise
     * @return the options built after the listener has been installed
     */
    private Options wireConnectListener(Options.Builder builder, ContextInternal contextInternal) {
        // Build once only to read back a listener the caller may have configured.
        final ConnectionListener userListener = builder.build().getConnectionListener();
        final PromiseInternal<Void> connected = contextInternal.promise();
        builder.connectionListener((conn, event) -> {
            if (event == ConnectionListener.Events.CONNECTED) {
                // tryComplete instead of complete: a second CONNECTED event (e.g. connect() called
                // again) must not throw IllegalStateException and starve the user listener below.
                connected.tryComplete();
            }
            if (userListener != null) {
                userListener.connectionEvent(conn, event);
            }
        });
        this.connectFuture = connected;
        return builder.build();
    }

    /**
     * Opens the NATS connection on a worker thread and, when periodic flushing is enabled,
     * arms the first flush timer.
     *
     * @return the future of the connect promise, which the connection listener completes on CONNECTED
     *         and which is failed here if {@code Nats.connect} throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> connect() {
        context().executeBlocking(blocking -> {
            try {
                this.connection.set(Nats.connect(this.options));
            } catch (Exception e) {
                // tryFail rather than fail: if the connect promise was already settled by an earlier
                // connect(), fail() would throw and the exception handler would never see the error.
                this.connectFuture.tryFail(e);
                this.exceptionHandler.get().handle(e);
            }
        }, false);
        if (this.periodicFlush) {
            context().setTimer(this.periodicFlushInterval, timerId -> runFlush());
        }
        return this.connectFuture.future();
    }

    /**
     * Flushes the connection (one second timeout) when it is connected, then re-arms the flush timer.
     *
     * <p>Does nothing when periodic flushing is disabled. Flush failures are reported to the
     * exception handler.
     */
    private void runFlush() {
        if (!this.periodicFlush) {
            return;
        }
        context().executeBlocking(blocking -> {
            try {
                Connection conn = this.connection.get();
                if (conn != null && conn.getStatus() == Connection.Status.CONNECTED) {
                    conn.flush(Duration.ofSeconds(1L));
                }
            } catch (Exception e) {
                this.exceptionHandler.get().handle(e);
            } finally {
                // Re-arm in finally: a single failed flush (for example a timeout) must not
                // permanently stop periodic flushing.
                context().setTimer(this.periodicFlushInterval, timerId -> runFlush());
            }
        });
    }

    /**
     * Creates a JetStream wrapper without explicit options.
     *
     * @return the result of {@link #jetStream(JetStreamOptions)} called with {@code null} options
     */
    @Override // io.nats.vertx.NatsClient
    public Future<NatsStream> jetStream() {
        final JetStreamOptions noOptions = null;
        return jetStream(noOptions);
    }

    /**
     * Creates a {@code NatsStreamImpl} over the current connection on a worker thread.
     *
     * @param jetStreamOptions options forwarded to the {@code NatsStreamImpl} constructor; may be {@code null}
     * @return a future completed with the stream, or failed (and reported to the exception handler)
     *         if construction throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<NatsStream> jetStream(JetStreamOptions jetStreamOptions) {
        final PromiseInternal<NatsStream> streamPromise = context().promise();
        context().executeBlocking(ignored -> {
            try {
                final NatsStream stream = new NatsStreamImpl(
                        this.connection.get(), this.vertx, this.exceptionHandler.get(), jetStreamOptions);
                streamPromise.complete(stream);
            } catch (Exception failure) {
                handleException(streamPromise, failure);
            }
        }, false);
        return streamPromise.future();
    }

    /**
     * Opens a key-value wrapper without explicit options.
     *
     * @param str name forwarded unchanged to {@link #keyValue(String, KeyValueOptions)}
     * @return the result of the two-argument overload called with {@code null} options
     */
    @Override // io.nats.vertx.NatsClient
    public Future<NatsVertxKeyValue> keyValue(String str) {
        final KeyValueOptions noOptions = null;
        return keyValue(str, noOptions);
    }

    /**
     * Creates a {@code NatsVertxKeyValueImpl} over the current connection on a worker thread.
     *
     * @param str             name forwarded to the {@code NatsVertxKeyValueImpl} constructor
     * @param keyValueOptions options forwarded to the constructor; may be {@code null}
     * @return a future completed with the wrapper, or failed (and reported to the exception handler)
     *         if construction throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<NatsVertxKeyValue> keyValue(String str, KeyValueOptions keyValueOptions) {
        final PromiseInternal<NatsVertxKeyValue> kvPromise = context().promise();
        context().executeBlocking(ignored -> {
            try {
                final NatsVertxKeyValue keyValue = new NatsVertxKeyValueImpl(
                        this.connection.get(), this.vertx, this.exceptionHandler.get(), str, keyValueOptions);
                kvPromise.complete(keyValue);
            } catch (Exception failure) {
                handleException(kvPromise, failure);
            }
        }, false);
        return kvPromise.future();
    }

    /**
     * Replaces the handler that receives errors raised by this client.
     *
     * @param handler the new error handler
     * @return this client
     */
    public WriteStream<Message> exceptionHandler(Handler<Throwable> handler) {
        final AtomicReference<Handler<Throwable>> sink = this.exceptionHandler;
        sink.set(handler);
        return this;
    }

    /**
     * Publishes a message on a worker thread.
     *
     * @param message the message to publish
     * @return a future completed once {@code publish} returns, or failed (and reported to the
     *         exception handler) if it throws
     */
    public Future<Void> write(Message message) {
        final PromiseInternal<Void> published = context().promise();
        context().executeBlocking(ignored -> {
            try {
                final Connection conn = this.connection.get();
                conn.publish(message);
                published.complete();
            } catch (Exception failure) {
                handleException(published, failure);
            }
        }, false);
        return published.future();
    }

    /**
     * Publishes a message on a worker thread and reports the outcome to {@code handler}.
     *
     * <p>On failure the handler receives a failed result and the error is also passed to the
     * exception handler.
     *
     * @param message the message to publish
     * @param handler callback invoked exactly once with the publish outcome
     */
    public void write(Message message, Handler<AsyncResult<Void>> handler) {
        PromiseInternal<Void> promise = context().promise();
        context().executeBlocking(blocking -> {
            try {
                this.connection.get().publish(message);
            } catch (Exception e) {
                handleExceptionWithHandler(handler, promise, e);
                return;
            }
            // Notify outside the try: an exception thrown by the handler must not be mistaken for a
            // publish failure, which would call fail() on the already-completed promise.
            promise.complete();
            handler.handle(promise.future());
        }, false);
    }

    /**
     * Closes the connection on a worker thread and reports the outcome to {@code handler}.
     *
     * <p>On failure the handler receives a failed result and the error is also passed to the
     * exception handler.
     *
     * @param handler callback invoked exactly once with the close outcome
     */
    public void end(Handler<AsyncResult<Void>> handler) {
        PromiseInternal<Void> promise = context().promise();
        context().executeBlocking(blocking -> {
            try {
                this.connection.get().close();
            } catch (Exception e) {
                handleExceptionWithHandler(handler, promise, e);
                return;
            }
            // Notify outside the try: an exception thrown by the handler must not be mistaken for a
            // close failure, which would call fail() on the already-completed promise.
            promise.complete();
            handler.handle(promise.future());
        }, false);
    }

    /**
     * No-op: the requested size is ignored.
     *
     * @param i ignored
     * @return this client
     */
    public WriteStream<Message> setWriteQueueMaxSize(int i) {
        final WriteStream<Message> self = this;
        return self;
    }

    /**
     * Reports whether the write queue is full.
     *
     * @return always {@code false}
     */
    public boolean writeQueueFull() {
        final boolean full = false;
        return full;
    }

    /**
     * No-op: the supplied drain handler is ignored.
     *
     * @param handler ignored
     * @return this client
     */
    @Override // io.nats.vertx.NatsClient
    public NatsClient drainHandler(Handler<Void> handler) {
        final NatsClient self = this;
        return self;
    }

    /**
     * Publishes a message and reports the outcome to a callback; delegates to
     * {@link #write(Message, Handler)}.
     *
     * @param message the message to publish
     * @param handler callback receiving the publish outcome
     */
    @Override // io.nats.vertx.NatsClient
    public void publish(Message message, Handler<AsyncResult<Void>> handler) {
        this.write(message, handler);
    }

    /**
     * Publishes a message; delegates to {@link #write(Message)}.
     *
     * @param message the message to publish
     * @return the future returned by {@code write(message)}
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(Message message) {
        final Future<Void> outcome = this.write(message);
        return outcome;
    }

    /**
     * Publishes a text payload, encoded as UTF-8, via the {@code byte[]} overload.
     *
     * @param str  first argument forwarded to {@code Connection.publish}
     * @param str2 second argument forwarded to {@code Connection.publish}
     * @param str3 text payload
     * @return the future returned by the {@code byte[]} overload
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(String str, String str2, String str3) {
        final byte[] payload = str3.getBytes(StandardCharsets.UTF_8);
        return publish(str, str2, payload);
    }

    /**
     * Calls {@code Connection.publish(str, str2, bArr)} on a worker thread.
     *
     * @param str  first argument forwarded to {@code Connection.publish}
     * @param str2 second argument forwarded to {@code Connection.publish}
     * @param bArr payload bytes
     * @return a future completed once {@code publish} returns, or failed (and reported to the
     *         exception handler) if it throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(String str, String str2, byte[] bArr) {
        final PromiseInternal<Void> published = context().promise();
        context().executeBlocking(ignored -> {
            try {
                final Connection conn = this.connection.get();
                conn.publish(str, str2, bArr);
                published.complete();
            } catch (Exception failure) {
                handleException(published, failure);
            }
        }, false);
        return published.future();
    }

    /**
     * Publishes a text payload, encoded as UTF-8, via the {@code byte[]} overload.
     *
     * @param str  first argument forwarded to {@code Connection.publish}
     * @param str2 text payload
     * @return the future returned by the {@code byte[]} overload
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(String str, String str2) {
        final byte[] payload = str2.getBytes(StandardCharsets.UTF_8);
        return publish(str, payload);
    }

    /**
     * Calls {@code Connection.publish(str, bArr)} on a worker thread.
     *
     * @param str  first argument forwarded to {@code Connection.publish}
     * @param bArr payload bytes
     * @return a future completed once {@code publish} returns, or failed (and reported to the
     *         exception handler) if it throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(String str, byte[] bArr) {
        final PromiseInternal<Void> published = context().promise();
        context().executeBlocking(ignored -> {
            try {
                final Connection conn = this.connection.get();
                conn.publish(str, bArr);
                published.complete();
            } catch (Exception failure) {
                handleException(published, failure);
            }
        }, false);
        return published.future();
    }

    /**
     * Fails the promise with the exception, then passes the exception to the current exception handler.
     *
     * @param promise promise to fail
     * @param exc     the failure
     */
    private void handleException(Promise<?> promise, Exception exc) {
        promise.fail(exc);
        final Handler<Throwable> sink = this.exceptionHandler.get();
        sink.handle(exc);
    }

    /**
     * Fails the promise, hands the failed result to the callback, then passes the exception to the
     * current exception handler, in that order.
     *
     * @param handler callback to notify with the failed result
     * @param promise promise to fail
     * @param exc     the failure
     */
    private void handleExceptionWithHandler(Handler<AsyncResult<Void>> handler, Promise<Void> promise, Exception exc) {
        promise.fail(exc);
        final Future<Void> failed = promise.future();
        handler.handle(failed);
        final Handler<Throwable> sink = this.exceptionHandler.get();
        sink.handle(exc);
    }

    /**
     * Sends a request on a worker thread, waits for the reply and reports the outcome to {@code handler}.
     *
     * <p>On failure the handler receives a failed result and the error is also passed to the
     * exception handler.
     *
     * @param message the request message
     * @param handler callback invoked exactly once with the reply or the failure
     */
    @Override // io.nats.vertx.NatsClient
    public void request(Message message, Handler<AsyncResult<Message>> handler) {
        PromiseInternal<Message> promise = context().promise();
        context().executeBlocking(blocking -> {
            Message reply;
            try {
                reply = this.connection.get().request(message).get();
            } catch (Exception e) {
                promise.fail(e);
                handler.handle(promise.future());
                this.exceptionHandler.get().handle(e);
                return;
            }
            // Notify outside the try: an exception thrown by the handler must not be mistaken for a
            // request failure, which would call fail() on the already-completed promise.
            promise.complete(reply);
            handler.handle(promise.future());
        }, false);
    }

    /**
     * Sends a request on a worker thread and waits, without a timeout, for the reply.
     *
     * @param message the request message
     * @return a future completed with the reply, or failed (and reported to the exception handler)
     *         if sending or waiting throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(Message message) {
        return context().executeBlocking(reply -> {
            try {
                final Connection conn = this.connection.get();
                final Message response = conn.request(message).get();
                reply.complete(response);
            } catch (Exception failure) {
                handleException(reply, failure);
            }
        }, false);
    }

    /**
     * Sends a text request, encoded as UTF-8, via the {@code byte[]} overload.
     *
     * @param str  first argument forwarded to {@code Connection.request}
     * @param str2 text payload
     * @return the future returned by the {@code byte[]} overload
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(String str, String str2) {
        final byte[] payload = str2.getBytes(StandardCharsets.UTF_8);
        return request(str, payload);
    }

    /**
     * Calls {@code Connection.request(str, bArr)} on a worker thread and waits, without a timeout,
     * for the reply.
     *
     * @param str  first argument forwarded to {@code Connection.request}
     * @param bArr payload bytes
     * @return a future completed with the reply, or failed (and reported to the exception handler)
     *         if sending or waiting throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(String str, byte[] bArr) {
        return context().executeBlocking(reply -> {
            try {
                final Connection conn = this.connection.get();
                final Message response = conn.request(str, bArr).get();
                reply.complete(response);
            } catch (Exception failure) {
                handleException(reply, failure);
            }
        }, false);
    }

    /**
     * Calls {@code Connection.request(str, headers, bArr)} on a worker thread and waits, without a
     * timeout, for the reply.
     *
     * @param str     first argument forwarded to {@code Connection.request}
     * @param headers headers forwarded to {@code Connection.request}
     * @param bArr    payload bytes
     * @return a future completed with the reply, or failed (and reported to the exception handler)
     *         if sending or waiting throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(String str, Headers headers, byte[] bArr) {
        return context().executeBlocking(reply -> {
            try {
                final Connection conn = this.connection.get();
                final Message response = conn.request(str, headers, bArr).get();
                reply.complete(response);
            } catch (Exception failure) {
                handleException(reply, failure);
            }
        }, false);
    }

    /**
     * Calls {@code Connection.requestWithTimeout} on a worker thread and waits for its future.
     *
     * @param str      first argument forwarded to {@code Connection.requestWithTimeout}
     * @param headers  headers forwarded to {@code Connection.requestWithTimeout}
     * @param bArr     payload bytes
     * @param duration timeout forwarded to {@code Connection.requestWithTimeout}
     * @return a future completed with the reply, or failed (and reported to the exception handler)
     *         if sending or waiting throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> requestWithTimeout(String str, Headers headers, byte[] bArr, Duration duration) {
        return context().executeBlocking(reply -> {
            try {
                final Connection conn = this.connection.get();
                final Message response = conn.requestWithTimeout(str, headers, bArr, duration).get();
                reply.complete(response);
            } catch (Exception failure) {
                handleException(reply, failure);
            }
        }, false);
    }

    /**
     * Sends a request on a worker thread, waits up to {@code duration} for the reply and reports
     * the outcome to {@code handler}.
     *
     * <p>On failure (including a wait timeout) the handler receives a failed result and the error
     * is also passed to the exception handler.
     *
     * @param message  the request message
     * @param handler  callback invoked exactly once with the reply or the failure
     * @param duration maximum time to wait for the reply
     */
    @Override // io.nats.vertx.NatsClient
    public void request(Message message, Handler<AsyncResult<Message>> handler, Duration duration) {
        PromiseInternal<Message> promise = context().promise();
        context().executeBlocking(blocking -> {
            Message reply;
            try {
                reply = this.connection.get().request(message).get(duration.toNanos(), TimeUnit.NANOSECONDS);
            } catch (Exception e) {
                promise.fail(e);
                handler.handle(promise.future());
                this.exceptionHandler.get().handle(e);
                return;
            }
            // Notify outside the try: an exception thrown by the handler must not be mistaken for a
            // request failure, which would call fail() on the already-completed promise.
            promise.complete(reply);
            handler.handle(promise.future());
        }, false);
    }

    /**
     * Sends a request on a worker thread and waits up to {@code duration} for the reply.
     *
     * @param message  the request message
     * @param duration maximum time to wait for the reply
     * @return a future completed with the reply, or failed (and reported to the exception handler)
     *         if sending or waiting throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(Message message, Duration duration) {
        return context().executeBlocking(reply -> {
            try {
                final Connection conn = this.connection.get();
                final Message response = conn.request(message).get(duration.toNanos(), TimeUnit.NANOSECONDS);
                reply.complete(response);
            } catch (Exception failure) {
                handleException(reply, failure);
            }
        }, false);
    }

    /**
     * Sends a text request, encoded as UTF-8, via the {@code byte[]} overload with a timeout.
     *
     * @param str      first argument forwarded to {@code Connection.request}
     * @param str2     text payload
     * @param duration maximum time to wait for the reply
     * @return the future returned by the {@code byte[]} overload
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(String str, String str2, Duration duration) {
        final byte[] payload = str2.getBytes(StandardCharsets.UTF_8);
        return request(str, payload, duration);
    }

    /**
     * Calls {@code Connection.request(str, bArr)} on a worker thread and waits up to
     * {@code duration} for the reply.
     *
     * @param str      first argument forwarded to {@code Connection.request}
     * @param bArr     payload bytes
     * @param duration maximum time to wait for the reply
     * @return a future completed with the reply, or failed (and reported to the exception handler)
     *         if sending or waiting throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Message> request(String str, byte[] bArr, Duration duration) {
        return context().executeBlocking(reply -> {
            try {
                final Connection conn = this.connection.get();
                final Message response = conn.request(str, bArr).get(duration.toNanos(), TimeUnit.NANOSECONDS);
                reply.complete(response);
            } catch (Exception failure) {
                handleException(reply, failure);
            }
        }, false);
    }

    /**
     * Calls {@code Connection.publish(str, headers, bArr)} on a worker thread.
     *
     * @param str     first argument forwarded to {@code Connection.publish}
     * @param headers headers forwarded to {@code Connection.publish}
     * @param bArr    payload bytes
     * @return a future completed once {@code publish} returns, or failed (and reported to the
     *         exception handler) if it throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(String str, Headers headers, byte[] bArr) {
        final PromiseInternal<Void> published = context().promise();
        context().executeBlocking(ignored -> {
            try {
                final Connection conn = this.connection.get();
                conn.publish(str, headers, bArr);
                published.complete();
            } catch (Exception failure) {
                handleException(published, failure);
            }
        }, false);
        return published.future();
    }

    /**
     * Calls {@code Connection.publish(str, str2, headers, bArr)} on a worker thread.
     *
     * @param str     first argument forwarded to {@code Connection.publish}
     * @param str2    second argument forwarded to {@code Connection.publish}
     * @param headers headers forwarded to {@code Connection.publish}
     * @param bArr    payload bytes
     * @return a future completed once {@code publish} returns, or failed (and reported to the
     *         exception handler) if it throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> publish(String str, String str2, Headers headers, byte[] bArr) {
        final PromiseInternal<Void> published = context().promise();
        context().executeBlocking(ignored -> {
            try {
                final Connection conn = this.connection.get();
                conn.publish(str, str2, headers, bArr);
                published.complete();
            } catch (Exception failure) {
                handleException(published, failure);
            }
        }, false);
        return published.future();
    }

    /**
     * Subscribes to {@code str} on a worker thread, records the subscription under that subject and
     * starts the polling loop that delivers messages to {@code handler}.
     *
     * @param str     the subject to subscribe to; also the key used by {@code unsubscribe}
     * @param handler receives each message pulled from the subscription
     * @return a future completed once the subscription exists, or failed (and reported to the
     *         exception handler) if subscribing throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> subscribe(String str, Handler<Message> handler) {
        PromiseInternal<Void> promise = context().promise();
        context().executeBlocking(blocking -> {
            try {
                Subscription subscription = this.connection.get().subscribe(str);
                this.subscriptionMap.put(str, subscription);
                // Distinct lambda parameter name (a nested lambda may not redeclare the outer one) and
                // unordered execution, matching the queue-group overload and the re-armed drains.
                context().executeBlocking(drain -> drainSubscription(handler, subscription, str), false);
                promise.complete();
            } catch (Exception e) {
                handleException(promise, e);
            }
        }, false);
        return promise.future();
    }

    /**
     * Delivers every message currently available on the subscription to {@code handler}, then,
     * while {@code str} is still a key of the subscription map, schedules itself again after 100 ms.
     *
     * <p>Exceptions thrown by the handler are reported to the exception handler and polling
     * continues; an exception from polling or scheduling is reported and ends this pass without
     * re-arming.
     *
     * @param handler      receives each polled message
     * @param subscription subscription to poll
     * @param str          subscription map key that keeps the loop alive
     */
    private void drainSubscription(Handler<Message> handler, Subscription subscription, String str) {
        try {
            for (Message msg = subscription.nextMessage(NO_WAIT);
                    msg != null;
                    msg = subscription.nextMessage(NO_WAIT)) {
                try {
                    handler.handle(msg);
                } catch (Exception handlerFailure) {
                    this.exceptionHandler.get().handle(handlerFailure);
                }
            }
            if (!this.subscriptionMap.containsKey(str)) {
                // Key removed (see unsubscribe): stop re-arming.
                return;
            }
            context().setTimer(100L, timerId ->
                    context().executeBlocking(blocking -> drainSubscription(handler, subscription, str), false));
        } catch (Exception pollFailure) {
            this.exceptionHandler.get().handle(pollFailure);
        }
    }

    /**
     * Calls {@code Connection.subscribe(str, str2)} on a worker thread, records the subscription
     * under {@code str} and starts the polling loop that delivers messages to {@code handler}.
     *
     * @param str     the subject to subscribe to; also the key used by {@code unsubscribe}
     * @param str2    second argument forwarded to {@code Connection.subscribe}
     * @param handler receives each message pulled from the subscription
     * @return a future completed once the subscription exists, or failed (and reported to the
     *         exception handler) if subscribing throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> subscribe(String str, String str2, Handler<Message> handler) {
        PromiseInternal<Void> promise = context().promise();
        context().executeBlocking(blocking -> {
            try {
                Subscription subscription = this.connection.get().subscribe(str, str2);
                this.subscriptionMap.put(str, subscription);
                // Distinct lambda parameter name: a nested lambda may not redeclare the outer one.
                context().executeBlocking(drain -> drainSubscription(handler, subscription, str), false);
                promise.complete();
            } catch (Exception e) {
                handleException(promise, e);
            }
        }, false);
        return promise.future();
    }

    /**
     * Removes the subscription recorded under {@code str}, if any, and unsubscribes it on a worker thread.
     *
     * @param str the subject key used when subscribing
     * @return a future completed when done (also when nothing was recorded for {@code str}), or
     *         failed (and reported to the exception handler) if unsubscribing throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> unsubscribe(String str) {
        PromiseInternal<Void> promise = context().promise();
        context().executeBlocking(blocking -> {
            try {
                // Single atomic remove: concurrent callers cannot both obtain, and unsubscribe,
                // the same subscription.
                Subscription removed = this.subscriptionMap.remove(str);
                if (removed != null) {
                    removed.unsubscribe();
                }
                promise.complete();
            } catch (Exception e) {
                handleException(promise, e);
            }
        }, false);
        return promise.future();
    }

    /**
     * Returns the underlying NATS connection.
     *
     * @return the connection currently held, or {@code null} if none has been set yet
     */
    @Override // io.nats.vertx.NatsClient
    public Connection getConnection() {
        final Connection current = this.connection.get();
        return current;
    }

    /**
     * Closes the NATS connection on a worker thread.
     *
     * @return a future completed once {@code close} returns, or failed (and reported to the
     *         exception handler) if it throws
     */
    @Override // io.nats.vertx.NatsClient
    public Future<Void> close() {
        final PromiseInternal<Void> closed = context().promise();
        context().executeBlocking(ignored -> {
            try {
                final Connection conn = getConnection();
                conn.close();
                closed.complete();
            } catch (Exception failure) {
                handleException(closed, failure);
            }
        }, false);
        return closed.future();
    }

    // Raw-typed forwarder to drainHandler(Handler<Void>).
    // NOTE(review): the bridge/synthetic markers and the "mo0" prefix look like decompiler output for a
    // compiler-generated bridge method; presumably this should not exist in hand-maintained source - confirm.
    @Override // io.nats.vertx.NatsClient
    /* renamed from: drainHandler */
    public /* bridge */ /* synthetic */ WriteStream mo0drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    // Raw-typed forwarder to write(Message, Handler<AsyncResult<Void>>); casts its arguments unchecked.
    // NOTE(review): marked bridge/synthetic by the decompiler; presumably compiler-generated and not
    // meant to be kept in hand-maintained source - confirm.
    public /* bridge */ /* synthetic */ void write(Object obj, Handler handler) {
        write((Message) obj, (Handler<AsyncResult<Void>>) handler);
    }

    // Raw-typed forwarder to exceptionHandler(Handler<Throwable>).
    // NOTE(review): renamed by the decompiler to avoid a name collision; presumably the compiler-generated
    // bridge for the StreamBase-returning signature and not meant to be kept in source - confirm.
    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m3exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
