package ai.wanaku.routing.service;

import ai.wanaku.core.exchange.ParsedToolInvokeRequest;
import ai.wanaku.core.exchange.ToolInvokeRequest;
import ai.wanaku.core.services.routing.Client;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
import org.jboss.logging.Logger;

@ApplicationScoped
/* loaded from: input_file:ai/wanaku/routing/service/KafkaClient.class */
public class KafkaClient implements Client {
    private static final Logger LOG = Logger.getLogger(KafkaClient.class);
    private final ProducerTemplate producer;
    private final ConsumerTemplate consumer;

    public KafkaClient(CamelContext camelContext) {
        this.producer = camelContext.createProducerTemplate();
        this.consumer = camelContext.createConsumerTemplate();
    }

    public Object exchange(ToolInvokeRequest toolInvokeRequest) {
        Map serviceConfigurationsMap = toolInvokeRequest.getServiceConfigurationsMap();
        String str = (String) serviceConfigurationsMap.get("bootstrapHost");
        ParsedToolInvokeRequest parseRequest = ParsedToolInvokeRequest.parseRequest(toolInvokeRequest);
        String format = String.format("%s?brokers=%s", parseRequest.uri(), str);
        String format2 = String.format("kafka://%s?brokers=%s", (String) serviceConfigurationsMap.get("replyToTopic"), str);
        LOG.infof("Invoking tool at URI: %s", format);
        try {
            this.producer.start();
            this.consumer.start();
            this.producer.sendBody(format, parseRequest.body());
            LOG.infof("Waiting for reply at at URI: %s", format2);
            Object receiveBody = this.consumer.receiveBody(format2);
            this.producer.stop();
            this.consumer.stop();
            return receiveBody;
        } catch (Throwable th) {
            this.producer.stop();
            this.consumer.stop();
            throw th;
        }
    }
}
