package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslWindowParams;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

/* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.8.1.jar:org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory.class */
public class StreamJoinedStoreFactory<K, V1, V2> extends AbstractConfigurableStoreFactory {
    private final String name;
    private final JoinWindows windows;
    private final Serde<?> valueSerde;
    private final WindowBytesStoreSupplier storeSupplier;
    private final StreamJoinedInternal<K, V1, V2> joinedInternal;
    private boolean loggingEnabled;
    private final Map<String, String> logConfig;

    /* loaded from: input_file:BOOT-INF/lib/kafka-streams-3.8.1.jar:org/apache/kafka/streams/kstream/internals/StreamJoinedStoreFactory$Type.class */
    public enum Type {
        THIS,
        OTHER
    }

    public StreamJoinedStoreFactory(String str, JoinWindows joinWindows, StreamJoinedInternal<K, V1, V2> streamJoinedInternal, Type type) {
        super(streamJoinedInternal.dslStoreSuppliers());
        this.name = str + "-store";
        this.joinedInternal = streamJoinedInternal;
        this.windows = joinWindows;
        this.loggingEnabled = streamJoinedInternal.loggingEnabled();
        this.logConfig = new HashMap(streamJoinedInternal.logConfig());
        this.logConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
        switch (type) {
            case THIS:
                this.valueSerde = streamJoinedInternal.valueSerde();
                this.storeSupplier = streamJoinedInternal.thisStoreSupplier();
                return;
            case OTHER:
                this.valueSerde = streamJoinedInternal.otherValueSerde();
                this.storeSupplier = streamJoinedInternal.otherStoreSupplier();
                return;
            default:
                throw new IllegalStateException("Unexpected value: " + type);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.StoreFactory
    public StateStore build() {
        StoreBuilder windowStoreBuilder = Stores.windowStoreBuilder(this.storeSupplier == null ? dslStoreSuppliers().windowStore(new DslWindowParams(this.name, Duration.ofMillis(retentionPeriod()), Duration.ofMillis(this.windows.size()), true, EmitStrategy.onWindowUpdate(), false, false)) : this.storeSupplier, this.joinedInternal.keySerde(), this.valueSerde);
        if (this.joinedInternal.loggingEnabled()) {
            windowStoreBuilder.withLoggingEnabled(this.logConfig);
        } else {
            windowStoreBuilder.withLoggingDisabled();
        }
        return windowStoreBuilder.build();
    }

    @Override // org.apache.kafka.streams.processor.internals.StoreFactory
    public long retentionPeriod() {
        return this.windows.size() + this.windows.gracePeriodMs();
    }

    @Override // org.apache.kafka.streams.processor.internals.StoreFactory
    public long historyRetention() {
        throw new IllegalStateException("historyRetention is not supported when not a versioned store");
    }

    @Override // org.apache.kafka.streams.processor.internals.StoreFactory
    public boolean loggingEnabled() {
        return this.loggingEnabled;
    }

    @Override // org.apache.kafka.streams.processor.internals.StoreFactory
    public String name() {
        return this.name;
    }

    @Override // org.apache.kafka.streams.processor.internals.StoreFactory
    public boolean isWindowStore() {
        return true;
    }

    @Override // org.apache.kafka.streams.processor.internals.StoreFactory
    public boolean isVersionedStore() {
        return false;
    }

    @Override // org.apache.kafka.streams.processor.internals.StoreFactory
    public Map<String, String> logConfig() {
        return this.logConfig;
    }

    @Override // org.apache.kafka.streams.processor.internals.StoreFactory
    public StoreFactory withCachingDisabled() {
        return this;
    }

    @Override // org.apache.kafka.streams.processor.internals.StoreFactory
    public StoreFactory withLoggingDisabled() {
        this.loggingEnabled = false;
        return this;
    }

    @Override // org.apache.kafka.streams.processor.internals.StoreFactory
    public boolean isCompatibleWith(StoreFactory storeFactory) {
        return (storeFactory instanceof StreamJoinedStoreFactory) && ((StreamJoinedStoreFactory) storeFactory).joinedInternal.equals(this.joinedInternal);
    }
}
