package io.nats.vertx.impl;

import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.KeyValue;
import io.nats.client.KeyValueOptions;
import io.nats.client.PurgeOptions;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.KeyValueEntry;
import io.nats.client.api.KeyValueOperation;
import io.nats.client.api.KeyValuePurgeOptions;
import io.nats.client.api.KeyValueStatus;
import io.nats.client.api.KeyValueWatchOption;
import io.nats.client.api.KeyValueWatcher;
import io.nats.client.api.MessageInfo;
import io.nats.client.api.PublishAck;
import io.nats.client.impl.Headers;
import io.nats.client.impl.NatsKeyValueWatchSubscription;
import io.nats.client.impl.NatsMessage;
import io.nats.client.support.DateTimeUtils;
import io.nats.client.support.NatsKeyValueUtil;
import io.nats.client.support.Validator;
import io.nats.vertx.NatsVertxKeyValue;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.streams.StreamBase;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/* loaded from: input_file:io/nats/vertx/impl/NatsVertxKeyValueImpl.class */
public class NatsVertxKeyValueImpl extends NatsImpl implements NatsVertxKeyValue {
    private final String bucketName;
    private final String streamName;
    private final String streamSubject;
    private final String readPrefix;
    private final String writePrefix;
    private final KeyValue kv;

    public NatsVertxKeyValueImpl(Connection connection, Vertx vertx, Handler<Throwable> handler, String str, KeyValueOptions keyValueOptions) {
        super(connection, vertx, handler, keyValueOptions == null ? null : keyValueOptions.getJetStreamOptions());
        this.bucketName = Validator.validateBucketName(str, true);
        this.streamName = NatsKeyValueUtil.toStreamName(str);
        this.streamSubject = NatsKeyValueUtil.toStreamSubject(str);
        this.readPrefix = NatsKeyValueUtil.toKeyPrefix(str);
        if (keyValueOptions == null || keyValueOptions.getJetStreamOptions().isDefaultPrefix()) {
            this.writePrefix = this.readPrefix;
        } else {
            this.writePrefix = keyValueOptions.getJetStreamOptions().getPrefix() + this.readPrefix;
        }
        try {
            this.kv = connection.keyValue(str, keyValueOptions);
        } catch (IOException e) {
            if (handler != null) {
                handler.handle(e);
            }
            throw new RuntimeException(e);
        }
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public NatsImpl getImpl() {
        return this;
    }

    public NatsVertxKeyValueImpl exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler.set(handler);
        return this;
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public String readSubject(String str) {
        return this.readPrefix + str;
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public String writeSubject(String str) {
        return this.writePrefix + str;
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public String getBucketName() {
        return this.bucketName;
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<KeyValueEntry> get(String str) {
        return _getFuture(str, null, true);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<KeyValueEntry> get(String str, long j) {
        return _getFuture(str, Long.valueOf(j), true);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Long> put(String str, byte[] bArr) {
        return publishData(str, bArr, null);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Long> put(String str, String str2) {
        return publishData(str, str2.getBytes(StandardCharsets.UTF_8), null);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Long> put(String str, Number number) {
        return publishData(str, number.toString().getBytes(StandardCharsets.US_ASCII), null);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Long> create(String str, byte[] bArr) {
        return executeUnorderedBlocking(() -> {
            KeyValueEntry _getLastEntry;
            try {
                return Long.valueOf(_publish(str, bArr, new Headers().add("Nats-Expected-Last-Subject-Sequence", new String[]{Long.toString(0L)})).getSeqno());
            } catch (JetStreamApiException e) {
                if (e.getApiErrorCode() != 10071 || (_getLastEntry = _getLastEntry(str, false)) == null || _getLastEntry.getOperation() == KeyValueOperation.PUT) {
                    throw e;
                }
                return Long.valueOf(_publish(str, bArr, new Headers().add("Nats-Expected-Last-Subject-Sequence", new String[]{Long.toString(_getLastEntry.getRevision())})).getSeqno());
            }
        });
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Long> update(String str, byte[] bArr, long j) {
        return publishData(str, bArr, new Headers().add("Nats-Expected-Last-Subject-Sequence", new String[]{Long.toString(j)}));
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Long> update(String str, String str2, long j) {
        return update(str, str2.getBytes(StandardCharsets.UTF_8), j);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Void> delete(String str) {
        return publishCommand(str, NatsKeyValueUtil.getDeleteHeaders());
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Void> delete(String str, long j) {
        return publishCommand(str, NatsKeyValueUtil.getDeleteHeaders().put("Nats-Expected-Last-Subject-Sequence", new String[]{Long.toString(j)}));
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Void> purge(String str) {
        return publishCommand(str, NatsKeyValueUtil.getPurgeHeaders());
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Void> purge(String str, long j) {
        return publishCommand(str, NatsKeyValueUtil.getPurgeHeaders().put("Nats-Expected-Last-Subject-Sequence", new String[]{Long.toString(j)}));
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<NatsKeyValueWatchSubscription> watch(String str, KeyValueWatcher keyValueWatcher, KeyValueWatchOption... keyValueWatchOptionArr) {
        return watch(Collections.singletonList(str), keyValueWatcher, -1L, keyValueWatchOptionArr);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<NatsKeyValueWatchSubscription> watch(String str, KeyValueWatcher keyValueWatcher, long j, KeyValueWatchOption... keyValueWatchOptionArr) {
        return watch(Collections.singletonList(str), keyValueWatcher, j, keyValueWatchOptionArr);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<NatsKeyValueWatchSubscription> watch(List<String> list, KeyValueWatcher keyValueWatcher, KeyValueWatchOption... keyValueWatchOptionArr) {
        return watch(list, keyValueWatcher, -1L, keyValueWatchOptionArr);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<NatsKeyValueWatchSubscription> watch(List<String> list, KeyValueWatcher keyValueWatcher, long j, KeyValueWatchOption... keyValueWatchOptionArr) {
        return executeUnorderedBlocking(() -> {
            Validator.validateKvKeysWildcardAllowedRequired(list);
            Validator.validateNotNull(keyValueWatcher, "Watcher is required");
            return this.kv.watch(list, keyValueWatcher, j, keyValueWatchOptionArr);
        });
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<NatsKeyValueWatchSubscription> watchAll(KeyValueWatcher keyValueWatcher, KeyValueWatchOption... keyValueWatchOptionArr) {
        return watch(Collections.singletonList(">"), keyValueWatcher, -1L, keyValueWatchOptionArr);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<NatsKeyValueWatchSubscription> watchAll(KeyValueWatcher keyValueWatcher, long j, KeyValueWatchOption... keyValueWatchOptionArr) {
        return watch(Collections.singletonList(">"), keyValueWatcher, j, keyValueWatchOptionArr);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<List<String>> keys() {
        return _keys(Collections.singletonList(readSubject(">")));
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<List<String>> keys(String str) {
        return _keys(Collections.singletonList(readSubject(str)));
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<List<String>> keys(List<String> list) {
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(readSubject(it.next()));
        }
        return _keys(arrayList);
    }

    private Future<List<String>> _keys(List<String> list) {
        return executeUnorderedBlocking(() -> {
            ArrayList arrayList = new ArrayList();
            visitSubject(this.streamName, (List<String>) list, DeliverPolicy.LastPerSubject, true, false, message -> {
                if (NatsKeyValueUtil.getOperation(message.getHeaders()) == KeyValueOperation.PUT) {
                    arrayList.add(new NatsKeyValueUtil.BucketAndKey(message).key);
                }
            });
            return arrayList;
        });
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<List<KeyValueEntry>> history(String str) {
        return executeUnorderedBlocking(() -> {
            Validator.validateNonWildcardKvKeyRequired(str);
            ArrayList arrayList = new ArrayList();
            visitSubject(this.streamName, readSubject(str), DeliverPolicy.All, false, true, message -> {
                arrayList.add(new KeyValueEntry(message));
            });
            return arrayList;
        });
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Void> purgeDeletes() {
        return purgeDeletes(null);
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<Void> purgeDeletes(KeyValuePurgeOptions keyValuePurgeOptions) {
        return executeUnorderedBlocking(() -> {
            long deleteMarkersThresholdMillis = keyValuePurgeOptions == null ? KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS : keyValuePurgeOptions.getDeleteMarkersThresholdMillis();
            ZonedDateTime fromNow = deleteMarkersThresholdMillis < 0 ? DateTimeUtils.fromNow(600000L) : deleteMarkersThresholdMillis == 0 ? DateTimeUtils.fromNow(KeyValuePurgeOptions.DEFAULT_THRESHOLD_MILLIS) : DateTimeUtils.fromNow(-deleteMarkersThresholdMillis);
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            ZonedDateTime zonedDateTime = fromNow;
            visitSubject(this.streamName, this.streamSubject, DeliverPolicy.LastPerSubject, true, false, message -> {
                KeyValueEntry keyValueEntry = new KeyValueEntry(message);
                if (keyValueEntry.getOperation() != KeyValueOperation.PUT) {
                    if (keyValueEntry.getCreated().isAfter(zonedDateTime)) {
                        arrayList2.add(new NatsKeyValueUtil.BucketAndKey(message).key);
                    } else {
                        arrayList.add(new NatsKeyValueUtil.BucketAndKey(message).key);
                    }
                }
            });
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                this.jsm.purgeStream(this.streamName, PurgeOptions.subject(readSubject((String) it.next())));
            }
            Iterator it2 = arrayList2.iterator();
            while (it2.hasNext()) {
                this.jsm.purgeStream(this.streamName, PurgeOptions.builder().subject(readSubject((String) it2.next())).keep(1L).build());
            }
            return null;
        });
    }

    @Override // io.nats.vertx.NatsVertxKeyValue
    public Future<KeyValueStatus> getStatus() {
        return executeUnorderedBlocking(() -> {
            return new KeyValueStatus(this.jsm.getStreamInfo(this.streamName));
        });
    }

    private PublishAck _publish(String str, byte[] bArr, Headers headers) throws IOException, JetStreamApiException {
        Validator.validateNonWildcardKvKeyRequired(str);
        return this.js.publish(NatsMessage.builder().subject(writeSubject(str)).data(bArr).headers(headers).build());
    }

    private Future<Long> publishData(String str, byte[] bArr, Headers headers) {
        return executeUnorderedBlocking(() -> {
            return Long.valueOf(_publish(str, bArr, headers).getSeqno());
        });
    }

    private Future<Void> publishCommand(String str, Headers headers) {
        return executeUnorderedBlocking(() -> {
            _publish(str, null, headers);
            return null;
        });
    }

    Future<KeyValueEntry> _getFuture(String str, Long l, boolean z) {
        return executeUnorderedBlocking(() -> {
            Validator.validateNonWildcardKvKeyRequired(str);
            return l == null ? _getLastEntry(str, z) : _getRevisionEntry(str, l.longValue(), z);
        });
    }

    private KeyValueEntry resolveExistingOnly(KeyValueEntry keyValueEntry, boolean z) {
        if (!z || keyValueEntry.getOperation() == KeyValueOperation.PUT) {
            return keyValueEntry;
        }
        return null;
    }

    private KeyValueEntry _getLastEntry(String str, boolean z) throws IOException, JetStreamApiException {
        MessageInfo _getLastMi = _getLastMi(readSubject(str));
        KeyValueEntry keyValueEntry = _getLastMi == null ? null : new KeyValueEntry(_getLastMi);
        if (keyValueEntry != null) {
            keyValueEntry = resolveExistingOnly(keyValueEntry, z);
        }
        return keyValueEntry;
    }

    private KeyValueEntry _getRevisionEntry(String str, long j, boolean z) throws IOException, JetStreamApiException {
        MessageInfo _getRevisionMi = _getRevisionMi(j);
        KeyValueEntry keyValueEntry = _getRevisionMi == null ? null : new KeyValueEntry(_getRevisionMi);
        if (keyValueEntry != null) {
            keyValueEntry = str.equals(keyValueEntry.getKey()) ? resolveExistingOnly(keyValueEntry, z) : null;
        }
        return keyValueEntry;
    }

    protected MessageInfo _getLastMi(String str) throws IOException, JetStreamApiException {
        try {
            return this.jsm.getLastMessage(this.streamName, str);
        } catch (JetStreamApiException e) {
            if (e.getApiErrorCode() == 10037) {
                return null;
            }
            throw e;
        }
    }

    protected MessageInfo _getRevisionMi(long j) throws IOException, JetStreamApiException {
        try {
            return this.jsm.getMessage(this.streamName, j);
        } catch (JetStreamApiException e) {
            if (e.getApiErrorCode() == 10037) {
                return null;
            }
            throw e;
        }
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m5exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
