package io.debezium.server.redis;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.storage.redis.RedisClient;
import io.debezium.storage.redis.RedisClientConnectionException;
import io.debezium.storage.redis.RedisConnection;
import io.debezium.util.DelayStrategy;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Change consumer that writes Debezium change events to Redis streams.
 *
 * <p>Each incoming batch is split into sub-batches of the configured batch size. Every
 * sub-batch is converted into (stream name, field map) entries and handed to
 * {@link RedisClient#xadd}. Records rejected by Redis with an out-of-memory response, and
 * records affected by a connection error, are retried until they are accepted.
 */
@Named("redis")
@Dependent
public class RedisStreamChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamChangeConsumer.class);
    private static final String DEBEZIUM_REDIS_SINK_CLIENT_NAME = "debezium:redis:sink";
    private static final String HEARTBEAT_PREFIX_CONFIG = "topic.heartbeat.prefix";
    private static final String DEFAULT_HEARTBEAT_PREFIX = "__debezium-heartbeat";
    private static final String EXTENDED_MESSAGE_KEY_KEY = "key";
    private static final String EXTENDED_MESSAGE_VALUE_KEY = "value";
    private static final String MESSAGE_FORMAT_EXTENDED = "extended";
    private static final String MESSAGE_FORMAT_COMPACT = "compact";
    /** Substring of an xadd response that marks the command as rejected for lack of memory. */
    private static final String OOM_RESPONSE_MARKER = "OOM command not allowed when used memory > 'maxmemory'";

    private RedisClient client;
    private Function<ChangeEvent<Object, Object>, Map<String, String>> recordMapFunction;
    private RedisMemoryThreshold redisMemoryThreshold;
    private RedisStreamChangeConsumerConfig config;
    private String heartbeatPrefix;

    /**
     * Reads the configuration, selects the record mapping for the configured message format
     * and opens the Redis client. Also invoked from {@link #handleBatch} to re-establish a
     * connection that was closed after an error.
     *
     * @throws DebeziumException if the configured message format is neither
     *         {@code extended} nor {@code compact}
     */
    @PostConstruct
    void connect() {
        // Named differently from the 'config' field so the two cannot be confused.
        Config mpConfig = ConfigProvider.getConfig();
        Map<String, Object> sourceConfig = getConfigSubset(mpConfig, "debezium.source.");
        this.config = new RedisStreamChangeConsumerConfig(Configuration.from(getConfigSubset(mpConfig, "")));
        this.heartbeatPrefix = (String) sourceConfig.getOrDefault(HEARTBEAT_PREFIX_CONFIG, DEFAULT_HEARTBEAT_PREFIX);
        LOGGER.info("Using heartbeat prefix: {}", this.heartbeatPrefix);

        String messageFormat = this.config.getMessageFormat();
        if (MESSAGE_FORMAT_EXTENDED.equals(messageFormat)) {
            this.recordMapFunction = this::toExtendedRecord;
        } else if (MESSAGE_FORMAT_COMPACT.equals(messageFormat)) {
            this.recordMapFunction = this::toCompactRecord;
        } else {
            // Fail here with a clear message instead of a NullPointerException on the first record.
            throw new DebeziumException("Unsupported Redis message format: " + messageFormat);
        }

        RedisConnection connection = new RedisConnection(
                this.config.getAddress(),
                this.config.getDbIndex(),
                this.config.getUser(),
                this.config.getPassword(),
                this.config.getConnectionTimeout().intValue(),
                this.config.getSocketTimeout().intValue(),
                this.config.isSslEnabled());
        this.client = connection.getRedisClient(
                DEBEZIUM_REDIS_SINK_CLIENT_NAME,
                this.config.isWaitEnabled(),
                this.config.getWaitTimeout(),
                this.config.isWaitRetryEnabled(),
                this.config.getWaitRetryDelay());
        this.redisMemoryThreshold = new RedisMemoryThreshold(this.client, this.config);
    }

    /**
     * Closes the Redis client if one is open. A failure while closing is logged, not
     * propagated, and the client reference is cleared in every case so that the next
     * write attempt reconnects.
     */
    @PreDestroy
    void close() {
        try {
            if (this.client != null) {
                this.client.close();
            }
        } catch (Exception e) {
            LOGGER.warn("Exception while closing Jedis: {}", this.client, e);
        } finally {
            this.client = null;
        }
    }

    /**
     * Builds the "extended" representation of an event: a {@code key} field, a {@code value}
     * field, then one field per header with the header name upper-cased. Insertion order is
     * preserved.
     */
    private Map<String, String> toExtendedRecord(ChangeEvent<Object, Object> event) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put(EXTENDED_MESSAGE_KEY_KEY, keyOf(event));
        fields.put(EXTENDED_MESSAGE_VALUE_KEY, valueOf(event));
        Map<String, String> headers = convertHeaders(event);
        for (Map.Entry<String, String> header : headers.entrySet()) {
            fields.put(header.getKey().toUpperCase(Locale.ROOT), header.getValue());
        }
        return fields;
    }

    /** Builds the "compact" representation of an event: a single field mapping key to value. */
    private Map<String, String> toCompactRecord(ChangeEvent<Object, Object> event) {
        return Map.of(keyOf(event), valueOf(event));
    }

    /** Returns the event key as a string, or the configured null-key placeholder. */
    private String keyOf(ChangeEvent<Object, Object> event) {
        return event.key() != null ? getString(event.key()) : this.config.getNullKey();
    }

    /** Returns the event value as a string, or the configured null-value placeholder. */
    private String valueOf(ChangeEvent<Object, Object> event) {
        return event.value() != null ? getString(event.value()) : this.config.getNullValue();
    }

    /**
     * Splits {@code list} into consecutive sub-list views of at most {@code i} elements.
     *
     * @param list the elements to partition
     * @param i the maximum size of each partition; must be positive
     * @return a stream of non-empty sub-lists, empty if {@code list} is empty
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    private <T> Stream<List<T>> batches(List<T> list, int i) {
        if (i <= 0) {
            throw new IllegalArgumentException("Batch size must be positive but was " + i);
        }
        if (list.isEmpty()) {
            return Stream.empty();
        }
        int size = list.size();
        int lastIndex = (size - 1) / i;
        return IntStream.rangeClosed(0, lastIndex)
                .mapToObj(n -> list.subList(n * i, n == lastIndex ? size : (n + 1) * i));
    }

    /**
     * Writes a batch of change events to Redis in sub-batches of the configured size, then
     * marks the whole batch as finished.
     *
     * @param records the change events to write
     * @param committer used to mark each record, and finally the batch, as processed
     * @throws InterruptedException declared by the consumer contract
     * @throws DebeziumException if an unexpected (non-connection) error occurs while writing
     */
    @Override
    public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer) throws InterruptedException {
        DelayStrategy retryDelay = DelayStrategy.exponential(
                Duration.ofMillis(this.config.getInitialRetryDelay().intValue()),
                Duration.ofMillis(this.config.getMaxRetryDelay().intValue()));
        DelayStrategy memoryWaitDelay = DelayStrategy.constant(Duration.ofMillis(this.config.getWaitRetryDelay()));

        LOGGER.debug("Handling a batch of {} records", records.size());
        batches(records, this.config.getBatchSize())
                .forEach(batch -> writeBatch(batch, committer, retryDelay, memoryWaitDelay));
        committer.markBatchFinished();
    }

    /**
     * Writes one sub-batch, retrying until every record is either written or skipped.
     *
     * <p>Connection errors close the client and trigger a reconnect on the next iteration;
     * records rejected with an out-of-memory response stay pending and are re-sent.
     */
    private void writeBatch(List<ChangeEvent<Object, Object>> batch,
                            DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer,
                            DelayStrategy retryDelay,
                            DelayStrategy memoryWaitDelay) {
        // Mutable copy: records leave this list as soon as they are handled.
        List<ChangeEvent<Object, Object>> pending = new ArrayList<>(batch);

        while (!pending.isEmpty()) {
            if (this.client == null) {
                try {
                    connect();
                } catch (Exception e) {
                    close();
                    LOGGER.error("Can't connect to Redis", e);
                    // Back off before the next attempt instead of spinning on a dead server.
                    retryDelay.sleepWhen(true);
                }
                continue;
            }

            try {
                LOGGER.debug("Preparing a Redis Pipeline of {} records", pending.size());
                List<AbstractMap.SimpleEntry<String, Map<String, String>>> entries = new ArrayList<>(pending.size());
                Iterator<ChangeEvent<Object, Object>> iterator = pending.iterator();
                while (iterator.hasNext()) {
                    ChangeEvent<Object, Object> event = iterator.next();
                    String stream = this.streamNameMapper.map(event.destination());
                    if (this.config.isSkipHeartbeatMessages() && stream.startsWith(this.heartbeatPrefix)) {
                        committer.markProcessed(event);
                        // Drop skipped heartbeats so that 'pending' stays index-aligned with
                        // 'entries' and they are not marked again on a retry.
                        iterator.remove();
                    } else {
                        entries.add(new AbstractMap.SimpleEntry<>(stream, this.recordMapFunction.apply(event)));
                    }
                }

                if (entries.isEmpty()) {
                    // The sub-batch held only skipped heartbeats; nothing to send.
                    return;
                }

                if (!this.redisMemoryThreshold.checkMemory(getObjectSize(entries.get(0)), entries.size(), this.config.getBufferFillRate())) {
                    LOGGER.info("Stopped consuming records!\n");
                    memoryWaitDelay.sleepWhen(true);
                    continue;
                }

                // xadd is assumed to return one response per submitted entry, in submission order.
                List<String> responses = this.client.xadd(entries);
                List<ChangeEvent<Object, Object>> unwritten = new ArrayList<>();
                for (int index = 0; index < pending.size(); index++) {
                    ChangeEvent<Object, Object> event = pending.get(index);
                    boolean written = index < responses.size() && !responses.get(index).contains(OOM_RESPONSE_MARKER);
                    if (written) {
                        committer.markProcessed(event);
                    } else {
                        unwritten.add(event);
                    }
                }
                if (!unwritten.isEmpty()) {
                    LOGGER.info("Redis sink currently full, will retry ({} command(s) will be retried)", unwritten.size());
                }
                pending = unwritten;
            } catch (RedisClientConnectionException e) {
                LOGGER.error("Connection error", e);
                close();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                LOGGER.error("Unexpected Exception", e);
                throw new DebeziumException(e);
            }

            // Something is still outstanding (OOM rejection or connection error): back off.
            retryDelay.sleepWhen(!pending.isEmpty());
        }
    }

    /**
     * Estimates the size in bytes of one stream entry as the sum of the encoded lengths of
     * the stream name and of every field name and field value.
     *
     * <p>UTF-8 is used explicitly so the estimate does not vary with the JVM's default charset.
     *
     * @param simpleEntry the stream name paired with the entry's field map
     * @return the estimated size in bytes
     */
    private static long getObjectSize(AbstractMap.SimpleEntry<String, Map<String, String>> simpleEntry) {
        long size = utf8Length(simpleEntry.getKey());
        for (Map.Entry<String, String> field : simpleEntry.getValue().entrySet()) {
            size += utf8Length(field.getKey()) + utf8Length(field.getValue());
        }
        LOGGER.debug("Estimated record size is {}", size);
        return size;
    }

    /** Returns the number of bytes {@code text} occupies when encoded as UTF-8. */
    private static long utf8Length(String text) {
        return text.getBytes(StandardCharsets.UTF_8).length;
    }
}
