package org.qubership.integration.platform.engine.service.debugger.sessions;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.client.opensearch.core.BulkRequest;
import org.opensearch.client.opensearch.core.BulkResponse;
import org.opensearch.client.opensearch.core.bulk.BulkOperation;
import org.opensearch.client.opensearch.core.bulk.BulkResponseItem;
import org.opensearch.client.opensearch.core.bulk.IndexOperation;
import org.qubership.integration.platform.engine.model.Session;
import org.qubership.integration.platform.engine.model.opensearch.QueueElement;
import org.qubership.integration.platform.engine.model.opensearch.SessionElementElastic;
import org.qubership.integration.platform.engine.opensearch.OpenSearchClientSupplier;
import org.qubership.integration.platform.engine.service.ExecutionStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

@Component
/* loaded from: input_file:org/qubership/integration/platform/engine/service/debugger/sessions/OpenSearchWriter.class */
public class OpenSearchWriter implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(OpenSearchWriter.class);

    /** Payload budget of the queue: configured megabytes converted to bytes and scaled by {@link #REPEATED_ELEMENTS_RATIO}. */
    private final int queueMaxSizeBytes;
    /** Accumulated serialized size (configured KB * 1024) at which a pending bulk request is sent. */
    private final int bulkRequestMaxSizeBytes;
    /** Serialized element size (configured KB * 1024) from which an element is written in a request of its own. */
    private final int bulkRequestPayloadSizeThresholdBytes;
    /** Drained sets containing at most this many elements are written one element per request. */
    private final int bulkRequestElementsCountThreshold;
    private final OpenSearchClientSupplier openSearchClientSupplier;
    private final ObjectMapper mapper;
    /** Elements waiting to be written by the background writer thread. */
    private final BlockingQueue<QueueElement> sessionElementsQueue;
    /** Sum of the calculated payload sizes of the elements currently sitting in the queue. */
    private final AtomicLong queueTotalPayloadSize = new AtomicLong(0);
    /** Session id -> (lock guarding cache updates for that session, session). */
    private final ConcurrentMap<String, Pair<ReadWriteLock, Session>> sessionsCache = new ConcurrentHashMap<>();
    /** Session id -> (element id -> element). */
    private final ConcurrentMap<String, ConcurrentMap<String, SessionElementElastic>> sessionElementsCache = new ConcurrentHashMap<>();
    /** Session id -> the single element parked for that session. */
    private final ConcurrentMap<String, SessionElementElastic> singleElementCache = new ConcurrentHashMap<>();
    /** Delay slept before every write request; grows on write errors and is reset on success. */
    private long currentWriteTimeout = 0;

    /** Maximum number of elements taken from the queue per writer iteration. */
    @Value("${qip.opensearch.write.batch.count}")
    private int queueDrainThreshold;

    /** Lower bound (and reset value) of the delay before a write request. */
    @Value("${qip.opensearch.write.retry.timeout.minimum}")
    private long writeTimeoutDefaultMin;

    /** Upper bound of the delay before a write request. */
    @Value("${qip.opensearch.write.retry.timeout.maximum}")
    private long writeTimeoutDefaultMax;

    /** Name of the index (alias) the session elements are written to, before normalization. */
    @Value("${qip.opensearch.index.elements.name}-session-elements")
    private String indexName;
    // NOTE(review): presumably milliseconds - the writer loop sleeps for this long after an unexpected failure.
    private static final int EXCEPTION_COOLDOWN_DELAY = 10000;
    /** Factor by which the write delay grows after a failed write. */
    private static final int WRITE_TIMEOUT_MULTIPLIER = 2;
    /** Maximum number of per-item error reasons included in one log message. */
    private static final int ERROR_MESSAGE_COUNT_THRESHOLD = 3;
    /** Number of retries after a failed write request. */
    private static final int RETRY_COUNT_ON_WRITE_ERROR = 5;
    /** Multiplier applied to the configured queue size limit. */
    private static final double REPEATED_ELEMENTS_RATIO = 2.2d;

    /**
     * Creates the writer, derives its size limits from configuration and starts the
     * background thread that drains the queue into OpenSearch.
     *
     * <p>All size conversions are done in {@code long}/{@code double} arithmetic and then
     * saturated to the {@code int} range, so large configuration values can no longer
     * wrap around to a negative or tiny limit.
     *
     * @param queueCapacity maximum number of elements held in the queue
     * @param queueMaxSizeMb queue payload limit in megabytes (scaled by {@code REPEATED_ELEMENTS_RATIO})
     * @param bulkRequestMaxSizeKb accumulated bulk size, in kilobytes, that triggers sending the bulk request
     * @param bulkRequestPayloadSizeThresholdKb element size, in kilobytes, from which an element is sent on its own
     * @param bulkRequestElementsCountThreshold drained sets of at most this size are written element by element
     * @param openSearchClientSupplier provider of the OpenSearch client and of index name normalization
     * @param objectMapper mapper used to serialize elements when measuring their size
     */
    @Autowired
    public OpenSearchWriter(
            @Value("${qip.sessions.queue.capacity}") int queueCapacity,
            @Value("${qip.sessions.queue.max-size-mb}") int queueMaxSizeMb,
            @Value("${qip.sessions.bulk-request.max-size-kb}") int bulkRequestMaxSizeKb,
            @Value("${qip.sessions.bulk-request.payload-size-threshold-kb}") int bulkRequestPayloadSizeThresholdKb,
            @Value("${qip.sessions.bulk-request.elements-count-threshold}") int bulkRequestElementsCountThreshold,
            OpenSearchClientSupplier openSearchClientSupplier,
            @Qualifier("jsonMapper") ObjectMapper objectMapper) {
        this.sessionElementsQueue = new LinkedBlockingQueue<>(queueCapacity);
        // long * double -> double; the narrowing cast saturates at Integer.MAX_VALUE instead of wrapping.
        this.queueMaxSizeBytes = (int) (queueMaxSizeMb * 1024L * 1024L * REPEATED_ELEMENTS_RATIO);
        this.bulkRequestMaxSizeBytes = (int) Math.min((long) Integer.MAX_VALUE, bulkRequestMaxSizeKb * 1024L);
        this.bulkRequestPayloadSizeThresholdBytes =
                (int) Math.min((long) Integer.MAX_VALUE, bulkRequestPayloadSizeThresholdKb * 1024L);
        this.bulkRequestElementsCountThreshold = bulkRequestElementsCountThreshold;
        this.openSearchClientSupplier = openSearchClientSupplier;
        this.mapper = objectMapper;
        // NOTE(review): the thread starts while the @Value-annotated fields may not be injected yet,
        // so run() can observe their default values at start-up - confirm this is acceptable.
        new Thread(this).start();
    }

    /**
     * Writer loop: blocks until an element is queued, drains up to {@code queueDrainThreshold}
     * elements, releases their share of the queue payload budget and writes them.
     *
     * <p>The drained buffer is always emptied at the end of an iteration. If it were kept after
     * a failure, the leftovers would be written again and their payload size would be
     * subtracted from {@code queueTotalPayloadSize} a second time.
     *
     * <p>An interrupt ends the loop with the thread's interrupt status restored.
     */
    @Override
    public void run() {
        List<QueueElement> drained = new ArrayList<>(Math.max(this.queueDrainThreshold, 1));
        resetWriteTimeout();
        while (!Thread.currentThread().isInterrupted()) {
            try {
                drained.add(this.sessionElementsQueue.take());
                this.sessionElementsQueue.drainTo(drained, this.queueDrainThreshold - 1);
                for (QueueElement queued : drained) {
                    this.queueTotalPayloadSize.addAndGet(-queued.getCalculatedPayloadSize());
                }
                // LinkedHashSet drops duplicates while keeping the queue order.
                LinkedHashSet<QueueElement> uniqueElements = new LinkedHashSet<>(drained);
                if (!CollectionUtils.isEmpty(uniqueElements)) {
                    saveElements(uniqueElements);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                log.error("Failed to commit sessions to opensearch", e);
                try {
                    Thread.sleep(EXCEPTION_COOLDOWN_DELAY);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return;
                }
            } finally {
                drained.clear();
            }
        }
    }

    /**
     * Writes the drained elements to OpenSearch.
     *
     * <p>An element is sent in a request of its own when its serialized size reaches
     * {@code bulkRequestPayloadSizeThresholdBytes} or when the whole set has at most
     * {@code bulkRequestElementsCountThreshold} elements. Every other element is collected
     * into a pending bulk request, which is sent once its accumulated size reaches
     * {@code bulkRequestMaxSizeBytes} or when the last element has been processed.
     *
     * <p>Elements that cannot be serialized are logged and skipped; the pending bulk request
     * is still flushed when the skipped element was the last one.
     *
     * @param linkedHashSet elements to write, in the order they should be sent
     */
    private void saveElements(LinkedHashSet<QueueElement> linkedHashSet) {
        boolean writeOneByOne = linkedHashSet.size() <= this.bulkRequestElementsCountThreshold;
        int pendingSizeBytes = 0;
        List<BulkOperation> pending = new ArrayList<>();
        Iterator<QueueElement> it = linkedHashSet.iterator();
        while (it.hasNext()) {
            SessionElementElastic element = it.next().getElement();
            try {
                int elementSizeBytes = this.mapper.writeValueAsBytes(element).length;
                BulkOperation operation = buildIndexOperation(element);
                if (writeOneByOne || elementSizeBytes >= this.bulkRequestPayloadSizeThresholdBytes) {
                    // executeBulk empties the list it is given, so hand over a mutable copy.
                    sendWithRetry(new ArrayList<>(List.of(operation)));
                } else {
                    pending.add(operation);
                    pendingSizeBytes += elementSizeBytes;
                }
            } catch (JsonProcessingException e) {
                log.error("Failed to parse sessions write request. Element skipped", e);
                resetWriteTimeout();
            }
            boolean lastElement = !it.hasNext();
            if (pendingSizeBytes >= this.bulkRequestMaxSizeBytes || (lastElement && !pending.isEmpty())) {
                sendWithRetry(pending);
                pending.clear();
                pendingSizeBytes = 0;
            }
        }
    }

    /** Builds the index operation that stores {@code element} under its own id in the normalized index. */
    private BulkOperation buildIndexOperation(SessionElementElastic element) {
        return new BulkOperation.Builder().index(IndexOperation.of(builder -> {
            return builder.index(this.openSearchClientSupplier.normalize(this.indexName)).id(element.getId()).requireAlias(true).document(element);
        })).build();
    }

    /**
     * Sends the operations, retrying up to {@code RETRY_COUNT_ON_WRITE_ERROR} times when the
     * request throws. Each failure increases the delay before the next request; a successful
     * request resets it. After the last failed retry the operations are dropped.
     *
     * <p>Per-item failures reported inside a successful response are logged by
     * {@code executeBulk} and are not retried, because the operations list has already been
     * emptied by then.
     */
    private void sendWithRetry(List<BulkOperation> operations) {
        for (int attempt = 0; attempt <= RETRY_COUNT_ON_WRITE_ERROR; attempt++) {
            if (operations.isEmpty()) {
                return;
            }
            waitBeforeRequest();
            try {
                executeBulk(operations);
                resetWriteTimeout();
                return;
            } catch (Exception e) {
                log.error("While sessions writing an error has occurred", e);
                increaseWriteTimeout();
            }
        }
        operations.clear();
    }

    /**
     * Sends the given operations as one bulk request against the normalized index name.
     *
     * <p>The passed list is emptied once the request has returned; it is left untouched
     * when the request throws.
     *
     * @param list operations to send; cleared after the request returns
     * @return {@code true} when at least one item of the response carries an error
     * @throws IOException when the client call fails
     */
    private boolean executeBulk(List<BulkOperation> list) throws IOException {
        BulkResponse response = openSearchClientSupplier.getClient().bulk(
                new BulkRequest.Builder()
                        .index(openSearchClientSupplier.normalize(indexName))
                        .requireAlias(true)
                        .operations(list)
                        .build());
        list.clear();
        return checkAndLogFailedElements(response);
    }

    /**
     * Counts the failed items of a bulk response and logs them in one error message.
     *
     * <p>At most {@code ERROR_MESSAGE_COUNT_THRESHOLD} error reasons are listed; the rest are
     * summarized as a count. The reasons are passed to the logger as an argument rather than
     * as the format pattern, so a literal {@code {}} inside a reason cannot be mistaken for a
     * placeholder.
     *
     * @param bulkResponse response to inspect
     * @return {@code true} when at least one item failed
     */
    private boolean checkAndLogFailedElements(BulkResponse bulkResponse) {
        int failedCount = 0;
        String lineSeparator = System.lineSeparator();
        StringBuilder details = new StringBuilder(lineSeparator);
        for (BulkResponseItem item : bulkResponse.items()) {
            if (item.error() == null) {
                continue;
            }
            if (failedCount < ERROR_MESSAGE_COUNT_THRESHOLD) {
                details.append(item.error().reason()).append(lineSeparator);
            }
            failedCount++;
        }
        if (failedCount == 0) {
            return false;
        }
        if (failedCount > ERROR_MESSAGE_COUNT_THRESHOLD) {
            details.append("...and ").append(failedCount - ERROR_MESSAGE_COUNT_THRESHOLD).append(" more");
        }
        log.error("Some sessions elements can't be saved to opensearch:{}", details);
        return true;
    }

    /** Puts the delay before write requests back to its configured minimum. */
    private void resetWriteTimeout() {
        currentWriteTimeout = writeTimeoutDefaultMin;
        log.trace("OpenSearch write timeout has been reset to {}", currentWriteTimeout);
    }

    /**
     * Multiplies the delay before write requests by {@code WRITE_TIMEOUT_MULTIPLIER},
     * starting from at least the configured minimum and never exceeding the configured
     * maximum. Does nothing when the delay already equals the maximum.
     */
    private void increaseWriteTimeout() {
        if (currentWriteTimeout != writeTimeoutDefaultMax) {
            long base = Math.max(writeTimeoutDefaultMin, currentWriteTimeout);
            long increased = base * WRITE_TIMEOUT_MULTIPLIER;
            currentWriteTimeout = Math.min(writeTimeoutDefaultMax, increased);
            log.info("OpenSearch write timeout has been increased to {}", currentWriteTimeout);
        }
    }

    /**
     * Sleeps for the current write delay before a request is sent.
     *
     * <p>When the sleep is interrupted the wait ends early and the thread's interrupt status
     * is restored, so the caller can still see that an interrupt was requested.
     */
    private void waitBeforeRequest() {
        try {
            Thread.sleep(this.currentWriteTimeout);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Queues the element for writing without storing it in the session elements cache.
     *
     * @param sessionElementElastic element to queue
     */
    public void scheduleElementToLog(SessionElementElastic sessionElementElastic) {
        scheduleElementToLog(sessionElementElastic, false);
    }

    /**
     * Queues the element for writing and optionally stores it in the session elements cache.
     *
     * <p>The element is rejected (and an error is logged) when the queue payload budget is
     * already used up or the queue refuses the offer. Caching happens regardless of whether
     * the element was queued.
     *
     * @param sessionElementElastic element to queue
     * @param cacheElement whether to also put the element into the session elements cache
     */
    private void scheduleElementToLog(SessionElementElastic sessionElementElastic, boolean cacheElement) {
        long payloadSize = calculatePayloadSizeInBytes(sessionElementElastic);
        boolean queued = false;
        if (queueTotalPayloadSize.get() < queueMaxSizeBytes) {
            QueueElement queueElement = QueueElement.builder()
                    .element(sessionElementElastic)
                    .calculatedPayloadSize(payloadSize)
                    .build();
            queued = sessionElementsQueue.offer(queueElement);
        }
        if (queued) {
            queueTotalPayloadSize.addAndGet(payloadSize);
        } else {
            log.error("Queue of opensearch elements is full, element is not added");
        }
        if (cacheElement) {
            putSessionElementToCache(sessionElementElastic);
        }
    }

    /**
     * Queues the element for writing and, while its session is still cached, also stores
     * it in the session elements cache.
     *
     * <p>When the session is not cached (initially, or no longer once the session read lock
     * is held) the element is marked {@code CANCELLED_OR_UNKNOWN} and only queued.
     *
     * @param sessionElementElastic element to queue and cache
     */
    public void scheduleElementToLogAndCache(SessionElementElastic sessionElementElastic) {
        Pair<ReadWriteLock, Session> cachedSession = sessionsCache.get(sessionElementElastic.getSessionId());
        if (cachedSession == null) {
            sessionElementElastic.setExecutionStatus(ExecutionStatus.CANCELLED_OR_UNKNOWN);
            scheduleElementToLog(sessionElementElastic, false);
            return;
        }
        ReadWriteLock sessionLock = cachedSession.getLeft();
        sessionLock.readLock().lock();
        try {
            // Re-check under the lock: the session may have been removed in the meantime.
            boolean sessionAlive = sessionsCache.containsKey(sessionElementElastic.getSessionId());
            if (!sessionAlive) {
                sessionElementElastic.setExecutionStatus(ExecutionStatus.CANCELLED_OR_UNKNOWN);
            }
            scheduleElementToLog(sessionElementElastic, sessionAlive);
        } finally {
            sessionLock.readLock().unlock();
        }
    }

    /**
     * Stores the session in the sessions cache under its id, paired with a new read/write
     * lock. An existing entry for the same id is replaced.
     *
     * @param session session to cache
     */
    public void putSessionToCache(Session session) {
        String sessionId = session.getId();
        Pair<ReadWriteLock, Session> entry = Pair.of(new ReentrantReadWriteLock(), session);
        sessionsCache.put(sessionId, entry);
    }

    /**
     * Looks up the cached session entry for the given id.
     *
     * <p>A warning is logged when there is no entry or the entry holds no session.
     *
     * @param str session id
     * @return the cached lock/session pair, or {@code null} when the id is not cached
     */
    @Nullable
    public Pair<ReadWriteLock, Session> getSessionFromCache(String str) {
        Pair<ReadWriteLock, Session> cachedSession = sessionsCache.get(str);
        boolean sessionMissing = cachedSession == null || cachedSession.getRight() == null;
        if (sessionMissing) {
            log.warn("Unable to get session from cache {}", str);
        }
        return cachedSession;
    }

    /**
     * Stores the element in the session elements cache under its session id and element id.
     *
     * <p>The per-session map is created atomically with {@code computeIfAbsent}, so two
     * threads adding the first elements of a session cannot overwrite each other's map,
     * and a concurrent removal of the session entry cannot lead to a {@code null} lookup.
     *
     * @param sessionElementElastic element to cache
     */
    private void putSessionElementToCache(SessionElementElastic sessionElementElastic) {
        this.sessionElementsCache
                .computeIfAbsent(sessionElementElastic.getSessionId(), sessionId -> new ConcurrentHashMap<>())
                .put(sessionElementElastic.getId(), sessionElementElastic);
    }

    /**
     * Returns a cached element of a session.
     *
     * @param str session id
     * @param str2 element id
     * @return the cached element, or {@code null} when the session or the element is not cached
     */
    @Nullable
    public SessionElementElastic getSessionElementFromCache(String str, String str2) {
        ConcurrentMap<String, SessionElementElastic> sessionElements = sessionElementsCache.get(str);
        return sessionElements == null ? null : sessionElements.get(str2);
    }

    /**
     * Returns the cached elements of a session.
     *
     * @param str session id
     * @return the values view of the session's element map, or an empty list when the
     *         session has no cached elements
     */
    public Collection<SessionElementElastic> getSessionElementsFromCache(String str) {
        ConcurrentMap<String, SessionElementElastic> sessionElements = sessionElementsCache.get(str);
        if (sessionElements == null) {
            return Collections.emptyList();
        }
        return sessionElements.values();
    }

    /**
     * Stores the element in the single element cache under the given session id, provided
     * the session is still cached; otherwise nothing is stored.
     *
     * @param str session id
     * @param sessionElementElastic element to store
     */
    public void putToSingleElementCache(String str, SessionElementElastic sessionElementElastic) {
        runWithSessionReadLock(str, () -> singleElementCache.put(str, sessionElementElastic));
    }

    /**
     * Removes the element stored for the session in the single element cache and puts it
     * into the session elements cache. Nothing happens when the session is no longer cached.
     *
     * @param str session id
     * @return the moved element, or {@code null} when the session is not cached or no
     *         element was stored for it
     */
    @Nullable
    public SessionElementElastic moveFromSingleElementCacheToElementCache(String str) {
        // Typed holder for the lambda's result; avoids the raw type and the unchecked casts.
        AtomicReference<SessionElementElastic> moved = new AtomicReference<>();
        runWithSessionReadLock(str, () -> {
            SessionElementElastic element = this.singleElementCache.remove(str);
            moved.set(element);
            if (element != null) {
                putSessionElementToCache(element);
            }
        });
        return moved.get();
    }

    /**
     * Removes everything cached for the given session: the session entry itself, its cached
     * elements and its entry in the single element cache.
     *
     * @param str session id
     */
    public void clearSessionCache(String str) {
        // The session entry goes first; the other cache updates in this class check
        // sessionsCache.containsKey(...) before writing.
        // NOTE(review): no lock is taken here - presumably callers hold the session's write lock; confirm.
        this.sessionsCache.remove(str);
        this.sessionElementsCache.remove(str);
        this.singleElementCache.remove(str);
    }

    /**
     * Runs the action under the session's read lock if the session is cached, both before
     * the lock is taken and once it is held. Otherwise the action is skipped and a debug
     * message is logged. The lock is released even when the action throws.
     *
     * @param str session id
     * @param runnable action to run while holding the read lock
     */
    private void runWithSessionReadLock(String str, Runnable runnable) {
        Pair<ReadWriteLock, Session> cachedSession = sessionsCache.get(str);
        if (cachedSession != null) {
            ReadWriteLock sessionLock = cachedSession.getLeft();
            sessionLock.readLock().lock();
            try {
                if (sessionsCache.containsKey(str)) {
                    runnable.run();
                    return;
                }
            } finally {
                sessionLock.readLock().unlock();
            }
        }
        log.debug("Session {} is not alive, skip sessions cache update", str);
    }

    /**
     * Estimates the payload size of an element as the combined length of its "before" and
     * "after" bodies; a missing body counts as zero.
     *
     * <p>The lengths are {@link String#length()} values, i.e. character counts rather than
     * encoded byte counts.
     *
     * @param sessionElementElastic element to measure
     * @return sum of the lengths of the bodies that are present
     */
    private long calculatePayloadSizeInBytes(SessionElementElastic sessionElementElastic) {
        String before = sessionElementElastic.getBodyBefore();
        String after = sessionElementElastic.getBodyAfter();
        long total = 0;
        if (before != null) {
            total += before.length();
        }
        if (after != null) {
            total += after.length();
        }
        return total;
    }
}
