package org.apache.kafka.streams.kstream.internals.suppress;

import java.util.Objects;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.suppress.TimeDefinitions;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Maybe;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.class */
public class KTableSuppressProcessorSupplier<K, V> implements KTableProcessorSupplier<K, V, K, V> {
    private final SuppressedInternal<K> suppress;
    private final String storeName;
    private final KTableImpl<K, ?, V> parentKTable;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier$KTableSuppressProcessor.class */
    public static final class KTableSuppressProcessor<K, V> extends ContextualProcessor<K, Change<V>, K, Change<V>> {
        private final long maxRecords;
        private final long maxBytes;
        private final long suppressDurationMillis;
        private final TimeDefinitions.TimeDefinition<K> bufferTimeDefinition;
        private final BufferFullStrategy bufferFullStrategy;
        private final boolean safeToDropTombstones;
        private final String storeName;
        private TimeOrderedKeyValueBuffer<K, V, Change<V>> buffer;
        private InternalProcessorContext<K, Change<V>> internalProcessorContext;
        private Sensor suppressionEmitSensor;
        private long observedStreamTime;

        private KTableSuppressProcessor(SuppressedInternal<K> suppressedInternal, String str) {
            this.observedStreamTime = -1L;
            this.storeName = str;
            Objects.requireNonNull(suppressedInternal);
            this.maxRecords = suppressedInternal.bufferConfig().maxRecords();
            this.maxBytes = suppressedInternal.bufferConfig().maxBytes();
            this.suppressDurationMillis = suppressedInternal.timeToWaitForMoreEvents().toMillis();
            this.bufferTimeDefinition = suppressedInternal.timeDefinition();
            this.bufferFullStrategy = suppressedInternal.bufferConfig().bufferFullStrategy();
            this.safeToDropTombstones = suppressedInternal.safeToDropTombstones();
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // org.apache.kafka.streams.processor.api.ContextualProcessor, org.apache.kafka.streams.processor.api.Processor
        public void init(ProcessorContext<K, Change<V>> processorContext) {
            super.init(processorContext);
            this.internalProcessorContext = (InternalProcessorContext) processorContext;
            this.suppressionEmitSensor = ProcessorNodeMetrics.suppressionEmitSensor(Thread.currentThread().getName(), processorContext.taskId().toString(), this.internalProcessorContext.currentNode().name(), this.internalProcessorContext.metrics());
            this.buffer = (TimeOrderedKeyValueBuffer) Objects.requireNonNull(processorContext.getStateStore(this.storeName));
            this.buffer.setSerdesIfNull(new SerdeGetter(processorContext));
        }

        @Override // org.apache.kafka.streams.processor.api.Processor
        public void process(Record<K, Change<V>> record) {
            this.observedStreamTime = Math.max(this.observedStreamTime, record.timestamp());
            buffer(record);
            enforceConstraints();
        }

        private void buffer(Record<K, Change<V>> record) {
            this.buffer.put(this.bufferTimeDefinition.time(this.internalProcessorContext, record.key()), record, this.internalProcessorContext.recordContext());
        }

        private void enforceConstraints() {
            long j = this.observedStreamTime - this.suppressDurationMillis;
            this.buffer.evictWhile(() -> {
                return Boolean.valueOf(this.buffer.minTimestamp() <= j);
            }, this::emit);
            if (overCapacity()) {
                switch (this.bufferFullStrategy) {
                    case EMIT:
                        this.buffer.evictWhile(this::overCapacity, this::emit);
                        return;
                    case SHUT_DOWN:
                        throw new StreamsException(String.format("%s buffer exceeded its max capacity. Currently [%d/%d] records and [%d/%d] bytes.", this.internalProcessorContext.currentNode().name(), Integer.valueOf(this.buffer.numRecords()), Long.valueOf(this.maxRecords), Long.valueOf(this.buffer.bufferSize()), Long.valueOf(this.maxBytes)));
                    default:
                        throw new UnsupportedOperationException("The bufferFullStrategy [" + this.bufferFullStrategy + "] is not implemented. This is a bug in Kafka Streams.");
                }
            }
        }

        private boolean overCapacity() {
            return ((long) this.buffer.numRecords()) > this.maxRecords || this.buffer.bufferSize() > this.maxBytes;
        }

        private void emit(TimeOrderedKeyValueBuffer.Eviction<K, Change<V>> eviction) {
            if (shouldForward(eviction.value())) {
                ProcessorRecordContext recordContext = this.internalProcessorContext.recordContext();
                this.internalProcessorContext.setRecordContext(eviction.recordContext());
                try {
                    this.internalProcessorContext.forward(eviction.record().withTimestamp(eviction.recordContext().timestamp()).withHeaders(eviction.recordContext().headers()));
                    this.suppressionEmitSensor.record(1.0d, this.internalProcessorContext.currentSystemTimeMs());
                } finally {
                    this.internalProcessorContext.setRecordContext(recordContext);
                }
            }
        }

        private boolean shouldForward(Change<V> change) {
            return (change.newValue == null && this.safeToDropTombstones) ? false : true;
        }
    }

    public KTableSuppressProcessorSupplier(SuppressedInternal<K> suppressedInternal, String str, KTableImpl<K, ?, V> kTableImpl) {
        this.suppress = suppressedInternal;
        this.storeName = str;
        this.parentKTable = kTableImpl;
        kTableImpl.enableSendingOldValues(true);
    }

    @Override // org.apache.kafka.streams.processor.api.ProcessorSupplier, java.util.function.Supplier
    public Processor<K, Change<V>, K, Change<V>> get() {
        return new KTableSuppressProcessor(this.suppress, this.storeName);
    }

    @Override // org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier
    public KTableValueGetterSupplier<K, V> view() {
        final KTableValueGetterSupplier<K, V> valueGetterSupplier = this.parentKTable.valueGetterSupplier();
        return new KTableValueGetterSupplier<K, V>() { // from class: org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier.1
            @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier
            public KTableValueGetter<K, V> get() {
                final KTableValueGetter<K, V> kTableValueGetter = valueGetterSupplier.get();
                return new KTableValueGetter<K, V>() { // from class: org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessorSupplier.1.1
                    private TimeOrderedKeyValueBuffer<K, V, Change<V>> buffer;

                    @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetter
                    public void init(ProcessorContext<?, ?> processorContext) {
                        kTableValueGetter.init(processorContext);
                        this.buffer = (TimeOrderedKeyValueBuffer) Objects.requireNonNull(processorContext.getStateStore(KTableSuppressProcessorSupplier.this.storeName));
                    }

                    @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetter
                    public ValueAndTimestamp<V> get(K k) {
                        Maybe<ValueAndTimestamp<V>> priorValueForBuffered = this.buffer.priorValueForBuffered(k);
                        return priorValueForBuffered.isDefined() ? priorValueForBuffered.getNullableValue() : kTableValueGetter.get(k);
                    }

                    @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetter
                    public boolean isVersioned() {
                        return false;
                    }

                    @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetter
                    public void close() {
                        kTableValueGetter.close();
                    }
                };
            }

            @Override // org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier
            public String[] storeNames() {
                String[] storeNames = valueGetterSupplier.storeNames();
                String[] strArr = new String[1 + storeNames.length];
                System.arraycopy(storeNames, 0, strArr, 1, storeNames.length);
                strArr[0] = KTableSuppressProcessorSupplier.this.storeName;
                return strArr;
            }
        };
    }

    @Override // org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier
    public boolean enableSendingOldValues(boolean z) {
        return this.parentKTable.enableSendingOldValues(z);
    }
}
