package io.atleon.kafka;

import io.atleon.core.StarterStopper;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/* loaded from: input_file:io/atleon/kafka/KafkaLagThresholdStarterStopper.class */
public class KafkaLagThresholdStarterStopper implements StarterStopper {
    private static final Duration DEFAULT_SAMPLE_INTERVAL = Duration.ofSeconds(5);
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaLagThresholdStarterStopper.class);
    private final KafkaConfigSource configSource;
    private final Collection<String> consumerGroupIds;
    private final Duration sampleDelay;
    private final long highTide;
    private final long lowTide;

    private KafkaLagThresholdStarterStopper(KafkaConfigSource kafkaConfigSource, Collection<String> collection, Duration duration, long j, long j2) {
        this.configSource = kafkaConfigSource;
        this.consumerGroupIds = collection;
        this.sampleDelay = duration;
        this.highTide = j;
        this.lowTide = j2;
    }

    public static KafkaLagThresholdStarterStopper create(KafkaConfigSource kafkaConfigSource, String str) {
        return create(kafkaConfigSource, Collections.singleton(str));
    }

    public static KafkaLagThresholdStarterStopper create(KafkaConfigSource kafkaConfigSource, Collection<String> collection) {
        return new KafkaLagThresholdStarterStopper(kafkaConfigSource, collection, DEFAULT_SAMPLE_INTERVAL, 0L, 0L);
    }

    public Flux<Boolean> startStop() {
        return ((Mono) this.configSource.create()).flatMapMany(this::startStop);
    }

    public KafkaLagThresholdStarterStopper withSampleDelay(Duration duration) {
        return new KafkaLagThresholdStarterStopper(this.configSource, this.consumerGroupIds, duration, this.highTide, this.lowTide);
    }

    public KafkaLagThresholdStarterStopper withThreshold(long j) {
        return withThresholds(j, j);
    }

    public KafkaLagThresholdStarterStopper withThresholds(long j, long j2) {
        if (j < j2) {
            throw new IllegalArgumentException("highTide must be greater-than-or-equal-to lowTide");
        }
        return new KafkaLagThresholdStarterStopper(this.configSource, this.consumerGroupIds, this.sampleDelay, j, j2);
    }

    private Flux<Boolean> startStop(KafkaConfig kafkaConfig) {
        return Flux.using(() -> {
            return ReactiveAdmin.create(kafkaConfig.nativeProperties());
        }, this::startStop, (v0) -> {
            v0.close();
        });
    }

    private Flux<Boolean> startStop(ReactiveAdmin reactiveAdmin) {
        return reactiveAdmin.listTopicPartitionGroupOffsets(this.consumerGroupIds).reduce(0L, (l, topicPartitionGroupOffsets) -> {
            return Long.valueOf(l.longValue() + topicPartitionGroupOffsets.estimateLag());
        }).doOnNext((v1) -> {
            logCalculatedLag(v1);
        }).retryWhen(Retry.fixedDelay(Long.MAX_VALUE, this.sampleDelay).doBeforeRetry(this::logCalculationFailure)).repeatWhen(flux -> {
            return flux.delayElements(this.sampleDelay);
        }).scan(false, (bool, l2) -> {
            return Boolean.valueOf(bool.booleanValue() ? l2.longValue() <= this.highTide : l2.longValue() <= this.lowTide);
        }).skip(1L).distinctUntilChanged().doOnNext((v1) -> {
            logStartStopSignal(v1);
        });
    }

    private void logCalculatedLag(long j) {
        LOGGER.debug("Calculated lag for consumerGroupsIds={} is {} where highTide={} and lowTide={}", new Object[]{this.consumerGroupIds, Long.valueOf(j), Long.valueOf(this.highTide), Long.valueOf(this.lowTide)});
    }

    private void logCalculationFailure(Retry.RetrySignal retrySignal) {
        LOGGER.error("Failed to calculate total lag for consumerGroupsIds={} where signal={}. This may cause stream hanging.", this.consumerGroupIds, retrySignal);
    }

    private void logStartStopSignal(boolean z) {
        if (z) {
            LOGGER.info("Start: Lag for consumerGroupsIds={} is at-or-below lowTide={}", this.consumerGroupIds, Long.valueOf(this.lowTide));
        } else {
            LOGGER.warn("Stop: Lag for consumerGroupsIds={} is above highTide={}", this.consumerGroupIds, Long.valueOf(this.highTide));
        }
    }
}
