package io.debezium.server;

import io.debezium.DebeziumException;
import io.debezium.embedded.ClientProvided;
import io.debezium.embedded.Connect;
import io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Avro;
import io.debezium.engine.format.Binary;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.format.Json;
import io.debezium.engine.format.JsonByteArray;
import io.debezium.engine.format.Protobuf;
import io.debezium.engine.format.SimpleString;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.quarkus.runtime.Quarkus;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.context.spi.CreationalContext;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.spi.Bean;
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.inject.Inject;
import java.nio.file.Paths;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.health.Liveness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
@Startup
/* loaded from: input_file:io/debezium/server/DebeziumServer.class */
public class DebeziumServer {
    private static final String PROP_PREFIX = "debezium.";
    private static final String PROP_SOURCE_PREFIX = "debezium.source.";
    private static final String PROP_SINK_PREFIX = "debezium.sink.";
    private static final String PROP_FORMAT_PREFIX = "debezium.format.";
    private static final String PROP_PREDICATES_PREFIX = "debezium.predicates.";
    private static final String PROP_TRANSFORMS_PREFIX = "debezium.transforms.";
    private static final String PROP_HEADER_FORMAT_PREFIX = "debezium.format.header.";
    private static final String PROP_KEY_FORMAT_PREFIX = "debezium.format.key.";
    private static final String PROP_VALUE_FORMAT_PREFIX = "debezium.format.value.";
    private static final String PROP_OFFSET_STORAGE_PREFIX = "offset.storage.";
    private static final String PROP_PREDICATES = "debezium.predicates";
    private static final String PROP_TRANSFORMS = "debezium.transforms";
    private static final String PROP_SINK_TYPE = "debezium.sink.type";
    private static final String PROP_HEADER_FORMAT = "debezium.format.header";
    private static final String PROP_KEY_FORMAT = "debezium.format.key";
    private static final String PROP_VALUE_FORMAT = "debezium.format.value";
    private static final String PROP_TERMINATION_WAIT = "debezium.termination.wait";
    private static final String PROP_ENGINE_FACTORY = "debezium.engine.factory";

    @Inject
    BeanManager beanManager;

    @Inject
    @Liveness
    ConnectorLifecycle health;
    private Bean<DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>>> consumerBean;
    private CreationalContext<DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>>> consumerBeanCreationalContext;
    private DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> consumer;
    private DebeziumEngine<?> engine;
    private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumServer.class);
    private static final String FORMAT_JSON = Json.class.getSimpleName().toLowerCase();
    private static final String FORMAT_JSON_BYTE_ARRAY = JsonByteArray.class.getSimpleName().toLowerCase();
    private static final String FORMAT_CLOUDEVENT = CloudEvents.class.getSimpleName().toLowerCase();
    private static final String FORMAT_AVRO = Avro.class.getSimpleName().toLowerCase();
    private static final String FORMAT_PROTOBUF = Protobuf.class.getSimpleName().toLowerCase();
    private static final String FORMAT_BINARY = Binary.class.getSimpleName().toLowerCase();
    private static final String FORMAT_STRING = SimpleString.class.getSimpleName().toLowerCase();
    private static final String FORMAT_CONNECT = Connect.class.getSimpleName().toLowerCase();
    private static final String FORMAT_CLIENT_PROVIDED = ClientProvided.class.getSimpleName().toLowerCase();
    private static final Pattern SHELL_PROPERTY_NAME_PATTERN = Pattern.compile("^[a-zA-Z0-9_]+_+[a-zA-Z0-9_]+$");
    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private int returnCode = 0;
    private final Properties props = new Properties();

    @PostConstruct
    public void start() {
        Config loadConfigOrDie = loadConfigOrDie();
        String str = (String) loadConfigOrDie.getValue(PROP_SINK_TYPE, String.class);
        Set set = (Set) this.beanManager.getBeans(str).stream().filter(bean -> {
            return DebeziumEngine.ChangeConsumer.class.isAssignableFrom(bean.getBeanClass());
        }).collect(Collectors.toSet());
        LOGGER.debug("Found {} candidate consumer(s)", Integer.valueOf(set.size()));
        if (set.size() == 0) {
            throw new DebeziumException("No Debezium consumer named '" + str + "' is available");
        }
        if (set.size() > 1) {
            throw new DebeziumException("Multiple Debezium consumers named '" + str + "' were found");
        }
        this.consumerBean = (Bean) set.iterator().next();
        this.consumerBeanCreationalContext = this.beanManager.createCreationalContext(this.consumerBean);
        this.consumer = (DebeziumEngine.ChangeConsumer) this.consumerBean.create(this.consumerBeanCreationalContext);
        LOGGER.info("Consumer '{}' instantiated", this.consumer.getClass().getName());
        Class<?> format = getFormat(loadConfigOrDie, PROP_KEY_FORMAT);
        Class<?> format2 = getFormat(loadConfigOrDie, PROP_VALUE_FORMAT);
        Class<?> headerFormat = getHeaderFormat(loadConfigOrDie);
        configToProperties(loadConfigOrDie, this.props, PROP_SOURCE_PREFIX, "", true);
        configToProperties(loadConfigOrDie, this.props, PROP_FORMAT_PREFIX, "key.converter.", true);
        configToProperties(loadConfigOrDie, this.props, PROP_FORMAT_PREFIX, "value.converter.", true);
        configToProperties(loadConfigOrDie, this.props, PROP_FORMAT_PREFIX, "header.converter.", true);
        configToProperties(loadConfigOrDie, this.props, PROP_KEY_FORMAT_PREFIX, "key.converter.", true);
        configToProperties(loadConfigOrDie, this.props, PROP_VALUE_FORMAT_PREFIX, "value.converter.", true);
        configToProperties(loadConfigOrDie, this.props, PROP_HEADER_FORMAT_PREFIX, "header.converter.", true);
        configToProperties(loadConfigOrDie, this.props, "debezium.sink." + str + ".", "schema.history.internal." + str + ".", false);
        configToProperties(loadConfigOrDie, this.props, "debezium.sink." + str + ".", "offset.storage." + str + ".", false);
        Optional optionalValue = loadConfigOrDie.getOptionalValue(PROP_TRANSFORMS, String.class);
        if (optionalValue.isPresent()) {
            this.props.setProperty("transforms", (String) optionalValue.get());
            configToProperties(loadConfigOrDie, this.props, PROP_TRANSFORMS_PREFIX, "transforms.", true);
        }
        Optional optionalValue2 = loadConfigOrDie.getOptionalValue(PROP_PREDICATES, String.class);
        if (optionalValue2.isPresent()) {
            this.props.setProperty("predicates", (String) optionalValue2.get());
            configToProperties(loadConfigOrDie, this.props, PROP_PREDICATES_PREFIX, "predicates.", true);
        }
        this.props.setProperty("name", str);
        LOGGER.debug("Configuration for DebeziumEngine: {}", this.props);
        this.engine = DebeziumEngine.create(format, format2, headerFormat, (String) loadConfigOrDie.getOptionalValue(PROP_ENGINE_FACTORY, String.class).orElse(ConvertingAsyncEngineBuilderFactory.class.getName())).using(this.props).using(this.health).using(this.health).notifying(this.consumer).build();
        this.executor.execute(() -> {
            try {
                this.engine.run();
            } finally {
                Quarkus.asyncExit(this.returnCode);
            }
        });
        LOGGER.info("Engine executor started");
    }

    private void configToProperties(Config config, Properties properties, String str, String str2, boolean z) {
        for (String str3 : config.getPropertyNames()) {
            String lowerCase = SHELL_PROPERTY_NAME_PATTERN.matcher(str3).matches() ? str3.replace("_", ".").toLowerCase() : null;
            if (lowerCase != null && lowerCase.startsWith(str)) {
                String str4 = str2 + lowerCase.substring(str.length());
                if (z || !properties.containsKey(str4)) {
                    properties.setProperty(str4, (String) config.getOptionalValue(str3, String.class).orElse(""));
                }
            } else if (str3.startsWith(str)) {
                String str5 = str2 + str3.substring(str.length());
                if (z || !properties.containsKey(str5)) {
                    properties.setProperty(str5, config.getConfigValue(str3).getValue());
                }
            }
        }
    }

    private Class<?> getFormat(Config config, String str) {
        String str2 = (String) config.getOptionalValue(str, String.class).orElse(FORMAT_JSON);
        if (FORMAT_JSON.equals(str2)) {
            return Json.class;
        }
        if (FORMAT_JSON_BYTE_ARRAY.equals(str2)) {
            return JsonByteArray.class;
        }
        if (FORMAT_CLOUDEVENT.equals(str2)) {
            return CloudEvents.class;
        }
        if (FORMAT_AVRO.equals(str2)) {
            return Avro.class;
        }
        if (FORMAT_PROTOBUF.equals(str2)) {
            return Protobuf.class;
        }
        if (FORMAT_BINARY.equals(str2)) {
            return Binary.class;
        }
        if (FORMAT_STRING.equals(str2)) {
            return SimpleString.class;
        }
        if (FORMAT_CONNECT.equalsIgnoreCase(str2)) {
            return Connect.class;
        }
        if (FORMAT_CLIENT_PROVIDED.equals(str2)) {
            return ClientProvided.class;
        }
        throw new DebeziumException("Unknown format '" + str2 + "' for option '" + str + "'");
    }

    private Class<?> getHeaderFormat(Config config) {
        String str = (String) config.getOptionalValue(PROP_HEADER_FORMAT, String.class).orElse(FORMAT_JSON);
        if (FORMAT_JSON.equals(str)) {
            return Json.class;
        }
        if (FORMAT_JSON_BYTE_ARRAY.equals(str)) {
            return JsonByteArray.class;
        }
        if (FORMAT_CONNECT.equals(str)) {
            return Connect.class;
        }
        if (FORMAT_CLIENT_PROVIDED.equals(str)) {
            return ClientProvided.class;
        }
        throw new DebeziumException("Unknown format '" + str + "' for option 'debezium.format.header'");
    }

    public void stop(@Observes ShutdownEvent shutdownEvent) {
        try {
            LOGGER.info("Received request to stop the engine");
            Config config = ConfigProvider.getConfig();
            try {
                this.engine.close();
            } catch (IllegalStateException e) {
                LOGGER.info("Cannot shut down engine now: ", e.getMessage());
            }
            this.executor.shutdown();
            this.executor.awaitTermination(((Integer) config.getOptionalValue(PROP_TERMINATION_WAIT, Integer.class).orElse(10)).intValue(), TimeUnit.SECONDS);
        } catch (Exception e2) {
            LOGGER.error("Exception while shutting down Debezium", e2);
        }
        this.consumerBean.destroy(this.consumer, this.consumerBeanCreationalContext);
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) {
        if (connectorCompletedEvent.isSuccess()) {
            return;
        }
        this.returnCode = 1;
    }

    private Config loadConfigOrDie() {
        Config config = ConfigProvider.getConfig();
        try {
            config.getValue(PROP_SINK_TYPE, String.class);
        } catch (NoSuchElementException e) {
            String format = String.format("Failed to load mandatory config value '%s'. Please check you have a correct Debezium server config in %s or required properties are defined via system or environment variables.", PROP_SINK_TYPE, Paths.get(System.getProperty("user.dir"), "config", "application.properties").toString());
            System.err.println(format);
            LOGGER.error(format);
            Quarkus.asyncExit();
        }
        return config;
    }

    DebeziumEngine.ChangeConsumer<?> getConsumer() {
        return this.consumer;
    }

    public Properties getProps() {
        return this.props;
    }

    public DebeziumEngine.Signaler getSignaler() {
        return this.engine.getSignaler();
    }
}
