package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas;
import org.apache.kafka.streams.state.internals.Segment;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.class */
/**
 * Segmented bytes store that writes every record under a mandatory "base" key schema and,
 * when an index schema is supplied, additionally under an "index" key derived by the subclass
 * via {@link #getIndexKeyValue(Bytes, byte[])}.
 *
 * <p>The store tracks the highest segment timestamp it has seen ({@code observedStreamTime}) and
 * uses it together with the retention period to decide which timestamps are still readable.
 *
 * @param <S> concrete segment type managed by {@link AbstractSegments}
 */
public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
    private final String name;
    protected final AbstractSegments<S> segments;
    protected final SegmentedBytesStore.KeySchema baseKeySchema;
    protected final Optional<SegmentedBytesStore.KeySchema> indexKeySchema;
    private final long retentionPeriod;
    protected ProcessorContext context;
    // Only assigned by init(StateStoreContext, StateStore); the deprecated init overload depends on it.
    private StateStoreContext stateStoreContext;
    private Sensor expiredRecordSensor;
    // Highest segment timestamp seen by put/remove; -1 until the first write.
    protected long observedStreamTime = -1;
    protected boolean consistencyEnabled = false;
    // Also used as the monitor guarding writes and restoration (see put / restoreAllInternal).
    protected Position position;
    protected OffsetCheckpoint positionCheckpoint;
    private volatile boolean open;

    /**
     * @param str              store name, returned by {@link #name()}
     * @param keySchema        schema of the base keys
     * @param optional         schema of the index keys; empty when the store has no index
     * @param abstractSegments segment container backing this store
     * @param j                retention period, in the same unit as the key timestamps
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractDualSchemaRocksDBSegmentedBytesStore(String str, SegmentedBytesStore.KeySchema keySchema, Optional<SegmentedBytesStore.KeySchema> optional, AbstractSegments<S> abstractSegments, long j) {
        this.name = str;
        this.baseKeySchema = keySchema;
        this.indexKeySchema = optional;
        this.segments = abstractSegments;
        this.retentionPeriod = j;
    }

    /** Iterates forward over every base-schema entry that is not older than the retention cutoff. */
    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public KeyValueIterator<Bytes, byte[]> all() {
        return iterateAll(true);
    }

    /** Same range as {@link #all()}, iterated backwards. */
    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public KeyValueIterator<Bytes, byte[]> backwardAll() {
        return iterateAll(false);
    }

    /** Shared implementation of {@link #all()} and {@link #backwardAll()}; only the direction differs. */
    private KeyValueIterator<Bytes, byte[]> iterateAll(boolean forward) {
        long actualFrom = getActualFrom(0L, this.baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
        List<S> searchSpace = this.segments.allSegments(forward);
        return new SegmentIterator<>(
            searchSpace.iterator(),
            this.baseKeySchema.hasNextCondition(null, null, actualFrom, Long.MAX_VALUE, forward),
            this.baseKeySchema.lowerRange(null, actualFrom),
            this.baseKeySchema.upperRange(null, Long.MAX_VALUE),
            forward);
    }

    /**
     * Deletes the base entry for {@code bytes} and, when an index exists, the derived index entry
     * from the same segment. Advances {@code observedStreamTime} even when no segment is found.
     */
    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public void remove(Bytes bytes) {
        long timestamp = this.baseKeySchema.segmentTimestamp(bytes);
        this.observedStreamTime = Math.max(this.observedStreamTime, timestamp);
        S segment = this.segments.getSegmentForTimestamp(timestamp);
        if (segment == null) {
            return;
        }
        segment.delete(bytes);
        if (hasIndex()) {
            segment.delete(getIndexKeyValue(bytes, null).key);
        }
    }

    /**
     * Derives the index key (and value) for a base key/value pair. Called with a {@code null}
     * value from {@link #remove(Bytes)}, where only the key of the result is used.
     */
    protected abstract KeyValue<Bytes, byte[]> getIndexKeyValue(Bytes bytes, byte[] bArr);

    /**
     * Clamps a requested lower time bound to the retention cutoff.
     *
     * @param j requested lower bound
     * @param z {@code true} to use the cutoff {@code observedStreamTime - retentionPeriod},
     *          {@code false} to use that value plus one
     * @return the larger of {@code j} and the cutoff
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public long getActualFrom(long j, boolean z) {
        return Math.max(j, retentionCutoff(z));
    }

    /** Earliest timestamp still considered live; the non-"time first" variant is one tick later. */
    private long retentionCutoff(boolean timeFirst) {
        long cutoff = this.observedStreamTime - this.retentionPeriod;
        return timeFirst ? cutoff : cutoff + 1;
    }

    /**
     * Writes an entry under the index schema.
     *
     * @throws IllegalStateException if the store was built without an index schema
     */
    void putIndex(Bytes bytes, byte[] bArr) {
        S segment = liveIndexSegment(bytes);
        if (segment != null) {
            segment.put(bytes, bArr);
        }
    }

    /**
     * Reads an entry by index key.
     *
     * @return the stored value, or {@code null} when no live segment is returned for the key
     * @throws IllegalStateException if the store was built without an index schema
     */
    byte[] getIndex(Bytes bytes) {
        S segment = liveIndexSegment(bytes);
        if (segment != null) {
            return segment.get(bytes);
        }
        return null;
    }

    /**
     * Deletes an entry by index key.
     *
     * @throws IllegalStateException if the store was built without an index schema
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeIndex(Bytes bytes) {
        S segment = liveIndexSegment(bytes);
        if (segment != null) {
            segment.delete(bytes);
        }
    }

    /**
     * Resolves the segment for an index key through {@code getOrCreateSegmentIfLive}, using the
     * index schema's timestamp for the key. May return {@code null} (callers treat that as a no-op).
     */
    private S liveIndexSegment(Bytes indexKey) {
        if (!hasIndex()) {
            throw new IllegalStateException("Index store doesn't exist");
        }
        long segmentId = this.segments.segmentId(this.indexKeySchema.get().segmentTimestamp(indexKey));
        return this.segments.getOrCreateSegmentIfLive(segmentId, this.context, this.observedStreamTime);
    }

    /**
     * Stores a base entry (and its index entry when an index exists) in the segment for the key's
     * timestamp. If no live segment is available the record is counted on the expired-record
     * sensor, a warning is logged, and nothing is written.
     */
    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public void put(Bytes bytes, byte[] bArr) {
        long timestamp = this.baseKeySchema.segmentTimestamp(bytes);
        this.observedStreamTime = Math.max(this.observedStreamTime, timestamp);
        S segment = this.segments.getOrCreateSegmentIfLive(this.segments.segmentId(timestamp), this.context, this.observedStreamTime);
        if (segment == null) {
            this.expiredRecordSensor.record(1.0d, this.context.currentSystemTimeMs());
            LOG.warn("Skipping record for expired segment.");
            return;
        }
        synchronized (this.position) {
            StoreQueryUtils.updatePosition(this.position, this.stateStoreContext);
            // The index entry is written before the base entry.
            if (hasIndex()) {
                KeyValue<Bytes, byte[]> indexEntry = getIndexKeyValue(bytes, bArr);
                segment.put(indexEntry.key, indexEntry.value);
            }
            segment.put(bytes, bArr);
        }
    }

    /**
     * Looks up a base entry.
     *
     * @return the stored value, or {@code null} when the key's timestamp is older than the
     *         retention cutoff or no segment exists for it
     */
    @Override // org.apache.kafka.streams.state.internals.SegmentedBytesStore
    public byte[] get(Bytes bytes) {
        long timestamp = this.baseKeySchema.segmentTimestamp(bytes);
        long cutoff = retentionCutoff(this.baseKeySchema instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema);
        if (timestamp < cutoff) {
            // The key is handed to the logger as-is so toString() only runs when debug is enabled.
            LOG.debug("Record with key {} is expired as timestamp from key ({}) < actual stream time ({})", bytes, timestamp, cutoff);
            return null;
        }
        S segment = this.segments.getSegmentForTimestamp(timestamp);
        if (segment == null) {
            return null;
        }
        return segment.get(bytes);
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public String name() {
        return this.name;
    }

    /**
     * Opens the store: creates the dropped-records sensor, loads the position checkpoint from
     * {@code <stateDir>/<name>.position}, opens existing segments, and registers the restore and
     * checkpoint callbacks.
     *
     * <p>Registration goes through the {@link StateStoreContext} captured by
     * {@link #init(StateStoreContext, StateStore)}, so this overload cannot be used on its own.
     *
     * @throws IllegalStateException if {@link #init(StateStoreContext, StateStore)} has not
     *                               supplied a context yet
     */
    @Override // org.apache.kafka.streams.processor.StateStore
    @Deprecated
    public void init(ProcessorContext processorContext, StateStore stateStore) {
        // Fail fast, before any segment is opened, instead of hitting a bare NPE at register().
        if (this.stateStoreContext == null) {
            throw new IllegalStateException("Store " + this.name + " must be initialized via init(StateStoreContext, StateStore)");
        }
        this.context = processorContext;
        this.expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
            Thread.currentThread().getName(),
            processorContext.taskId().toString(),
            ProcessorContextUtils.getMetricsImpl(processorContext));
        File positionFile = new File(processorContext.stateDir(), name() + ".position");
        this.positionCheckpoint = new OffsetCheckpoint(positionFile);
        this.position = StoreQueryUtils.readPositionFromCheckpoint(this.positionCheckpoint);
        this.segments.setPosition(this.position);
        this.segments.openExisting(processorContext, this.observedStreamTime);
        // NOTE(review): decompiled call; the restore callback presumably needs an explicit cast to a
        // batching restore-callback interface to type-check - confirm against the upstream source.
        this.stateStoreContext.register(stateStore, this::restoreAllInternal, () -> {
            StoreQueryUtils.checkpointPosition(this.positionCheckpoint, this.position);
        });
        this.open = true;
        this.consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(processorContext.appConfigs(), StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED, false);
    }

    /** Captures the state-store context, then delegates to the deprecated overload via an adapter. */
    @Override // org.apache.kafka.streams.processor.StateStore
    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        this.stateStoreContext = stateStoreContext;
        init(StoreToProcessorContextAdapter.adapt(stateStoreContext), stateStore);
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public void flush() {
        this.segments.flush();
    }

    /** Marks the store closed before closing the segments. */
    @Override // org.apache.kafka.streams.processor.StateStore
    public void close() {
        this.open = false;
        this.segments.close();
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean persistent() {
        return true;
    }

    @Override // org.apache.kafka.streams.processor.StateStore
    public boolean isOpen() {
        return this.open;
    }

    List<S> getSegments() {
        return this.segments.allSegments(false);
    }

    /**
     * Applies a batch of changelog records: the subclass groups them into one {@link WriteBatch}
     * per segment, and each batch is written to its segment while holding the position monitor.
     *
     * <p>Every batch is closed when this method exits, including the batches that were never
     * written because an earlier write failed.
     *
     * @throws ProcessorStateException if a segment write fails
     */
    void restoreAllInternal(Collection<ConsumerRecord<byte[], byte[]>> collection) {
        synchronized (this.position) {
            Map<S, WriteBatch> batchesBySegment = getWriteBatches(collection);
            try {
                for (Map.Entry<S, WriteBatch> entry : batchesBySegment.entrySet()) {
                    entry.getKey().write(entry.getValue());
                }
            } catch (RocksDBException e) {
                throw new ProcessorStateException("Error restoring batch to store " + this.name, e);
            } finally {
                // All batches already exist before the first write, so releasing them together
                // here does not raise peak usage and guarantees none is left open on failure.
                for (WriteBatch batch : batchesBySegment.values()) {
                    batch.close();
                }
            }
        }
    }

    /** Groups restored records into one write batch per target segment. */
    abstract Map<S, WriteBatch> getWriteBatches(Collection<ConsumerRecord<byte[], byte[]>> collection);

    @Override // org.apache.kafka.streams.processor.StateStore
    public Position getPosition() {
        return this.position;
    }

    /** @return {@code true} when an index key schema was supplied at construction */
    public boolean hasIndex() {
        return this.indexKeySchema.isPresent();
    }
}
