package com.binaryigor.eventsql.internal;

import com.binaryigor.eventsql.Event;
import com.binaryigor.eventsql.EventPublication;
import com.binaryigor.eventsql.EventSQLConsumers;
import com.binaryigor.eventsql.EventSQLConsumptionException;
import com.binaryigor.eventsql.EventSQLPublisher;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/binaryigor/eventsql/internal/DefaultEventSQLConsumers.class */
/**
 * Default {@link EventSQLConsumers} implementation.
 *
 * <p>For every consumer row returned by the {@link ConsumerRepository} for a given topic and
 * consumer name, a dedicated virtual thread is started. Each thread repeatedly polls the
 * {@link EventRepository} for the next events inside a transaction, hands them to the supplied
 * callback and stores the updated consumer state, until {@link #stop(Duration)} is called.
 *
 * <p>Started threads are tracked in a concurrent map keyed by {@link ConsumerId}; the running
 * flag and the DLT event factory are shared between the caller and the consumer threads.
 */
public class DefaultEventSQLConsumers implements EventSQLConsumers {
    private static final Logger logger = LoggerFactory.getLogger(DefaultEventSQLConsumers.class);
    private final Transactions transactions;
    private final ConsumerRepository consumerRepository;
    private final EventRepository eventRepository;
    private final EventSQLPublisher publisher;
    private final Clock clock;
    // Reassigned by configureDLTEventFactory and read from consumer threads, hence volatile.
    private volatile EventSQLConsumers.DLTEventFactory dltEventFactory;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Map<ConsumerId, Thread> consumerThreads = new ConcurrentHashMap<>();

    /**
     * Creates the consumers component.
     *
     * @param topicDefinitionsCache passed to the initial {@link DefaultDLTEventFactory}
     * @param transactions          used to wrap every polling iteration
     * @param consumerRepository    source of consumer state and target of its updates
     * @param eventRepository       source of the events to consume
     * @param eventSQLPublisher     used to publish events produced by the DLT event factory
     * @param clock                 time source for consumption timestamps
     */
    public DefaultEventSQLConsumers(TopicDefinitionsCache topicDefinitionsCache, Transactions transactions, ConsumerRepository consumerRepository, EventRepository eventRepository, EventSQLPublisher eventSQLPublisher, Clock clock) {
        this.transactions = transactions;
        this.consumerRepository = consumerRepository;
        this.eventRepository = eventRepository;
        this.publisher = eventSQLPublisher;
        this.clock = clock;
        this.dltEventFactory = new DefaultDLTEventFactory(topicDefinitionsCache);
    }

    /**
     * Starts a single-event consumer using {@code DEFAULT_POLLING_DELAY}.
     *
     * @param str      topic to consume from
     * @param str2     consumer name
     * @param consumer callback invoked with consumed events
     */
    @Override // com.binaryigor.eventsql.EventSQLConsumers
    public void startConsumer(String str, String str2, java.util.function.Consumer<Event> consumer) {
        startConsumer(str, str2, consumer, DEFAULT_POLLING_DELAY);
    }

    /**
     * Starts a single-event consumer with the given polling delay.
     *
     * @param str      topic to consume from
     * @param str2     consumer name
     * @param consumer callback invoked with consumed events
     * @param duration polling delay
     */
    @Override // com.binaryigor.eventsql.EventSQLConsumers
    public void startConsumer(String str, String str2, java.util.function.Consumer<Event> consumer, Duration duration) {
        // NOTE(review): 10 ends up as the second ConsumptionConfig argument - presumably the
        // maximum number of events fetched per poll; confirm against ConsumptionConfig.
        startConsumer(str, str2, consumer, duration, 10);
    }

    /**
     * Starts a single-event consumer by wrapping the callback in a {@link ConsumerWrapper} and
     * delegating to {@link #startBatchConsumer}.
     *
     * @param str      topic to consume from
     * @param str2     consumer name
     * @param consumer callback invoked with consumed events
     * @param duration used for both duration arguments of the consumption config
     * @param i        second argument of the consumption config
     *                 (presumably the max events per poll - TODO confirm)
     */
    @Override // com.binaryigor.eventsql.EventSQLConsumers
    public void startConsumer(String str, String str2, java.util.function.Consumer<Event> consumer, Duration duration, int i) {
        startBatchConsumer(str, str2, new ConsumerWrapper(consumer), new EventSQLConsumers.ConsumptionConfig(1, i, duration, duration));
    }

    /**
     * Replaces the factory used to build dead-letter publications for failed events.
     *
     * @param dLTEventFactory the new factory
     */
    @Override // com.binaryigor.eventsql.EventSQLConsumers
    public void configureDLTEventFactory(EventSQLConsumers.DLTEventFactory dLTEventFactory) {
        this.dltEventFactory = dLTEventFactory;
    }

    /**
     * Loads all consumer rows registered for the given topic and consumer name.
     *
     * @throws IllegalArgumentException when the repository returns no consumers
     */
    private List<Consumer> findPartitionedConsumers(String topic, String name) {
        List<Consumer> consumers = this.consumerRepository.allOf(topic, name);
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("There are no consumers of %s topic and %s name".formatted(topic, name));
        }
        return consumers;
    }

    /**
     * Polling loop executed by every consumer thread; runs until the running flag is cleared
     * or the thread is interrupted.
     */
    private void consumeEvents(ConsumerId consumerId, java.util.function.Consumer<List<Event>> consumer, EventSQLConsumers.ConsumptionConfig consumptionConfig) {
        AtomicBoolean sleepBeforeNextPoll = new AtomicBoolean(false);
        AtomicReference<Instant> lastConsumptionAt = new AtomicReference<>(this.clock.instant());
        try {
            while (this.running.get()) {
                try {
                    if (sleepBeforeNextPoll.get()) {
                        Thread.sleep(consumptionConfig.pollingDelay());
                    }
                    this.transactions.execute(() -> {
                        sleepBeforeNextPoll.set(consumeNextEvents(consumerId, consumptionConfig, consumer, lastConsumptionAt));
                    });
                } catch (InterruptedException e) {
                    // Keep the interrupt visible and leave the loop instead of spinning on it.
                    Thread.currentThread().interrupt();
                    logger.warn("Consumer {} was interrupted, stopping it", consumerId);
                    break;
                } catch (Exception e) {
                    logger.error("Problem while consuming events for {} consumer: ", consumerId, e);
                    // Back off before retrying so a persistent failure does not become a hot loop.
                    sleepBeforeNextPoll.set(true);
                }
            }
        } finally {
            // Deregister only our own entry, whatever way the loop ended.
            this.consumerThreads.remove(consumerId, Thread.currentThread());
        }
    }

    /**
     * Performs one polling iteration: locks the consumer row, fetches the next events, passes
     * them to the callback and stores the updated consumer state.
     *
     * @return {@code true} when the caller should sleep before the next iteration
     */
    private boolean consumeNextEvents(ConsumerId consumerId, EventSQLConsumers.ConsumptionConfig consumptionConfig, java.util.function.Consumer<List<Event>> consumer, AtomicReference<Instant> lastConsumptionAt) {
        Optional<Consumer> lockedConsumer = this.consumerRepository.ofIdForUpdateSkippingLocked(consumerId);
        if (lockedConsumer.isEmpty()) {
            return true;
        }
        Consumer consumerState = lockedConsumer.get();
        List<Event> events = nextEvents(consumerState, consumptionConfig.maxEvents());
        if (events.isEmpty()) {
            return true;
        }
        // Too few events: keep waiting unless the max polling delay has already elapsed.
        if (events.size() < consumptionConfig.minEvents() && shouldWaitForMinEvents(lastConsumptionAt.get(), consumptionConfig.maxPollingDelay())) {
            return true;
        }

        Long firstEventId;
        long lastEventId;
        boolean sleepBeforeNextPoll;
        try {
            consumer.accept(events);
            firstEventId = Long.valueOf(events.getFirst().id());
            lastEventId = events.getLast().id();
            // A batch smaller than the maximum means there is nothing more to fetch right now.
            sleepBeforeNextPoll = events.size() < consumptionConfig.maxEvents();
        } catch (EventSQLConsumptionException e) {
            Optional<EventPublication> dltPublication = this.dltEventFactory.create(e, consumerId.name());
            if (dltPublication.isPresent()) {
                logger.error("Problem while consuming event for {} consumer, publishing it to dlt: ", consumerId, e);
                // The failed event goes to the DLT, so the consumer moves past it.
                lastEventId = e.event().id();
                this.publisher.publish(dltPublication.get());
                sleepBeforeNextPoll = false;
            } else {
                logger.error("Problem while consuming event for {} consumer: ", consumerId, e);
                // No DLT publication: stay just before the failed event so it is fetched again.
                lastEventId = e.event().id() - 1;
                sleepBeforeNextPoll = true;
            }
            firstEventId = null;
        }

        Instant now = this.clock.instant();
        this.consumerRepository.update(consumerState.withUpdatedStats(firstEventId, lastEventId, now, events.size()));
        lastConsumptionAt.set(now);
        return sleepBeforeNextPoll;
    }

    /**
     * Fetches up to {@code limit} events following the consumer's last event id; a partition
     * of {@code -1} selects the repository query that takes no partition argument.
     */
    private List<Event> nextEvents(Consumer consumer, int limit) {
        if (consumer.partition() == -1) {
            return this.eventRepository.nextEvents(consumer.topic(), consumer.lastEventId(), limit);
        }
        return this.eventRepository.nextEvents(consumer.topic(), consumer.partition(), consumer.lastEventId(), limit);
    }

    /** Returns {@code true} while less than {@code maxDelay} has passed since {@code lastConsumptionAt}. */
    private boolean shouldWaitForMinEvents(Instant lastConsumptionAt, Duration maxDelay) {
        return Duration.between(lastConsumptionAt, this.clock.instant()).compareTo(maxDelay) < 0;
    }

    /**
     * Starts one virtual consumer thread for every consumer row registered for the given topic
     * and name. Consumers that already have a registered thread are skipped individually.
     *
     * @param str               topic to consume from
     * @param str2              consumer name
     * @param consumer          callback invoked with every consumed batch
     * @param consumptionConfig batch size and polling settings
     * @throws IllegalArgumentException when no consumers exist for the topic and name
     */
    @Override // com.binaryigor.eventsql.EventSQLConsumers
    public void startBatchConsumer(String str, String str2, java.util.function.Consumer<List<Event>> consumer, EventSQLConsumers.ConsumptionConfig consumptionConfig) {
        for (Consumer partitionedConsumer : findPartitionedConsumers(str, str2)) {
            ConsumerId consumerId = new ConsumerId(partitionedConsumer.topic(), partitionedConsumer.name(), partitionedConsumer.partition());
            // Register atomically BEFORE starting, so concurrent calls cannot start duplicates
            // and the thread can never deregister itself ahead of its own registration.
            Thread consumerThread = Thread.ofVirtual().unstarted(() -> consumeEvents(consumerId, consumer, consumptionConfig));
            if (this.consumerThreads.putIfAbsent(consumerId, consumerThread) != null) {
                logger.info("Consumer {} is registered already, skipping", consumerId);
                continue;
            }
            consumerThread.start();
        }
    }

    /**
     * Signals all consumer threads to stop and waits up to the given duration for them to
     * finish. Problems are logged rather than thrown.
     *
     * @param duration maximum time to wait for the consumer threads
     */
    @Override // com.binaryigor.eventsql.EventSQLConsumers
    public void stop(Duration duration) {
        logger.info("Stopping consumers...");
        this.running.set(false);
        Thread watcher = null;
        try {
            CountDownLatch consumersFinished = new CountDownLatch(1);
            watcher = Thread.startVirtualThread(() -> reportAliveConsumersUntilFinished(consumersFinished));
            if (consumersFinished.await(duration.toMillis(), TimeUnit.MILLISECONDS)) {
                logger.info("Consumers have stopped gracefully!");
            } else {
                logger.warn("Some consumers didn't finish in {}, exiting in any case", duration);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("Problem while stopping consumers:", e);
        } catch (Exception e) {
            logger.error("Problem while stopping consumers:", e);
        } finally {
            // Do not leave the watcher polling forever once we have given up waiting.
            if (watcher != null) {
                watcher.interrupt();
            }
        }
    }

    /**
     * Checks every 500 ms which registered consumer threads are still alive and counts the
     * latch down once none are; returns early when interrupted.
     */
    private void reportAliveConsumersUntilFinished(CountDownLatch consumersFinished) {
        while (true) {
            List<ConsumerId> aliveConsumers = this.consumerThreads.entrySet().stream()
                    .filter(entry -> entry.getValue().isAlive())
                    .map(Map.Entry::getKey)
                    .toList();
            if (aliveConsumers.isEmpty()) {
                consumersFinished.countDown();
                return;
            }
            logger.info("Some consumers ({}) are still alive, waiting for them to finish...", aliveConsumers);
            try {
                Thread.sleep(500L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Returns the currently configured DLT event factory. */
    @Override // com.binaryigor.eventsql.EventSQLConsumers
    public EventSQLConsumers.DLTEventFactory dltEventFactory() {
        return this.dltEventFactory;
    }
}
