package io.debezium.platform.environment.watcher;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.embedded.Connect;
import io.debezium.embedded.EmbeddedEngineConfig;
import io.debezium.engine.DebeziumEngine;
import io.debezium.platform.config.OffsetConfigGroup;
import io.debezium.platform.environment.watcher.config.OutboxConfigGroup;
import io.debezium.platform.environment.watcher.config.WatcherConfig;
import io.debezium.platform.environment.watcher.consumers.OutboxParentEventConsumer;
import io.debezium.transforms.outbox.EventRouter;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jboss.logging.Logger;

@ApplicationScoped
@Startup
/* loaded from: input_file:io/debezium/platform/environment/watcher/ConductorEnvironmentWatcher.class */
public class ConductorEnvironmentWatcher {
    public static final String CONFIG_PORTION = "\\.config";
    public static final String OFFSET_STORAGE_PREFIX = "offset.storage.";
    public static final String OFFSET_PREFIX = "offset.";
    private final Logger logger;
    private final OutboxParentEventConsumer eventConsumer;
    private final WatcherConfig watcherConfig;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private DebeziumEngine<?> engine;

    public ConductorEnvironmentWatcher(Logger logger, WatcherConfig watcherConfig, OutboxParentEventConsumer outboxParentEventConsumer) {
        this.logger = logger;
        this.watcherConfig = watcherConfig;
        this.eventConsumer = outboxParentEventConsumer;
    }

    @PostConstruct
    public void start() {
        if (!this.watcherConfig.watcher().enabled()) {
            this.logger.info("Skipping watcher because it is not enabled");
            return;
        }
        WatcherConfig.ConnectionConfig connection = this.watcherConfig.connection();
        OffsetConfigGroup offset = this.watcherConfig.watcher().offset();
        OutboxConfigGroup outbox = this.watcherConfig.outbox();
        Configuration.Builder with = Configuration.create().with(EmbeddedEngineConfig.ENGINE_NAME, "conductor").with(EmbeddedEngineConfig.CONNECTOR_CLASS, PostgresConnector.class.getName()).with(PostgresConnectorConfig.TOPIC_PREFIX, "conductor").with(PostgresConnectorConfig.HOSTNAME, connection.host()).with(PostgresConnectorConfig.PORT, connection.port()).with(PostgresConnectorConfig.USER, connection.username()).with(PostgresConnectorConfig.PASSWORD, connection.password()).with(PostgresConnectorConfig.DATABASE_NAME, connection.database()).with(PostgresConnectorConfig.PLUGIN_NAME, PostgresConnectorConfig.LogicalDecoder.PGOUTPUT.getValue()).with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, false).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.%s".formatted(outbox.table())).with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.FILTERED).with("transforms", "outbox").with("transforms.outbox.type", EventRouter.class.getName()).with("transforms.outbox.table.fields.additional.placement", (String) Stream.of((Object[]) new String[]{outbox.aggregateColumn(), outbox.aggregateIdColumn(), outbox.typeColumn()}).map(str -> {
            return str + ":envelope";
        }).collect(Collectors.joining(",")));
        Map<String, String> offsetConfigurations = offsetConfigurations(offset);
        Objects.requireNonNull(with);
        offsetConfigurations.forEach(with::with);
        Configuration build = with.build();
        this.logger.info("Creating Debezium engine");
        this.engine = DebeziumEngine.create(Connect.class).using(build.asProperties()).notifying(this.eventConsumer).build();
        this.logger.info("Attempting to start debezium engine");
        this.executor.execute(this.engine);
    }

    private Map<String, String> offsetConfigurations(OffsetConfigGroup offsetConfigGroup) {
        HashMap hashMap = new HashMap();
        hashMap.put(EmbeddedEngineConfig.OFFSET_STORAGE.name(), offsetConfigGroup.storage().type());
        offsetConfigGroup.storage().config().forEach((str, str2) -> {
            hashMap.put(buildKey(OFFSET_STORAGE_PREFIX, str), str2);
        });
        offsetConfigGroup.config().forEach((str3, str4) -> {
            hashMap.put(buildKey(OFFSET_PREFIX, str3), str4);
        });
        return hashMap;
    }

    private String buildKey(String str, String str2) {
        return str + str2.replaceAll(CONFIG_PORTION, "");
    }

    public void stop(@Observes ShutdownEvent shutdownEvent) {
        if (this.engine == null) {
            return;
        }
        try {
            this.logger.info("Attempting to stop Debezium");
            this.engine.close();
            this.executor.shutdown();
            this.executor.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (Exception e) {
            this.logger.error("Exception while shutting down Debezium", e);
        }
    }
}
