package org.codelibs.fess.ingest.ndjson;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.codelibs.core.exception.IORuntimeException;
import org.codelibs.core.io.CloseableUtil;
import org.codelibs.core.lang.StringUtil;
import org.codelibs.core.lang.ThreadUtil;
import org.codelibs.core.stream.StreamUtil;
import org.codelibs.fess.crawler.Constants;
import org.codelibs.fess.crawler.entity.AccessResult;
import org.codelibs.fess.crawler.entity.AccessResultData;
import org.codelibs.fess.crawler.entity.ResponseData;
import org.codelibs.fess.crawler.entity.ResultData;
import org.codelibs.fess.crawler.transformer.Transformer;
import org.codelibs.fess.entity.DataStoreParams;
import org.codelibs.fess.ingest.Ingester;
import org.codelibs.fess.util.ComponentUtil;

/* loaded from: input_file:org/codelibs/fess/ingest/ndjson/NdjsonIngester.class */
/**
 * Ingester that writes every document it sees to newline-delimited JSON (NDJSON) files.
 *
 * <p>Documents are handed to a queue by the {@code process} methods and written by a single
 * background thread named {@code ingest-ndjson}. A new output file is started whenever the
 * configured maximum number of lines has been written to the current one.
 *
 * <p>The ingester is configured through system properties read in {@link #init()}:
 * <ul>
 * <li>{@code ingest.ndjson.path} - output directory; when blank the ingester is disabled</li>
 * <li>{@code ingest.ndjson.prefix} - file name prefix (default {@code fess-})</li>
 * <li>{@code ingest.ndjson.max.lines} - lines per file before rotating (default {@code 10000})</li>
 * <li>{@code ingest.ndjson.capacity} - queue capacity; unbounded when blank</li>
 * <li>{@code ingest.ndjson.filter.keys} - comma-separated list of keys to keep; all keys when blank</li>
 * </ul>
 */
public class NdjsonIngester extends Ingester {
    private static final Logger logger = LogManager.getLogger(NdjsonIngester.class);

    /** Transformer name that marks a crawl result as having no transformer. */
    private static final String NONE = "none";

    /** Output directory, or {@code null} when the ingester is disabled. */
    protected String outputDir;

    /** Prefix of generated file names. */
    protected String filePrefix;

    /** Serializer used to turn a document map into one JSON line. */
    protected ObjectMapper objectMapper = new ObjectMapper();

    /** Hand-off queue between the {@code process} callers and the writer thread. */
    protected BlockingQueue<Map<String, Object>> queue = null;

    /** Keys to keep in written documents, or {@code null} to keep all keys. */
    protected Set<String> filterKeySet = null;

    /** Background thread that drains {@link #queue} into the current file. */
    protected Thread writerThread = null;

    /**
     * Cleared by {@link #destroy()} to ask the writer thread to stop. Volatile because it is
     * written by the destroying thread and read by the writer thread.
     */
    protected volatile boolean running;

    /** Writer for the file currently being appended to. */
    protected Writer currentWriter;

    /**
     * Reads the configuration and starts the writer thread.
     *
     * <p>Does nothing except clear {@link #outputDir} when {@code ingest.ndjson.path} is blank.
     *
     * @throws NumberFormatException if {@code ingest.ndjson.max.lines} or
     *         {@code ingest.ndjson.capacity} is set to a non-integer value
     */
    @PostConstruct
    public void init() {
        this.outputDir = ComponentUtil.getFessConfig().getSystemProperty("ingest.ndjson.path");
        if (StringUtil.isBlank(this.outputDir)) {
            this.outputDir = null;
            return;
        }
        this.filePrefix = ComponentUtil.getFessConfig().getSystemProperty("ingest.ndjson.prefix", "fess-");
        if (logger.isInfoEnabled()) {
            logger.info("Output Path: {}/{}*.ndjson", this.outputDir, this.filePrefix);
        }

        final int maxLines = Integer.parseInt(ComponentUtil.getFessConfig().getSystemProperty("ingest.ndjson.max.lines", "10000"));

        final String capacity = ComponentUtil.getFessConfig().getSystemProperty("ingest.ndjson.capacity");
        this.queue = new LinkedBlockingQueue<>(StringUtil.isBlank(capacity) ? Integer.MAX_VALUE : Integer.parseInt(capacity));

        final String filterKeys = ComponentUtil.getFessConfig().getSystemProperty("ingest.ndjson.filter.keys");
        if (StringUtil.isBlank(filterKeys)) {
            this.filterKeySet = null;
        } else {
            this.filterKeySet = StreamUtil.split(filterKeys, ",")
                    .get(stream -> stream.map(String::trim).filter(StringUtil::isNotEmpty).collect(Collectors.toSet()));
        }

        this.running = true;
        this.writerThread = new Thread(() -> runWriter(maxLines), "ingest-ndjson");
        this.writerThread.start();
    }

    /**
     * Body of the writer thread: drains the queue into NDJSON files until shutdown.
     *
     * <p>The loop keeps going while {@link #running} is set, and after that for as long as the
     * previous poll still returned a document, so queued documents are written before the
     * thread exits.
     *
     * @param maxLines number of lines after which a new file is started
     */
    private void runWriter(final int maxLines) {
        try {
            int lineCount = 0;
            newWriter();
            Map<String, Object> target = null;
            while (this.running || target != null) {
                target = this.queue.poll(10L, TimeUnit.SECONDS);
                if (logger.isDebugEnabled()) {
                    logger.debug("processing {}", target);
                }
                if (target != null) {
                    if (lineCount >= maxLines) {
                        newWriter();
                        lineCount = 0;
                    }
                    try {
                        this.currentWriter.write(this.objectMapper.writeValueAsString(target));
                        this.currentWriter.write("\n");
                        lineCount++;
                    } catch (final Exception e) {
                        // A single bad document must not stop the writer.
                        logger.warn("Failed to write {}", target, e);
                    }
                }
            }
        } catch (final InterruptedException e) {
            logger.warn("interrupted.", e);
            // Preserve the interrupt status for whoever observes this thread.
            Thread.currentThread().interrupt();
            return;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("finishing writer.");
        }
    }

    /**
     * Closes the current file, if any, and opens a new one named
     * {@code <prefix><current time as long>.ndjson} in {@link #outputDir}.
     *
     * @throws IORuntimeException if the new file cannot be opened
     */
    protected void newWriter() {
        if (this.currentWriter != null) {
            try {
                this.currentWriter.flush();
                this.currentWriter.close();
            } catch (final IOException e) {
                logger.warn("Failed to close writer.", e);
            }
        }
        final String fileName = this.filePrefix + ComponentUtil.getSystemHelper().getCurrentTimeAsLong() + ".ndjson";
        try {
            this.currentWriter = new BufferedWriter(new FileWriter(new File(this.outputDir, fileName), Constants.UTF_8_CHARSET));
        } catch (final IOException e) {
            throw new IORuntimeException(e);
        }
    }

    /**
     * Signals the writer thread to stop, waits for it to finish, then flushes and closes the
     * current file. Only the stop signal is sent when the ingester is disabled.
     */
    @PreDestroy
    public void destroy() {
        this.running = false;
        if (this.outputDir == null) {
            return;
        }
        // writerThread is null when init() failed after outputDir was set.
        if (this.writerThread != null) {
            try {
                this.writerThread.join();
            } catch (final InterruptedException e) {
                logger.warn("interrupted.", e);
                Thread.currentThread().interrupt();
            }
        }
        // currentWriter is null when the writer thread could not open its first file.
        if (this.currentWriter != null) {
            try {
                this.currentWriter.flush();
            } catch (final IOException e) {
                logger.warn("Failed to flush the writer.", e);
            } finally {
                CloseableUtil.closeQuietly(this.currentWriter);
            }
        }
    }

    /**
     * Queues a document for writing and returns it unchanged.
     *
     * @param map document to write; ignored when {@code null}
     * @param dataStoreParams data store parameters (not used by this ingester)
     * @return the given {@code map}
     */
    public Map<String, Object> process(Map<String, Object> map, DataStoreParams dataStoreParams) {
        if (this.outputDir == null) {
            return map;
        }
        offer(map);
        return map;
    }

    /**
     * Applies the key filter to a document and puts it on the queue, retrying every 100 ms
     * while the queue is full.
     *
     * @param map document to queue; ignored when {@code null}
     */
    private void offer(Map<String, Object> map) {
        if (map == null) {
            return;
        }
        final Map<String, Object> filtered;
        if (this.filterKeySet == null) {
            filtered = map;
        } else {
            // Plain copy instead of Collectors.toMap, which rejects null values.
            filtered = new HashMap<>();
            for (final Map.Entry<String, Object> entry : map.entrySet()) {
                if (this.filterKeySet.contains(entry.getKey())) {
                    filtered.put(entry.getKey(), entry.getValue());
                }
            }
        }
        if (logger.isDebugEnabled()) {
            logger.debug("filtered object: {}", filtered);
        }
        while (!this.queue.offer(filtered)) {
            logger.warn("Failed to add {}. retrying...", map);
            ThreadUtil.sleep(100L);
        }
    }

    /**
     * Converts a crawl result into a document with its transformer and queues it for writing.
     * Results whose transformer name is blank or {@code none} are skipped. Any failure is
     * logged and never propagated to the caller.
     *
     * @param resultData crawl result
     * @param responseData response the result was produced from
     * @return the given {@code resultData}
     */
    public ResultData process(ResultData resultData, ResponseData responseData) {
        try {
            AccessResult accessResult = (AccessResult) ComponentUtil.getComponent("accessResult");
            accessResult.init(responseData, resultData);
            AccessResultData accessResultData = accessResult.getAccessResultData();
            String transformerName = accessResultData.getTransformerName();
            if (StringUtil.isNotBlank(transformerName) && !NONE.equalsIgnoreCase(transformerName)) {
                final Object data = ((Transformer) ComponentUtil.getComponent(transformerName)).getData(accessResultData);
                @SuppressWarnings("unchecked")
                final Map<String, Object> document = (Map<String, Object>) data;
                offer(document);
            }
        } catch (final Exception e) {
            logger.warn("Failed to add {}.", resultData, e);
        }
        return resultData;
    }
}
