package io.debezium.server.infinispan;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("infinispan")
@Dependent
/* loaded from: input_file:io/debezium/server/infinispan/InfinispanSinkConsumer.class */
public class InfinispanSinkConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(InfinispanSinkConsumer.class);
    private static final String CONF_PREFIX = "debezium.sink.infinispan.";
    private static final String SERVER_HOST = "debezium.sink.infinispan.server.host";
    private static final String SERVER_PORT = "debezium.sink.infinispan.server.port";
    private static final String CACHE_NAME = "debezium.sink.infinispan.cache";
    private static final String USER_NAME = "debezium.sink.infinispan.user";
    private static final String PASSWORD = "debezium.sink.infinispan.password";
    private RemoteCacheManager remoteCacheManager;
    private RemoteCache cache;

    @Inject
    @CustomConsumerBuilder
    Instance<RemoteCache> customCache;

    @PostConstruct
    void connect() {
        if (this.customCache.isResolvable()) {
            this.cache = (RemoteCache) this.customCache.get();
            LOGGER.info("Obtained custom cache with configuration '{}'", this.cache.getRemoteCacheContainer().getConfiguration());
            return;
        }
        Config config = ConfigProvider.getConfig();
        String str = (String) config.getValue(SERVER_HOST, String.class);
        String str2 = (String) config.getValue(CACHE_NAME, String.class);
        Integer num = (Integer) config.getOptionalValue(SERVER_PORT, Integer.class).orElse(11222);
        Optional optionalValue = config.getOptionalValue(USER_NAME, String.class);
        Optional optionalValue2 = config.getOptionalValue(PASSWORD, String.class);
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        String format = (optionalValue.isPresent() && optionalValue2.isPresent()) ? String.format("hotrod://%s:%s@%s:%d", optionalValue.get(), optionalValue2.get(), str, num) : String.format("hotrod://%s:%d", str, num);
        LOGGER.info("Connecting to the Infinispan server using URI '{}'", format);
        configurationBuilder.uri(format);
        this.remoteCacheManager = new RemoteCacheManager(configurationBuilder.build());
        this.cache = this.remoteCacheManager.getCache(str2);
        LOGGER.info("Connected to the Infinispan server {}", this.remoteCacheManager.getServers()[0]);
    }

    @PreDestroy
    void close() {
        try {
            if (this.remoteCacheManager != null) {
                this.remoteCacheManager.close();
                LOGGER.info("Connection to Infinispan server closed.");
            }
        } catch (Exception e) {
            throw new DebeziumException(e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        HashMap hashMap = new HashMap(list.size());
        for (ChangeEvent<Object, Object> changeEvent : list) {
            if (changeEvent.value() != null) {
                LOGGER.trace("Received event {} = '{}'", getString(changeEvent.key()), getString(changeEvent.value()));
                hashMap.put(changeEvent.key(), changeEvent.value());
            }
        }
        try {
            this.cache.putAll(hashMap);
            Iterator<ChangeEvent<Object, Object>> it = list.iterator();
            while (it.hasNext()) {
                recordCommitter.markProcessed(it.next());
            }
            recordCommitter.markBatchFinished();
        } catch (Exception e) {
            throw new DebeziumException(e);
        }
    }
}
