package com.binaryigor.eventsql.internal;

import com.binaryigor.eventsql.EventPublication;
import com.binaryigor.eventsql.EventSQLPublisher;
import com.binaryigor.eventsql.TopicDefinition;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/binaryigor/eventsql/internal/DefaultEventSQLPublisher.class */
/**
 * Default {@link EventSQLPublisher} implementation.
 *
 * <p>Publications are turned into {@link EventInput}s (partition chosen by the configured
 * {@link EventSQLPublisher.Partitioner}), validated against the topic definition and handed to
 * {@link EventRepository#createAll}. After the first successful publish a single background
 * virtual thread is started; it wakes up every {@code flushPublishBufferDelay} and, if anything
 * was published since the last round, calls {@link EventRepository#flushBuffer} for the topics
 * published to so far.
 *
 * <p>The publish/flush coordination state is kept in atomics and a concurrent set, so
 * {@code publish}/{@code publishAll} may be called from multiple threads.
 */
public class DefaultEventSQLPublisher implements EventSQLPublisher {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEventSQLPublisher.class);
    /** How often the stop-waiter polls the flush thread for termination. */
    private static final long STOP_POLL_INTERVAL_MILLIS = 500L;

    private final TopicDefinitionsCache topicDefinitionsCache;
    private final Transactions transactions;
    private final EventRepository eventRepository;
    /** Background flush thread; assigned once, right after {@link #flushPublishBufferThreadSet} flips. */
    private volatile Thread flushPublishBufferThread;
    private final int flushPublishBufferSize;
    private final Duration flushPublishBufferDelay;
    /** Cleared by {@link #stop(Duration)}; the flush loop exits once it observes {@code false}. */
    private final AtomicBoolean running = new AtomicBoolean(true);
    /** Set on every publish, consumed (reset) by the flush loop: "there is something to flush". */
    private final AtomicBoolean published = new AtomicBoolean(false);
    /** Every topic that has been published to through this instance; passed to flushBuffer. */
    private final Set<String> publishedTopics = Collections.newSetFromMap(new ConcurrentHashMap<>());
    /** Guards the one-time start of the flush thread. */
    private final AtomicBoolean flushPublishBufferThreadSet = new AtomicBoolean(false);
    private EventSQLPublisher.Partitioner partitioner = new DefaultPartitioner();

    /**
     * @param topicDefinitionsCache   source of topic definitions used to validate publications
     * @param transactions            used to publish a multi-topic batch in one transaction
     * @param eventRepository         where events are created and the buffer is flushed
     * @param flushPublishBufferSize  size argument passed to {@code flushBuffer}
     * @param flushPublishBufferDelay pause between flush rounds of the background thread
     */
    public DefaultEventSQLPublisher(TopicDefinitionsCache topicDefinitionsCache,
                                    Transactions transactions,
                                    EventRepository eventRepository,
                                    int flushPublishBufferSize,
                                    Duration flushPublishBufferDelay) {
        this.topicDefinitionsCache = topicDefinitionsCache;
        this.transactions = transactions;
        this.eventRepository = eventRepository;
        this.flushPublishBufferSize = flushPublishBufferSize;
        this.flushPublishBufferDelay = flushPublishBufferDelay;
    }

    /**
     * Publishes a single event to its topic.
     *
     * @throws IllegalArgumentException if the topic doesn't exist or the chosen partition is invalid
     */
    @Override // com.binaryigor.eventsql.EventSQLPublisher
    public void publish(EventPublication eventPublication) {
        publish(eventPublication.topic(), List.of(eventPublication));
    }

    /**
     * Publishes all given events, grouped by topic, inside one {@code transactions.execute} call.
     *
     * @throws IllegalArgumentException if any topic doesn't exist or any chosen partition is invalid
     */
    @Override // com.binaryigor.eventsql.EventSQLPublisher
    public void publishAll(Collection<EventPublication> collection) {
        Map<String, List<EventPublication>> publicationsByTopic = collection.stream()
                .collect(Collectors.groupingBy(EventPublication::topic));
        this.transactions.execute(() -> {
            publicationsByTopic.forEach((topic, publications) -> publish(topic, publications));
        });
    }

    /** Validates and stores the publications of one topic, then signals the flush thread. */
    private void publish(String topic, Collection<EventPublication> publications) {
        TopicDefinition topicDefinition = findTopicDefinition(topic);
        List<EventInput> inputs = publications.stream()
                .map(publication -> toEventInput(publication, topicDefinition))
                .toList();
        this.eventRepository.createAll(inputs);

        // Register the topic BEFORE raising the flag: the flush loop consumes the flag and then
        // reads publishedTopics, so the opposite order could flush without this topic and leave
        // its events waiting until the next publish.
        this.publishedTopics.add(topic);
        this.published.set(true);

        if (this.flushPublishBufferThreadSet.compareAndSet(false, true)) {
            startFlushPublishBufferThread();
        }
    }

    /**
     * Resolves the partition for a publication and builds a validated {@link EventInput}.
     *
     * <p>The partition is range-checked before being narrowed to {@code short}; otherwise an
     * out-of-range value (e.g. 65536) would silently wrap to a valid-looking partition.
     */
    private EventInput toEventInput(EventPublication publication, TopicDefinition topicDefinition) {
        int partition = this.partitioner.partition(publication, topicDefinition.partitions());
        if (partition < -1 || partition > Short.MAX_VALUE) {
            throw new IllegalArgumentException("Illegal partition value: " + partition);
        }
        EventInput eventInput = new EventInput(publication, (short) partition);
        validateNewEvent(eventInput, topicDefinition);
        return eventInput;
    }

    private void startFlushPublishBufferThread() {
        this.flushPublishBufferThread = Thread.startVirtualThread(this::flushPublishBufferLoop);
    }

    /**
     * Body of the background flush thread. Runs until {@link #running} is cleared; a round that
     * asked for an immediate follow-up flush is always completed first.
     */
    private void flushPublishBufferLoop() {
        // true when the previous flush indicated there is more to flush right away (skip the sleep)
        boolean flushNext = false;
        while (flushNext || this.running.get()) {
            try {
                if (!flushNext) {
                    Thread.sleep(this.flushPublishBufferDelay);
                }
                if (flushNext || this.published.getAndSet(false)) {
                    // NOTE(review): strict '>' kept from the original; confirm against
                    // flushBuffer's return contract (it is not visible from this class).
                    flushNext = this.eventRepository.flushBuffer(this.publishedTopics, this.flushPublishBufferSize)
                            > this.flushPublishBufferSize;
                }
            } catch (InterruptedException e) {
                // Interruption is a request to stop: restore the flag and leave the loop.
                Thread.currentThread().interrupt();
                logger.error("Fail to flush publish buffer:", e);
                return;
            } catch (Exception e) {
                // A failed flush must not kill the thread (it is never restarted). Re-arm the
                // flag so the flush is retried after the regular delay instead of in a hot loop.
                logger.error("Fail to flush publish buffer:", e);
                flushNext = false;
                this.published.set(true);
            }
        }
    }

    /**
     * Checks the event's partition against the topic definition.
     *
     * @throws IllegalArgumentException if the partition is below -1, if a partition other than -1
     *                                  is requested for a topic with -1 partitions, or if the
     *                                  partition index is not below the topic's partition count
     */
    private void validateNewEvent(EventInput eventInput, TopicDefinition topicDefinition) {
        short partition = eventInput.partition();
        int topicPartitions = topicDefinition.partitions();
        if (partition < -1) {
            throw new IllegalArgumentException("Illegal partition value: " + partition);
        }
        if (topicPartitions == -1 && partition != -1) {
            throw new IllegalArgumentException("%s topic is not partitioned, but publication to %d partition was requested"
                    .formatted(topicDefinition.name(), partition));
        }
        if (topicPartitions > -1 && partition + 1 > topicPartitions) {
            throw new IllegalArgumentException("%s topic has only %d partitions, but publishing to %d was requested"
                    .formatted(topicDefinition.name(), topicPartitions, partition));
        }
    }

    /** @throws IllegalArgumentException if the cache has no definition for the given topic name */
    private TopicDefinition findTopicDefinition(String topic) {
        return this.topicDefinitionsCache.getLoadingIf(topic)
                .orElseThrow(() -> new IllegalArgumentException("topic of %s name doesn't exist".formatted(topic)));
    }

    @Override // com.binaryigor.eventsql.EventSQLPublisher
    public void configurePartitioner(EventSQLPublisher.Partitioner partitioner) {
        this.partitioner = partitioner;
    }

    @Override // com.binaryigor.eventsql.EventSQLPublisher
    public EventSQLPublisher.Partitioner partitioner() {
        return this.partitioner;
    }

    /**
     * Signals the flush thread to stop and waits up to {@code duration} for it to terminate.
     * Never throws: problems are logged, and an interrupt of the calling thread is preserved.
     */
    @Override // com.binaryigor.eventsql.EventSQLPublisher
    public void stop(Duration duration) {
        logger.info("Stopping publisher...");
        this.running.set(false);
        try {
            if (waitForPublisherToFinishAsync().await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
                logger.info("Publisher has stopped gracefully!");
            } else {
                logger.warn("Publisher didn't finish in {}, exiting in any case", duration);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Problem while stopping publisher:", e);
        } catch (Exception e) {
            logger.error("Problem while stopping publisher:", e);
        }
    }

    /**
     * Returns a latch that reaches zero once the flush thread has terminated (immediately if it
     * was never started). Termination is polled from a separate virtual thread.
     */
    private CountDownLatch waitForPublisherToFinishAsync() {
        if (!this.flushPublishBufferThreadSet.get()) {
            return new CountDownLatch(0);
        }
        CountDownLatch finished = new CountDownLatch(1);
        Thread.startVirtualThread(() -> {
            try {
                while (isFlushPublishBufferThreadAlive()) {
                    logger.info("Publisher is still alive, waiting for it to finish...");
                    Thread.sleep(STOP_POLL_INTERVAL_MILLIS);
                }
                finished.countDown();
            } catch (InterruptedException e) {
                // Publisher may still be running: leave the latch untouched, keep the interrupt.
                Thread.currentThread().interrupt();
            }
        });
        return finished;
    }

    /**
     * Null-safe liveness check. The thread field may briefly still be null after the "set" flag
     * flipped; stop() has already cleared {@link #running} by then, so a thread started afterwards
     * leaves its loop on the first condition check and is treated as not alive here.
     */
    private boolean isFlushPublishBufferThreadAlive() {
        Thread flushThread = this.flushPublishBufferThread;
        return flushThread != null && flushThread.isAlive();
    }
}
