package org.qubership.integration.platform.engine.service.deployment.processing.actions.context.before;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.camel.spring.SpringCamelContext;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.junit.jupiter.api.IndicativeSentencesGeneration;
import org.qubership.integration.platform.engine.configuration.PredeployCheckKafkaConfiguration;
import org.qubership.integration.platform.engine.errorhandling.DeploymentRetriableException;
import org.qubership.integration.platform.engine.model.ChainElementType;
import org.qubership.integration.platform.engine.model.ElementOptions;
import org.qubership.integration.platform.engine.model.constants.CamelConstants;
import org.qubership.integration.platform.engine.model.deployment.update.DeploymentInfo;
import org.qubership.integration.platform.engine.model.deployment.update.ElementProperties;
import org.qubership.integration.platform.engine.service.VariablesService;
import org.qubership.integration.platform.engine.service.deployment.processing.ElementProcessingAction;
import org.qubership.integration.platform.engine.service.deployment.processing.qualifiers.OnBeforeDeploymentContextCreated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

/**
 * Pre-deployment check for Kafka chain elements.
 *
 * <p>For every element accepted by {@link #applicableTo(ElementProperties)} the action opens a short-lived
 * Kafka {@link AdminClient} using the connection settings stored in the element properties, lists the
 * topics known to the broker and verifies that every topic configured on the element is among them.
 *
 * <p>The bean is registered unless {@code qip.camel.component.kafka.predeploy-check-enabled} is set to a
 * value other than {@code true} (a missing property enables it).
 */
@OnBeforeDeploymentContextCreated
@ConditionalOnProperty(name = {"qip.camel.component.kafka.predeploy-check-enabled"}, havingValue = "true", matchIfMissing = true)
@Component
public class KafkaTopicAndConnectionCheckAction extends ElementProcessingAction {
    private static final Logger log = LoggerFactory.getLogger(KafkaTopicAndConnectionCheckAction.class);

    /** Message used both for the log record and for the exception raised when the check fails. */
    private static final String CHECK_FAILED_MESSAGE =
        "Kafka predeploy check is failed. Connection configuration is invalid, topics not found or broker is unavailable";

    /** Separator used when listing missing topics in the error message. */
    private static final String TOPIC_LIST_SEPARATOR = ", ";

    private final VariablesService variablesService;
    private final PredeployCheckKafkaConfiguration predeployCheckKafkaConfiguration;

    /**
     * Creates the action.
     *
     * @param variablesService used to substitute variables in raw element property values
     * @param predeployCheckKafkaConfiguration builds the admin client configuration used for the check
     */
    @Autowired
    public KafkaTopicAndConnectionCheckAction(VariablesService variablesService, PredeployCheckKafkaConfiguration predeployCheckKafkaConfiguration) {
        this.variablesService = variablesService;
        this.predeployCheckKafkaConfiguration = predeployCheckKafkaConfiguration;
    }

    /**
     * Tells whether the element should be checked by this action.
     *
     * @param elementProperties properties of the chain element being deployed
     * @return {@code true} when the element type is reported as a Kafka async element by {@link ChainElementType}
     */
    @Override
    public boolean applicableTo(ElementProperties elementProperties) {
        String elementType = elementProperties.getProperties().get(CamelConstants.ChainProperties.ELEMENT_TYPE);
        return ChainElementType.isKafkaAsyncElement(ChainElementType.fromString(elementType));
    }

    /**
     * Verifies the Kafka connection and the existence of the configured topics.
     *
     * <p>Outcomes:
     * <ul>
     *   <li>no brokers property: the element is skipped;</li>
     *   <li>authorization failure: a warning is logged and the deployment is not blocked;</li>
     *   <li>any other Kafka or execution failure, or an empty topic list: {@link DeploymentRetriableException};</li>
     *   <li>some configured topics are absent on the broker: {@link DeploymentRetriableException};</li>
     *   <li>any other unexpected failure: a warning is logged and the deployment is not blocked.</li>
     * </ul>
     *
     * @param springCamelContext Camel context of the deployment; not used by this check
     * @param elementProperties properties of the element to check
     * @param deploymentInfo deployment the element belongs to; used for log messages only
     * @throws DeploymentRetriableException when the broker cannot be queried or configured topics are missing
     */
    @Override
    public void apply(SpringCamelContext springCamelContext, ElementProperties elementProperties, DeploymentInfo deploymentInfo) {
        Map<String, String> properties = elementProperties.getProperties();
        try {
            String brokers = getProp(properties, ElementOptions.BROKERS);
            String securityProtocol = getProp(properties, ElementOptions.SECURITY_PROTOCOL);
            String saslMechanism = getProp(properties, ElementOptions.SASL_MECHANISM);
            String saslJaasConfig = getProp(properties, ElementOptions.SASL_JAAS_CONFIG);
            String topics = getProp(properties, ElementOptions.TOPICS);
            if (brokers == null) {
                log.debug("Element with id {} not contains kafka connection params, skipping", elementProperties.getElementId());
                return;
            }

            // try-with-resources guarantees the admin client is closed on every path, including failures.
            try (AdminClient adminClient = AdminClient.create(
                this.predeployCheckKafkaConfiguration.createValidationKafkaAdminConfig(brokers, securityProtocol, saslMechanism, saslJaasConfig))) {
                // Listing topics doubles as the connectivity check, so it runs before the topics are parsed.
                Set<String> existingTopics = adminClient.listTopics().names().get();

                if (topics == null) {
                    // Nothing to compare against. Kept non-fatal: only connectivity and topic lookups
                    // are allowed to block the deployment.
                    log.warn("Failed to check kafka topic(s) for deployment: {}, element: {}. Topics property is not set",
                        deploymentInfo.getDeploymentId(), elementProperties.getElementId());
                    return;
                }

                Set<String> missingTopics = new HashSet<>(List.of(topics.split(",")));
                // split() drops trailing empty strings, so a value made only of commas yields no topics at all.
                if (missingTopics.isEmpty()) {
                    throw new KafkaException("Topic property can't be empty");
                }
                missingTopics.removeAll(existingTopics);
                if (!missingTopics.isEmpty()) {
                    throw new DeploymentRetriableException("Kafka topics (" + String.join(TOPIC_LIST_SEPARATOR, missingTopics) + ") not found, check if this topics exists in kafka");
                }
            }
        } catch (DeploymentRetriableException e) {
            log.warn("Kafka predeploy check is failed with retriable exception", e);
            throw e;
        } catch (ExecutionException | KafkaException e) {
            if (isAuthorizationFailure(e)) {
                // Missing permissions for the admin API must not block the deployment.
                log.warn("Kafka predeploy check is failed with AuthorizationException. Exception not thrown", e);
            } else {
                log.warn(CHECK_FAILED_MESSAGE, e);
                throw new DeploymentRetriableException(CHECK_FAILED_MESSAGE, e);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("Failed to check kafka topic(s) or connection for deployment: {}, element: {}", deploymentInfo.getDeploymentId(), elementProperties.getElementId(), e);
        } catch (Exception e) {
            // Best-effort check: unexpected failures are reported but do not block the deployment.
            log.warn("Failed to check kafka topic(s) or connection for deployment: {}, element: {}", deploymentInfo.getDeploymentId(), elementProperties.getElementId(), e);
        }
    }

    /** Returns {@code true} when the failure itself or its direct cause is an {@link AuthorizationException}. */
    private static boolean isAuthorizationFailure(Exception failure) {
        return failure instanceof AuthorizationException || failure.getCause() instanceof AuthorizationException;
    }

    /** Reads the property {@code str} from {@code map} and passes its value through variable injection. */
    private String getProp(Map<String, String> map, String str) {
        return this.variablesService.injectVariables(map.get(str));
    }
}
