package io.synadia.kv;

import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.KeyValueOptions;
import io.nats.client.api.DeliverPolicy;
import io.nats.client.api.KeyResult;
import io.nats.client.api.KeyValueEntry;
import io.nats.client.api.KeyValueOperation;
import io.nats.client.impl.NatsKeyValueAdapter;
import io.nats.client.support.NatsKeyValueUtil;
import io.synadia.kv.codec.Codec;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/* loaded from: input_file:io/synadia/kv/EncodedKeyValue.class */
/**
 * Typed facade over a {@link NatsKeyValueAdapter}: keys and values supplied by callers are
 * converted with the configured {@link Codec} before being handed to the adapter, and entries
 * coming back from the adapter are wrapped together with the same codec.
 *
 * @param <KeyType>  the application-level key type handled by the codec
 * @param <DataType> the application-level value type handled by the codec
 */
public class EncodedKeyValue<KeyType, DataType> {
    /** Message used when a filtered consume is requested but the codec does not allow filtering. */
    private static final String FILTERS_NOT_SUPPORTED = "Filters not supported";

    private final Connection connection;
    private final NatsKeyValueAdapter adapter;
    private final Codec<KeyType, DataType> codec;

    /**
     * Creates an instance without key-value options; equivalent to calling the four-argument
     * constructor with {@code null} options.
     *
     * @param connection the connection passed to the adapter; its executor is also used by
     *                   the {@code consumeKeys} methods
     * @param str        the name passed to the adapter (presumably the bucket name; confirm
     *                   against {@link NatsKeyValueAdapter})
     * @param codec      the codec used to encode keys/values and to wrap results
     * @throws IOException declared by the delegated constructor
     */
    public EncodedKeyValue(Connection connection, String str, Codec<KeyType, DataType> codec) throws IOException {
        this(connection, str, codec, null);
    }

    /**
     * Creates an instance backed by a new {@link NatsKeyValueAdapter}.
     *
     * @param connection      the connection passed to the adapter; its executor is also used by
     *                        the {@code consumeKeys} methods
     * @param str             the name passed to the adapter (presumably the bucket name; confirm
     *                        against {@link NatsKeyValueAdapter})
     * @param codec           the codec used to encode keys/values and to wrap results
     * @param keyValueOptions options forwarded to the adapter; may be {@code null} (the
     *                        three-argument constructor passes {@code null})
     * @throws IOException declared for the adapter construction
     */
    public EncodedKeyValue(Connection connection, String str, Codec<KeyType, DataType> codec, KeyValueOptions keyValueOptions) throws IOException {
        this.connection = connection;
        this.codec = codec;
        this.adapter = new NatsKeyValueAdapter(connection, str, keyValueOptions);
    }

    /**
     * Looks up the entry stored under the encoded form of the given key.
     *
     * @param keytype the key to encode and look up
     * @return the wrapped entry, or {@code null} when the adapter returns {@code null}
     * @throws IOException           propagated from the adapter
     * @throws JetStreamApiException propagated from the adapter
     */
    public EncodedKeyValueEntry<KeyType, DataType> get(KeyType keytype) throws IOException, JetStreamApiException {
        return _get(this.adapter.get(this.codec.encodeKey(keytype)));
    }

    /**
     * Looks up the entry stored under the encoded form of the given key, passing {@code j}
     * through to the adapter.
     *
     * @param keytype the key to encode and look up
     * @param j       forwarded unchanged to the adapter (presumably a revision number; confirm)
     * @return the wrapped entry, or {@code null} when the adapter returns {@code null}
     * @throws IOException           propagated from the adapter
     * @throws JetStreamApiException propagated from the adapter
     */
    public EncodedKeyValueEntry<KeyType, DataType> get(KeyType keytype, long j) throws IOException, JetStreamApiException {
        return _get(this.adapter.get(this.codec.encodeKey(keytype), j));
    }

    /** Wraps a raw entry with this instance's codec, passing {@code null} through unchanged. */
    private EncodedKeyValueEntry<KeyType, DataType> _get(KeyValueEntry keyValueEntry) {
        if (keyValueEntry == null) {
            return null;
        }
        return new EncodedKeyValueEntry<>(keyValueEntry, this.codec);
    }

    /**
     * Fetches the adapter's history for the encoded key and wraps every entry with the codec,
     * preserving the order in which the adapter returned them.
     *
     * @param keytype the key to encode and query
     * @return a new mutable list with one wrapped entry per raw entry
     * @throws IOException           propagated from the adapter
     * @throws JetStreamApiException propagated from the adapter
     * @throws InterruptedException  propagated from the adapter
     */
    public List<EncodedKeyValueEntry<KeyType, DataType>> history(KeyType keytype) throws IOException, JetStreamApiException, InterruptedException {
        List<KeyValueEntry> rawEntries = this.adapter.history(this.codec.encodeKey(keytype));
        List<EncodedKeyValueEntry<KeyType, DataType>> encodedEntries = new ArrayList<>(rawEntries.size());
        for (KeyValueEntry rawEntry : rawEntries) {
            encodedEntries.add(new EncodedKeyValueEntry<>(rawEntry, this.codec));
        }
        return encodedEntries;
    }

    /**
     * Encodes the key and value and delegates to the adapter's {@code put}.
     *
     * @param keytype  the key to encode
     * @param datatype the value to encode
     * @return the value returned by the adapter (presumably the new revision; confirm)
     * @throws IOException           propagated from the adapter
     * @throws JetStreamApiException propagated from the adapter
     */
    public long put(KeyType keytype, DataType datatype) throws IOException, JetStreamApiException {
        return this.adapter.put(this.codec.encodeKey(keytype), this.codec.encodeData(datatype));
    }

    /**
     * Encodes the key and value and delegates to the adapter's {@code create}.
     *
     * @param keytype  the key to encode
     * @param datatype the value to encode
     * @return the value returned by the adapter (presumably the new revision; confirm)
     * @throws IOException           propagated from the adapter
     * @throws JetStreamApiException propagated from the adapter
     */
    public long create(KeyType keytype, DataType datatype) throws IOException, JetStreamApiException {
        return this.adapter.create(this.codec.encodeKey(keytype), this.codec.encodeData(datatype));
    }

    /**
     * Encodes the key and value and delegates to the adapter's {@code update}.
     *
     * @param keytype  the key to encode
     * @param datatype the value to encode
     * @param j        forwarded unchanged to the adapter (presumably the expected revision; confirm)
     * @return the value returned by the adapter (presumably the new revision; confirm)
     * @throws IOException           propagated from the adapter
     * @throws JetStreamApiException propagated from the adapter
     */
    public long update(KeyType keytype, DataType datatype, long j) throws IOException, JetStreamApiException {
        return this.adapter.update(this.codec.encodeKey(keytype), this.codec.encodeData(datatype), j);
    }

    /**
     * Encodes the key and delegates to the adapter's {@code delete}.
     *
     * @param keytype the key to encode
     * @throws IOException           propagated from the adapter
     * @throws JetStreamApiException propagated from the adapter
     */
    public void delete(KeyType keytype) throws IOException, JetStreamApiException {
        this.adapter.delete(this.codec.encodeKey(keytype));
    }

    /**
     * Encodes the key and delegates to the adapter's two-argument {@code delete}.
     *
     * @param keytype the key to encode
     * @param j       forwarded unchanged to the adapter (presumably the expected revision; confirm)
     * @throws IOException           propagated from the adapter
     * @throws JetStreamApiException propagated from the adapter
     */
    public void delete(KeyType keytype, long j) throws IOException, JetStreamApiException {
        this.adapter.delete(this.codec.encodeKey(keytype), j);
    }

    /**
     * Encodes the key and delegates to the adapter's {@code purge}.
     *
     * @param keytype the key to encode
     * @throws IOException           propagated from the adapter
     * @throws JetStreamApiException propagated from the adapter
     */
    public void purge(KeyType keytype) throws IOException, JetStreamApiException {
        this.adapter.purge(this.codec.encodeKey(keytype));
    }

    /**
     * Encodes the key and delegates to the adapter's two-argument {@code purge}.
     *
     * @param keytype the key to encode
     * @param j       forwarded unchanged to the adapter (presumably the expected revision; confirm)
     * @throws IOException           propagated from the adapter
     * @throws JetStreamApiException propagated from the adapter
     */
    public void purge(KeyType keytype, long j) throws IOException, JetStreamApiException {
        this.adapter.purge(this.codec.encodeKey(keytype), j);
    }

    /**
     * Starts an asynchronous key scan over the adapter's read subject for {@code ">"}.
     *
     * @return the queue that the background task fills; see {@link #consumeKeys(List)} for the
     *         kinds of entries placed on it
     */
    public LinkedBlockingQueue<EncodedKeyResult<KeyType, DataType>> consumeKeys() {
        return _consumeKeys(Collections.singletonList(this.adapter.readSubject(">")));
    }

    /**
     * Starts an asynchronous key scan restricted to the filter the codec derives from one key.
     *
     * @param keytype the value handed to {@code codec.encodeFilter}
     * @return the queue that the background task fills
     * @throws UnsupportedOperationException if the codec reports that filtering is not allowed
     */
    public LinkedBlockingQueue<EncodedKeyResult<KeyType, DataType>> consumeKeys(KeyType keytype) {
        requireFilteringSupport();
        return _consumeKeys(Collections.singletonList(this.adapter.readSubject(this.codec.encodeFilter(keytype))));
    }

    /**
     * Starts an asynchronous key scan restricted to the filters the codec derives from the
     * given keys, in list order.
     *
     * <p>The returned queue receives one entry per visited message whose operation is
     * {@code PUT}, then one entry wrapping an empty {@link KeyResult} once the visit returns
     * normally, or one entry wrapping the exception if the visit fails.
     *
     * @param list the values handed to {@code codec.encodeFilter}, one subject per element
     * @return the queue that the background task fills
     * @throws UnsupportedOperationException if the codec reports that filtering is not allowed
     */
    public LinkedBlockingQueue<EncodedKeyResult<KeyType, DataType>> consumeKeys(List<KeyType> list) {
        requireFilteringSupport();
        List<String> readSubjects = new ArrayList<>(list.size());
        for (KeyType filterKey : list) {
            readSubjects.add(this.adapter.readSubject(this.codec.encodeFilter(filterKey)));
        }
        return _consumeKeys(readSubjects);
    }

    /** Rejects filtered consumption when the codec does not allow filtering. */
    private void requireFilteringSupport() {
        if (!this.codec.allowsFiltering()) {
            throw new UnsupportedOperationException(FILTERS_NOT_SUPPORTED);
        }
    }

    /**
     * Submits a task to the connection's executor that visits the given subjects and feeds the
     * returned queue. The queue is returned immediately, before the task has produced anything.
     *
     * @param list the read subjects to visit
     * @return the queue the submitted task writes to
     */
    private LinkedBlockingQueue<EncodedKeyResult<KeyType, DataType>> _consumeKeys(List<String> list) {
        LinkedBlockingQueue<EncodedKeyResult<KeyType, DataType>> results = new LinkedBlockingQueue<>();
        this.connection.getOptions().getExecutor().submit(() -> {
            try {
                this.adapter.visitSubject(list, DeliverPolicy.LastPerSubject, true, false, message -> {
                    // Only PUT operations are reported; any other operation is skipped.
                    if (NatsKeyValueUtil.getOperation(message.getHeaders()) == KeyValueOperation.PUT) {
                        results.offer(new EncodedKeyResult<>(new KeyResult(new NatsKeyValueUtil.BucketAndKey(message).key), this.codec));
                    }
                });
                // Visit returned normally: enqueue a result wrapping an empty KeyResult.
                results.offer(new EncodedKeyResult<>(new KeyResult(), this.codec));
            } catch (IOException | JetStreamApiException e) {
                results.offer(new EncodedKeyResult<>(new KeyResult(e), this.codec));
            } catch (InterruptedException e) {
                results.offer(new EncodedKeyResult<>(new KeyResult(e), this.codec));
                // Restore the interrupt flag for the executor thread.
                Thread.currentThread().interrupt();
            } catch (RuntimeException e) {
                // The Future from submit() is discarded, so an unchecked failure would otherwise
                // vanish and the queue would never receive a final entry. Surface it to the
                // consumer the same way checked failures are surfaced.
                results.offer(new EncodedKeyResult<>(new KeyResult(e), this.codec));
            }
        });
        return results;
    }
}
