package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.class */
/**
 * Base processor for time-windowed aggregations that write into a
 * {@link TimestampedWindowStore} and forward {@link Change} records keyed by {@link Windowed} keys.
 *
 * <p>Two emission modes are supported, selected by the {@link EmitStrategy} passed to the constructor:
 * <ul>
 *   <li>{@code ON_WINDOW_CLOSE}: per-update forwarding is suppressed
 *       ({@link #maybeForwardUpdate}) and results are emitted by range-scanning the window store
 *       ({@link #maybeForwardFinalResult}), throttled through {@link #timeTracker}.</li>
 *   <li>any other strategy: every update is handed to the tuple forwarder and no final-result
 *       scan is performed.</li>
 * </ul>
 *
 * <p>Subclasses supply the scan range through {@link #emitRangeLowerBound(long)},
 * {@link #emitRangeUpperBound(long)} and {@link #shouldRangeFetch(long, long)}.
 *
 * @param <KIn>  key type of the input records
 * @param <VIn>  value type of the input records
 * @param <VAgg> type of the aggregate kept in the window store
 */
public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg> extends ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
    private final String storeName;
    private final EmitStrategy emitStrategy;
    private final boolean sendOldValues;
    private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
    protected TimestampedWindowStore<KIn, VAgg> windowStore;
    protected Sensor droppedRecordsSensor;
    protected Sensor emittedRecordsSensor;
    protected Sensor emitFinalLatencySensor;
    protected InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext;
    // Wall clock used only to measure how long a final-result scan takes.
    private final Time time = Time.SYSTEM;
    protected final KStreamImplJoin.TimeTracker timeTracker = new KStreamImplJoin.TimeTracker();
    // -1 means "no final emit has happened / been restored yet" (see shouldEmitFinal).
    protected long lastEmitWindowCloseTime = -1;
    protected long observedStreamTime = -1;

    /**
     * @param storeName     name of the window state store; also used as the key under which the
     *                      last emitted window-close time is kept in the processor metadata
     * @param emitStrategy  strategy deciding whether results are forwarded per update or on window close
     * @param sendOldValues whether the previous aggregate is included in forwarded {@link Change}s
     */
    public AbstractKStreamTimeWindowAggregateProcessor(final String storeName, final EmitStrategy emitStrategy, final boolean sendOldValues) {
        this.storeName = storeName;
        this.emitStrategy = emitStrategy;
        this.sendOldValues = sendOldValues;
    }

    /**
     * Resolves the internal context, the metric sensors and the window store, then builds the
     * tuple forwarder appropriate for the configured emit strategy.
     *
     * <p>For {@code ON_WINDOW_CLOSE} the last emitted window-close time is restored from the
     * processor metadata (if one was stored) and the emit interval is read from the application
     * config, defaulting to 1000 ms.
     *
     * @param context the processor context; it is cast to {@link InternalProcessorContext}
     */
    @Override
    public void init(final ProcessorContext<Windowed<KIn>, Change<VAgg>> context) {
        super.init(context);
        internalProcessorContext = (InternalProcessorContext<Windowed<KIn>, Change<VAgg>>) context;

        final StreamsMetricsImpl metrics = internalProcessorContext.metrics();
        final String threadId = Thread.currentThread().getName();
        final String processorName = internalProcessorContext.currentNode().name();
        droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId, context.taskId().toString(), metrics);
        emittedRecordsSensor = ProcessorNodeMetrics.emittedRecordsSensor(threadId, context.taskId().toString(), processorName, metrics);
        emitFinalLatencySensor = ProcessorNodeMetrics.emitFinalLatencySensor(threadId, context.taskId().toString(), processorName, metrics);
        windowStore = context.getStateStore(storeName);

        if (emitsOnWindowClose()) {
            // Pick up where a previous run left off, if a close time was recorded.
            final Long restoredCloseTime = internalProcessorContext.processorMetadataForKey(storeName);
            if (restoredCloseTime != null) {
                lastEmitWindowCloseTime = restoredCloseTime;
            }
            final long emitIntervalMs = StreamsConfig.InternalConfig.getLong(
                context.appConfigs(),
                StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION,
                1000L);
            timeTracker.setEmitInterval(emitIntervalMs);
            tupleForwarder = new TimestampedTupleForwarder<>(context, sendOldValues);
        } else {
            tupleForwarder = new TimestampedTupleForwarder<>(
                windowStore,
                context,
                new TimestampedCacheFlushListener<>(context),
                sendOldValues);
        }
    }

    /**
     * Forwards a single aggregate update downstream, unless the emit strategy is
     * {@code ON_WINDOW_CLOSE}, in which case this is a no-op.
     *
     * @param record       the input record that triggered the update; its key is wrapped in a {@link Windowed}
     * @param window       the window the aggregate belongs to
     * @param oldAgg       the previous aggregate; only forwarded when {@code sendOldValues} is set
     * @param newAgg       the updated aggregate
     * @param newTimestamp timestamp to stamp on the forwarded record
     */
    public void maybeForwardUpdate(final Record<KIn, VIn> record, final Window window, final VAgg oldAgg, final VAgg newAgg, final long newTimestamp) {
        if (emitsOnWindowClose()) {
            return;
        }
        tupleForwarder.maybeForward(
            record.withKey(new Windowed<>(record.key(), window))
                .withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
                .withTimestamp(newTimestamp));
    }

    /**
     * Emits the stored aggregates in the subclass-defined range, provided all of the following hold:
     * the final-emit gate is open (see {@code shouldEmitFinal}), the upper bound is non-negative,
     * and the subclass agrees to the range fetch.
     *
     * @param record          the record currently being processed; used as the template (and header
     *                        source) for the emitted records
     * @param windowCloseTime the window-close time the emit is based on
     */
    public void maybeForwardFinalResult(final Record<KIn, VIn> record, final long windowCloseTime) {
        if (!shouldEmitFinal(windowCloseTime)) {
            return;
        }
        final long upperBound = emitRangeUpperBound(windowCloseTime);
        if (upperBound < 0) {
            // Negative upper bound: nothing to scan.
            return;
        }
        final long lowerBound = emitRangeLowerBound(windowCloseTime);
        if (shouldRangeFetch(lowerBound, upperBound)) {
            fetchAndEmit(record, windowCloseTime, lowerBound, upperBound);
        }
    }

    /**
     * Logs a warning that a record was skipped because its window has expired and records the
     * drop on {@link #droppedRecordsSensor}. Topic, partition and offset are included when the
     * context exposes record metadata.
     *
     * @param logger       logger to write the warning to
     * @param timestamp    timestamp of the skipped record
     * @param windowExpire expiration value reported in the message
     * @param window       textual description of the window, inserted verbatim into the message
     */
    public void logSkippedRecordForExpiredWindow(final Logger logger, final long timestamp, final long windowExpire, final String window) {
        if (context().recordMetadata().isPresent()) {
            final RecordMetadata metadata = context().recordMetadata().get();
            logger.warn(
                "Skipping record for expired window. topic=[{}] partition=[{}] offset=[{}] timestamp=[{}] window={} expiration=[{}] streamTime=[{}]",
                metadata.topic(),
                metadata.partition(),
                metadata.offset(),
                timestamp,
                window,
                windowExpire,
                observedStreamTime);
        } else {
            logger.warn(
                "Skipping record for expired window. Topic, partition, and offset not known. timestamp=[{}] window={} expiration=[{}] streamTime=[{}]",
                timestamp,
                window,
                windowExpire,
                observedStreamTime);
        }
        droppedRecordsSensor.record();
    }

    /**
     * Advances {@link #observedStreamTime} to {@code timestamp} if that is larger; the observed
     * stream time never moves backwards.
     *
     * @param timestamp timestamp of the record being processed
     */
    public void updateObservedStreamTime(final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
    }

    /** Returns whether results are only emitted once a window closes. */
    private boolean emitsOnWindowClose() {
        return emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE;
    }

    /**
     * Decides whether a final-result emit should run now. Requires the {@code ON_WINDOW_CLOSE}
     * strategy, the current system time to have reached the tracker's next emit time, and the
     * window-close time to have progressed past the last emitted one.
     *
     * <p>Side effect: once the time throttle passes, the tracker's next emit time is re-based on
     * the current system time and advanced, even if this method then returns {@code false}
     * because the window-close time did not progress.
     */
    private boolean shouldEmitFinal(final long windowCloseTime) {
        if (!emitsOnWindowClose()) {
            return false;
        }
        final long now = internalProcessorContext.currentSystemTimeMs();
        if (now < timeTracker.nextTimeToEmit) {
            // Throttled: not yet time for the next emit.
            return false;
        }
        // Re-base on "now" before advancing, so the schedule follows the actual clock reading.
        timeTracker.nextTimeToEmit = now;
        timeTracker.advanceNextTimeToEmit();
        return lastEmitWindowCloseTime == -1 || lastEmitWindowCloseTime < windowCloseTime;
    }

    /**
     * Scans the window store over {@code [emitRangeLowerBound, emitRangeUpperBound]} (as passed to
     * {@code fetchAll}) and forwards every entry as a {@link Change} with a {@code null} old value,
     * carrying the stored timestamp and the headers of {@code record}. Afterwards the number of
     * emitted records and the elapsed wall-clock time are recorded, and {@code windowCloseTime}
     * is remembered both in memory and in the processor metadata under the store name.
     *
     * <p>The iterator is closed via try-with-resources, so it is released on every exit path,
     * including a failure while recording the sensors.
     */
    private void fetchAndEmit(final Record<KIn, VIn> record, final long windowCloseTime, final long emitRangeLowerBound, final long emitRangeUpperBound) {
        final long startMs = time.milliseconds();

        try (KeyValueIterator<Windowed<KIn>, ValueAndTimestamp<VAgg>> windowsToEmit =
                 windowStore.fetchAll(emitRangeLowerBound, emitRangeUpperBound)) {
            int emittedCount = 0;
            while (windowsToEmit.hasNext()) {
                emittedCount++;
                final KeyValue<Windowed<KIn>, ValueAndTimestamp<VAgg>> entry = windowsToEmit.next();
                tupleForwarder.maybeForward(
                    record.withKey(entry.key)
                        .withValue(new Change<>(entry.value.value(), null))
                        .withTimestamp(entry.value.timestamp())
                        .withHeaders(record.headers()));
            }
            emittedRecordsSensor.record(emittedCount);
            emitFinalLatencySensor.record(time.milliseconds() - startMs);
        }

        lastEmitWindowCloseTime = windowCloseTime;
        internalProcessorContext.addProcessorMetadataKeyValue(storeName, windowCloseTime);
    }

    /**
     * Lower bound of the store range to scan for a final emit.
     *
     * @param windowCloseTime the window-close time the emit is based on
     * @return the value passed as the first argument of the store's {@code fetchAll}
     */
    protected abstract long emitRangeLowerBound(long windowCloseTime);

    /**
     * Upper bound of the store range to scan for a final emit.
     *
     * @param windowCloseTime the window-close time the emit is based on
     * @return the value passed as the second argument of the store's {@code fetchAll};
     *         a negative value makes {@link #maybeForwardFinalResult} skip the emit
     */
    protected abstract long emitRangeUpperBound(long windowCloseTime);

    /**
     * Final veto on the range scan, consulted after both bounds have been computed.
     *
     * @param emitRangeLowerBound result of {@link #emitRangeLowerBound(long)}
     * @param emitRangeUpperBound result of {@link #emitRangeUpperBound(long)}
     * @return {@code true} to run the scan and emit, {@code false} to skip it
     */
    protected abstract boolean shouldRangeFetch(long emitRangeLowerBound, long emitRangeUpperBound);
}
