package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.8.1.jar:org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.class */
class KStreamKTableJoinProcessor<K1, K2, V1, V2, VOut> extends ContextualProcessor<K1, V1, K1, VOut> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KStreamKTableJoin.class);
    private final KTableValueGetter<K2, V2> valueGetter;
    private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
    private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> joiner;
    private final boolean leftJoin;
    private Sensor droppedRecordsSensor;
    private final Optional<Duration> gracePeriod;
    private TimeOrderedKeyValueBuffer<K1, V1, V1> buffer;
    protected long observedStreamTime = -1;
    private InternalProcessorContext internalProcessorContext;
    private final boolean useBuffer;
    private final String storeName;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KStreamKTableJoinProcessor(KTableValueGetter<K2, V2> kTableValueGetter, KeyValueMapper<? super K1, ? super V1, ? extends K2> keyValueMapper, ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends VOut> valueJoinerWithKey, boolean z, Optional<Duration> optional, Optional<String> optional2) {
        this.valueGetter = kTableValueGetter;
        this.keyMapper = keyValueMapper;
        this.joiner = valueJoinerWithKey;
        this.leftJoin = z;
        this.useBuffer = optional.isPresent();
        this.gracePeriod = optional;
        this.storeName = optional2.orElse("");
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.kafka.streams.processor.api.ContextualProcessor, org.apache.kafka.streams.processor.api.Processor
    public void init(ProcessorContext<K1, VOut> processorContext) {
        super.init(processorContext);
        this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(Thread.currentThread().getName(), processorContext.taskId().toString(), (StreamsMetricsImpl) processorContext.metrics());
        this.valueGetter.init(processorContext);
        this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) processorContext);
        if (this.useBuffer) {
            if (!this.valueGetter.isVersioned() && this.gracePeriod.isPresent()) {
                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
            }
            this.buffer = (TimeOrderedKeyValueBuffer) Objects.requireNonNull(processorContext.getStateStore(this.storeName));
            this.buffer.setSerdesIfNull(new SerdeGetter(processorContext));
        }
    }

    @Override // org.apache.kafka.streams.processor.api.Processor
    public void process(Record<K1, V1> record) {
        this.internalProcessorContext = ProcessorContextUtils.asInternalProcessorContext((org.apache.kafka.streams.processor.ProcessorContext) context());
        updateObservedStreamTime(record.timestamp());
        if (maybeDropRecord(record)) {
            return;
        }
        if (!this.useBuffer) {
            doJoin(record);
        } else if (this.buffer.put(this.observedStreamTime, record, this.internalProcessorContext.recordContext())) {
            this.buffer.evictWhile(() -> {
                return true;
            }, this::emit);
        } else {
            doJoin(record);
        }
    }

    private void emit(TimeOrderedKeyValueBuffer.Eviction<K1, V1> eviction) {
        Record<K1, V1> withHeaders = new Record(eviction.key(), eviction.value(), eviction.recordContext().timestamp()).withHeaders(eviction.recordContext().headers());
        ProcessorRecordContext recordContext = this.internalProcessorContext.recordContext();
        try {
            this.internalProcessorContext.setRecordContext(eviction.recordContext());
            doJoin(withHeaders);
            this.internalProcessorContext.setRecordContext(recordContext);
        } catch (Throwable th) {
            this.internalProcessorContext.setRecordContext(recordContext);
            throw th;
        }
    }

    protected void updateObservedStreamTime(long j) {
        this.observedStreamTime = Math.max(this.observedStreamTime, j);
    }

    private void doJoin(Record<K1, V1> record) {
        Object value2 = getValue2(record, this.keyMapper.apply(record.key(), record.value()));
        if (this.leftJoin || value2 != null) {
            this.internalProcessorContext.forward((Record) record.withValue(this.joiner.apply(record.key(), record.value(), value2)));
        }
    }

    private V2 getValue2(Record<K1, V1> record, K2 k2) {
        if (k2 == null) {
            return null;
        }
        return (V2) ValueAndTimestamp.getValueOrNull(this.valueGetter.isVersioned() ? this.valueGetter.get(k2, record.timestamp()) : this.valueGetter.get(k2));
    }

    private boolean maybeDropRecord(Record<K1, V1> record) {
        K2 apply = this.keyMapper.apply(record.key(), record.value());
        if (this.leftJoin && record.key() == null && record.value() != null) {
            return false;
        }
        if (apply != null && record.value() != null) {
            return false;
        }
        if (context().recordMetadata().isPresent()) {
            RecordMetadata recordMetadata = context().recordMetadata().get();
            LOG.warn("Skipping record due to null join key or value. topic=[{}] partition=[{}] offset=[{}]", recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset()));
        } else {
            LOG.warn("Skipping record due to null join key or value. Topic, partition, and offset not known.");
        }
        this.droppedRecordsSensor.record();
        return true;
    }

    @Override // org.apache.kafka.streams.processor.api.Processor
    public void close() {
        this.valueGetter.close();
    }
}
