package ai.jobbeacon.service;

import ai.jobbeacon.model.CompletionEvent;
import ai.jobbeacon.model.UploadEvent;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:BOOT-INF/classes/ai/jobbeacon/service/DocumentProcessorStream.class */
public class DocumentProcessorStream {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) DocumentProcessorStream.class);

    @Value("${jb.document.uploaded.topic:jb-document-uploaded}")
    private String JB_DOCUMENT_UPLOADED_TOPIC;

    @Value("${jb.document.process.success.topic:jb-document-process-success}")
    private String JB_DOCUMENT_PROCESS_SUCCESS_TOPIC;

    @Value("${jb.document.process.failure.topic:jb-document-process-failure}")
    private String JB_DOCUMENT_PROCESS_FAILURE_TOPIC;
    private final DocumentService documentService;
    private final EmbeddingService embeddingService;

    public DocumentProcessorStream(DocumentService documentService, EmbeddingService embeddingService) {
        this.documentService = documentService;
        this.embeddingService = embeddingService;
    }

    @Autowired
    public void buildPipeline(StreamsBuilder streamsBuilder) {
        streamsBuilder.stream(this.JB_DOCUMENT_UPLOADED_TOPIC, Consumed.with(Serdes.String(), new JsonSerde(UploadEvent.class))).map((str, uploadEvent) -> {
            logger.info("Received upload event for user: {} at location: {}", uploadEvent.getUsername(), uploadEvent.getLocation());
            return new KeyValue(str, processDocument(uploadEvent));
        }).split().branch((str2, completionEvent) -> {
            return completionEvent.getErrorDetails() == null || completionEvent.getErrorDetails().isEmpty();
        }, Branched.withConsumer(kStream -> {
            kStream.to(this.JB_DOCUMENT_PROCESS_SUCCESS_TOPIC, Produced.with(Serdes.String(), new JsonSerde(CompletionEvent.class)));
        })).branch((str3, completionEvent2) -> {
            return (completionEvent2.getErrorDetails() == null || completionEvent2.getErrorDetails().isEmpty()) ? false : true;
        }, Branched.withConsumer(kStream2 -> {
            kStream2.to(this.JB_DOCUMENT_PROCESS_FAILURE_TOPIC, Produced.with(Serdes.String(), new JsonSerde(CompletionEvent.class)));
        }));
        logger.info("Resume processing pipeline configured successfully");
    }

    private CompletionEvent processDocument(UploadEvent uploadEvent) {
        try {
            String extractTextContent = this.documentService.extractTextContent(uploadEvent.getLocation());
            if (extractTextContent == null || extractTextContent.trim().isEmpty()) {
                throw new RuntimeException("No text content extracted from resume");
            }
            String processAndStoreEmbeddings = this.embeddingService.processAndStoreEmbeddings(uploadEvent.getUsername(), extractTextContent);
            CompletionEvent completionEvent = new CompletionEvent(uploadEvent.getUsername(), uploadEvent.getLocation(), "Resume processed successfully");
            completionEvent.setEmbeddingId(processAndStoreEmbeddings);
            logger.info("Successfully processed resume for user: {} with embedding ID: {}", uploadEvent.getUsername(), processAndStoreEmbeddings);
            return completionEvent;
        } catch (Exception e) {
            logger.error("Error processing resume: {}", e.getMessage(), e);
            return new CompletionEvent(uploadEvent.getUsername(), uploadEvent.getLocation(), "Resume processing failed", e.getMessage());
        }
    }
}
