package org.apache.kafka.streams.state.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStore;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
import org.rocksdb.WriteBatchInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.8.1.jar:org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.class */
public class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment, RocksDBVersionedStore.VersionedStoreSegment {
    /** Class logger; {@link #close()} uses it to warn about iterators that are still open. */
    private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class);

    /** Segment identifier; its 8-byte big-endian encoding is the key prefix built in the constructor. */
    private final long id;

    /** Name reported by {@link #name()} and included in the close-time warning. */
    private final String name;

    /** Backing store that every operation of this segment delegates to. */
    private final RocksDBStore physicalStore;

    /** Adds and strips this segment's key prefix. */
    private final PrefixKeyFormatter prefixKeyFormatter;

    /**
     * Iterators handed to the physical store by {@link #range} and {@link #all()}; whatever is still
     * in this set when {@link #close()} runs gets closed there. Wrapped in a synchronized set, and
     * {@link #close()} additionally locks on it while copying and clearing.
     */
    final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.8.1.jar:org/apache/kafka/streams/state/internals/LogicalKeyValueSegment$PrefixKeyFormatter.class */
    /**
     * Helper that prepends a fixed byte prefix to keys and strips it off again.
     *
     * <p>The prefix array is stored by reference (not copied) and is never modified by this class.
     */
    public static class PrefixKeyFormatter {
        /** Bytes placed in front of every key handled by this formatter. */
        private final byte[] prefix;

        PrefixKeyFormatter(byte[] bArr) {
            this.prefix = bArr;
        }

        /** Returns a new {@code Bytes} holding the prefix followed by the content of {@code bytes}. */
        Bytes addPrefix(Bytes bytes) {
            final byte[] prefixed = addPrefix(bytes.get());
            return Bytes.wrap(prefixed);
        }

        /** Returns a freshly allocated array: the prefix followed by all of {@code bArr}. */
        byte[] addPrefix(byte[] bArr) {
            final int prefixLength = this.prefix.length;
            // copyOf yields the prefix padded with zeros; the padding is then overwritten by the key
            final byte[] combined = Arrays.copyOf(this.prefix, prefixLength + bArr.length);
            System.arraycopy(bArr, 0, combined, prefixLength, bArr.length);
            return combined;
        }

        /** Returns a new {@code Bytes} with the first {@code prefix.length} bytes dropped. */
        Bytes removePrefix(Bytes bytes) {
            final byte[] stripped = removePrefix(bytes.get());
            return Bytes.wrap(stripped);
        }

        /**
         * Drops the first {@code prefix.length} bytes of {@code bArr}. The leading bytes are not
         * compared against the prefix; an input shorter than the prefix fails with
         * {@link NegativeArraySizeException}.
         */
        private byte[] removePrefix(byte[] bArr) {
            final int prefixLength = this.prefix.length;
            final byte[] stripped = new byte[bArr.length - prefixLength];
            System.arraycopy(bArr, prefixLength, stripped, 0, stripped.length);
            return stripped;
        }

        /** Returns the prefix wrapped as {@code Bytes} (wraps the internal array). */
        Bytes getPrefix() {
            return Bytes.wrap(this.prefix);
        }

        /** Returns {@code true} iff {@code bytes} is at least as long as the prefix and begins with it. */
        boolean startsWithPrefix(Bytes bytes) {
            final byte[] candidate = bytes.get();
            final int prefixLength = this.prefix.length;
            if (candidate.length < prefixLength) {
                return false;
            }
            for (int i = 0; i < prefixLength; i++) {
                if (candidate[i] != this.prefix[i]) {
                    return false;
                }
            }
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.8.1.jar:org/apache/kafka/streams/state/internals/LogicalKeyValueSegment$StrippedPrefixKeyValueIteratorAdapter.class */
    /**
     * Wraps an iterator whose keys carry a prefix and exposes the same entries with the prefix
     * removed. Entries whose (still prefixed) key is rejected by the prefix checker are skipped.
     */
    public static class StrippedPrefixKeyValueIteratorAdapter implements KeyValueIterator<Bytes, byte[]> {
        /** Underlying iterator; it is always positioned on an accepted element or exhausted. */
        private final KeyValueIterator<Bytes, byte[]> iteratorWithKeyPrefixes;
        /** Maps a prefixed key to the key returned to the caller. */
        private final Function<Bytes, Bytes> prefixRemover;
        /** Decides, on the prefixed key, whether an element is exposed. */
        private final Function<Bytes, Boolean> prefixChecker;

        /** Creates an adapter that exposes every element of the underlying iterator. */
        StrippedPrefixKeyValueIteratorAdapter(KeyValueIterator<Bytes, byte[]> keyValueIterator, Function<Bytes, Bytes> function) {
            this(keyValueIterator, function, key -> Boolean.TRUE);
        }

        /**
         * Creates an adapter that only exposes elements accepted by {@code function2}. Leading
         * rejected elements are consumed from the underlying iterator right away.
         */
        StrippedPrefixKeyValueIteratorAdapter(KeyValueIterator<Bytes, byte[]> keyValueIterator, Function<Bytes, Bytes> function, Function<Bytes, Boolean> function2) {
            this.iteratorWithKeyPrefixes = keyValueIterator;
            this.prefixRemover = function;
            this.prefixChecker = function2;
            pruneNonPrefixedElements();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            // Rejected elements were already skipped, so the delegate's answer is accurate.
            return iteratorWithKeyPrefixes.hasNext();
        }

        /** Returns the next element with its key stripped, then skips ahead past rejected elements. */
        @Override // java.util.Iterator
        public KeyValue<Bytes, byte[]> next() {
            final KeyValue<Bytes, byte[]> prefixedEntry = iteratorWithKeyPrefixes.next();
            final Bytes strippedKey = prefixRemover.apply(prefixedEntry.key);
            final KeyValue<Bytes, byte[]> strippedEntry = new KeyValue<>(strippedKey, prefixedEntry.value);
            pruneNonPrefixedElements();
            return strippedEntry;
        }

        /** Returns the stripped key of the element that {@link #next()} would return. */
        @Override // org.apache.kafka.streams.state.KeyValueIterator
        public Bytes peekNextKey() {
            final Bytes prefixedKey = iteratorWithKeyPrefixes.peekNextKey();
            return prefixRemover.apply(prefixedKey);
        }

        @Override // java.util.Iterator
        public void remove() {
            iteratorWithKeyPrefixes.remove();
        }

        @Override // org.apache.kafka.streams.state.KeyValueIterator, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            iteratorWithKeyPrefixes.close();
        }

        /** Advances the underlying iterator until its next key is accepted or it is exhausted. */
        private void pruneNonPrefixedElements() {
            while (iteratorWithKeyPrefixes.hasNext()) {
                final Bytes upcomingKey = iteratorWithKeyPrefixes.peekNextKey();
                if (prefixChecker.apply(upcomingKey)) {
                    return;
                }
                iteratorWithKeyPrefixes.next();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public LogicalKeyValueSegment(long j, String str, RocksDBStore rocksDBStore) {
        this.id = j;
        this.name = str;
        this.physicalStore = (RocksDBStore) Objects.requireNonNull(rocksDBStore);
        this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(j));
    }

    /** Returns the segment id passed to the constructor. */
    @Override // org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment
    public long id() {
        return id;
    }

    /** Orders segments by ascending id. */
    @Override // java.lang.Comparable
    public int compareTo(LogicalKeyValueSegment logicalKeyValueSegment) {
        final long otherId = logicalKeyValueSegment.id;
        return Long.compare(id, otherId);
    }

    /**
     * Deletes this segment's data from the physical store.
     *
     * @throws IllegalStateException if the segment id is negative (reserved segment)
     */
    @Override // org.apache.kafka.streams.state.internals.Segment
    public synchronized void destroy() {
        final boolean reservedSegment = id < 0;
        if (reservedSegment) {
            throw new IllegalStateException("Negative segment ID indicates a reserved segment, which should not be destroyed. Reserved segments are cleaned up only when an entire store is closed, via the close() method rather than destroy().");
        }
        final Bytes segmentPrefix = prefixKeyFormatter.getPrefix();
        // NOTE(review): the prefix is passed as both bounds; this presumably relies on
        // RocksDBStore.deleteRange extending the upper bound so that every key starting with the
        // prefix is covered -- confirm against RocksDBStore.deleteRange.
        physicalStore.deleteRange(segmentPrefix, segmentPrefix);
    }

    /** Prefixes both bounds with this segment's prefix and delegates the range delete to the physical store. */
    @Override // org.apache.kafka.streams.state.internals.Segment
    public synchronized void deleteRange(Bytes bytes, Bytes bytes2) {
        final Bytes prefixedFrom = prefixKeyFormatter.addPrefix(bytes);
        final Bytes prefixedTo = prefixKeyFormatter.addPrefix(bytes2);
        physicalStore.deleteRange(prefixedFrom, prefixedTo);
    }

    /** Writes {@code bArr} to the physical store under the prefixed form of {@code bytes}. */
    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized void put(Bytes bytes, byte[] bArr) {
        final Bytes prefixedKey = prefixKeyFormatter.addPrefix(bytes);
        physicalStore.put(prefixedKey, bArr);
    }

    /**
     * Delegates to the physical store's {@code putIfAbsent} using the prefixed key.
     *
     * @return whatever the physical store's {@code putIfAbsent} returns
     */
    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized byte[] putIfAbsent(Bytes bytes, byte[] bArr) {
        final Bytes prefixedKey = prefixKeyFormatter.addPrefix(bytes);
        return physicalStore.putIfAbsent(prefixedKey, bArr);
    }

    /** Prefixes the key of every entry and hands the resulting list to the physical store in one call. */
    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized void putAll(List<KeyValue<Bytes, byte[]>> list) {
        final List<KeyValue<Bytes, byte[]>> prefixedEntries = list.stream()
            .map(entry -> new KeyValue<>(prefixKeyFormatter.addPrefix(entry.key), entry.value))
            .collect(Collectors.toList());
        physicalStore.putAll(prefixedEntries);
    }

    /**
     * Deletes the prefixed form of {@code bytes} from the physical store.
     *
     * @return whatever the physical store's {@code delete} returns
     */
    @Override // org.apache.kafka.streams.state.KeyValueStore
    public synchronized byte[] delete(Bytes bytes) {
        final Bytes prefixedKey = prefixKeyFormatter.addPrefix(bytes);
        return physicalStore.delete(prefixedKey);
    }

    /** Returns the segment name passed to the constructor. */
    @Override // org.apache.kafka.streams.processor.StateStore
    public String name() {
        return name;
    }

    /**
     * Not supported for logical segments.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.kafka.streams.processor.StateStore
    @Deprecated
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        final String reason = "cannot initialize a logical segment";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Not supported for logical segments.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.kafka.streams.processor.StateStore
    public void flush() {
        final String reason = "nothing to flush for logical segment";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Closes every iterator still registered in {@code openIterators}. The physical store itself
     * is not touched here.
     */
    @Override // org.apache.kafka.streams.processor.StateStore
    public synchronized void close() {
        final Set<KeyValueIterator<Bytes, byte[]>> stillOpen;
        // Copy and clear under the set's own lock; the iterators are closed from the copy afterwards.
        synchronized (openIterators) {
            stillOpen = new HashSet<>(openIterators);
            openIterators.clear();
        }
        if (stillOpen.isEmpty()) {
            return;
        }
        log.warn("Closing {} open iterators for store {}", stillOpen.size(), name);
        for (final KeyValueIterator<Bytes, byte[]> iterator : stillOpen) {
            iterator.close();
        }
    }

    /** Always reports {@code true}. */
    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean persistent() {
        final boolean alwaysPersistent = true;
        return alwaysPersistent;
    }

    /** Always reports {@code true}; this class keeps no open/closed flag of its own. */
    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean isOpen() {
        final boolean alwaysOpen = true;
        return alwaysOpen;
    }

    /** Looks up {@code bytes} in this segment without a snapshot. */
    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public synchronized byte[] get(Bytes bytes) {
        final Optional<Snapshot> noSnapshot = Optional.empty();
        return get(bytes, noSnapshot);
    }

    /**
     * Looks up {@code bytes} in this segment, reading through the given snapshot.
     *
     * @throws NullPointerException if {@code snapshot} is null
     */
    public synchronized byte[] get(Bytes bytes, Snapshot snapshot) {
        final Optional<Snapshot> requiredSnapshot = Optional.of(snapshot);
        return get(bytes, requiredSnapshot);
    }

    /**
     * Shared lookup behind both public {@code get} overloads.
     *
     * <p>Without a snapshot the prefixed key is read directly from the physical store. With a
     * snapshot, a {@link ReadOptions} carrying that snapshot is created for the single read and
     * closed again before returning, whether the read succeeds or throws.
     *
     * @param bytes    un-prefixed key
     * @param optional snapshot to read through, or empty for a plain read
     * @return whatever the physical store's {@code get} returns for the prefixed key
     */
    private synchronized byte[] get(Bytes bytes, Optional<Snapshot> optional) {
        if (!optional.isPresent()) {
            return this.physicalStore.get(this.prefixKeyFormatter.addPrefix(bytes));
        }
        // try-with-resources closes the ReadOptions exactly once on every exit path.
        try (ReadOptions readOptions = new ReadOptions()) {
            readOptions.setSnapshot(optional.get());
            return this.physicalStore.get(this.prefixKeyFormatter.addPrefix(bytes), readOptions);
        }
    }

    /** Returns the snapshot obtained from the physical store's {@code getSnapshot()}. */
    public Snapshot getSnapshot() {
        final Snapshot snapshot = physicalStore.getSnapshot();
        return snapshot;
    }

    /** Passes {@code snapshot} to the physical store's {@code releaseSnapshot}. */
    public void releaseSnapshot(Snapshot snapshot) {
        physicalStore.releaseSnapshot(snapshot);
    }

    /**
     * Returns an iterator over this segment's entries between the two keys, with the segment
     * prefix stripped from every returned key.
     *
     * <p>A null lower bound is replaced by the bare segment prefix. A null upper bound is replaced
     * by {@code RocksDBStore.incrementWithoutOverflow(prefix)}. Keys returned by the physical store
     * that do not start with this segment's prefix are filtered out by the adapter.
     *
     * @param bytes  lower bound (un-prefixed), or null for no lower bound within the segment
     * @param bytes2 upper bound (un-prefixed), or null for no upper bound within the segment
     * @return iterator registered in {@code openIterators} via the physical store call
     */
    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public synchronized KeyValueIterator<Bytes, byte[]> range(Bytes bytes, Bytes bytes2) {
        final Bytes lowerBound = bytes == null
            ? this.prefixKeyFormatter.getPrefix()
            : this.prefixKeyFormatter.addPrefix(bytes);
        final Bytes upperBound = bytes2 == null
            ? RocksDBStore.incrementWithoutOverflow(this.prefixKeyFormatter.getPrefix())
            : this.prefixKeyFormatter.addPrefix(bytes2);
        final KeyValueIterator<Bytes, byte[]> prefixedRange =
            this.physicalStore.range(lowerBound, upperBound, this.openIterators);
        // Method references are passed directly so they are typed by the adapter's constructor
        // parameters (Function<Bytes, Bytes> and Function<Bytes, Boolean>).
        return new StrippedPrefixKeyValueIteratorAdapter(
            prefixedRange,
            this.prefixKeyFormatter::removePrefix,
            this.prefixKeyFormatter::startsWithPrefix);
    }

    /**
     * Returns an iterator over the result of a prefix scan for this segment's prefix on the
     * physical store, with the prefix stripped from every returned key. No additional prefix
     * filtering is applied by the adapter.
     */
    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public synchronized KeyValueIterator<Bytes, byte[]> all() {
        final Bytes segmentPrefix = prefixKeyFormatter.getPrefix();
        final KeyValueIterator<Bytes, byte[]> prefixedEntries =
            physicalStore.prefixScan(segmentPrefix, new BytesSerializer(), openIterators);
        return new StrippedPrefixKeyValueIteratorAdapter(prefixedEntries, prefixKeyFormatter::removePrefix);
    }

    /**
     * Not supported for logical segments.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.kafka.streams.state.ReadOnlyKeyValueStore
    public long approximateNumEntries() {
        final String reason = "Cannot estimate num entries for logical segment";
        throw new UnsupportedOperationException(reason);
    }

    /** Prefixes the record's key and forwards the record to the physical store's {@code addToBatch}. */
    @Override // org.apache.kafka.streams.state.internals.BatchWritingStore
    public void addToBatch(KeyValue<byte[], byte[]> keyValue, WriteBatchInterface writeBatchInterface) throws RocksDBException {
        final byte[] prefixedKey = prefixKeyFormatter.addPrefix(keyValue.key);
        final KeyValue<byte[], byte[]> prefixedRecord = new KeyValue<>(prefixedKey, keyValue.value);
        physicalStore.addToBatch(prefixedRecord, writeBatchInterface);
    }

    /** Forwards the batch unchanged to the physical store's {@code write}. */
    @Override // org.apache.kafka.streams.state.internals.BatchWritingStore
    public void write(WriteBatchInterface writeBatchInterface) throws RocksDBException {
        physicalStore.write(writeBatchInterface);
    }

    /** Encodes {@code j} as 8 bytes in {@link ByteBuffer}'s default (big-endian) byte order. */
    private static byte[] serializeLongToBytes(long j) {
        final ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(j);
        return buffer.array();
    }
}
