package io.debezium.platform.domain;

import io.debezium.platform.environment.logs.LogReader;
import io.quarkus.virtual.threads.VirtualThreads;
import jakarta.enterprise.context.ApplicationScoped;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.jboss.logging.Logger;

/**
 * Submits background tasks that read lines from a {@link LogReader} and hand each line to a
 * caller-supplied consumer.
 */
@ApplicationScoped
public class LogStreamingService {
    /** Pause, in milliseconds, between polls when the reader currently has no line to return. */
    public static final int NO_DATA_SLEEP_MS = 1000;
    private final Logger logger;
    private final ExecutorService executorService;

    /**
     * A single streaming job: obtains a reader from the supplier, forwards every line it reads to
     * the consumer, and keeps polling until {@link #stop()} is called or reading fails.
     *
     * <p>NOTE(review): a {@link #stop()} issued before {@link #run()} has started is a no-op (the
     * running flag is still {@code false}), so the task will still start streaming afterwards.
     */
    public static class LogStreamingTask implements Runnable, Closeable {
        private final Logger logger;
        private final String name;
        /** {@code true} only while {@link #run()} is actively streaming. */
        private final AtomicBoolean running = new AtomicBoolean(false);
        private final Supplier<LogReader> supplier;
        private final Consumer<String> consumer;

        /**
         * @param str      name of the log being streamed; used in log messages
         * @param supplier provides the reader when the task starts running
         * @param consumer receives every non-null line read from the reader
         * @param logger   logger used for lifecycle and error messages
         */
        public LogStreamingTask(String str, Supplier<LogReader> supplier, Consumer<String> consumer, Logger logger) {
            this.name = str;
            this.supplier = supplier;
            this.consumer = consumer;
            this.logger = logger;
        }

        /** @return the name this task was created with */
        public String getName() {
            return this.name;
        }

        /** @return {@code true} while the streaming loop is active */
        public boolean isRunning() {
            return this.running.get();
        }

        /**
         * Requests the streaming loop to end. Only the call that actually flips the task from
         * running to stopped logs the "Stopping" message; repeated calls are silent no-ops.
         */
        public void stop() {
            if (!this.running.compareAndSet(true, false)) {
                // Not running (never started or already stopped): nothing to do, nothing to log.
                return;
            }
            this.logger.infof("Stopping log streamer for '%s'", this.name);
        }

        /**
         * Streams the log until stopped. Does nothing if the task is already running. The reader
         * is closed and the running flag is cleared on every exit path.
         */
        @Override
        public void run() {
            if (!this.running.compareAndSet(false, true)) {
                return;
            }
            this.logger.infof("Starting log streamer for '%s'", this.name);
            try (LogReader logReader = this.supplier.get()) {
                doStream(logReader);
                this.logger.infof("Finished streaming from log %s", this.name);
            } catch (IOException e) {
                // Keep the cause so the failure can be diagnosed from the log output.
                this.logger.errorf(e, "Error streaming from log %s", this.name);
            } catch (InterruptedException e) {
                this.logger.errorf("Interrupted while waiting for more logs from log %s", this.name);
                // Restore the interrupt flag for whoever owns this thread.
                Thread.currentThread().interrupt();
            } finally {
                stop();
            }
        }

        /**
         * Polls the reader while the task is running, forwarding each line to the consumer.
         * When no line is available the loop waits {@link LogStreamingService#NO_DATA_SLEEP_MS}
         * milliseconds and polls again; a missing line is never passed to the consumer.
         *
         * @throws InterruptedException if the thread is interrupted while waiting for more data
         * @throws IOException          if the reader fails
         */
        private void doStream(LogReader logReader) throws InterruptedException, IOException {
            while (isRunning()) {
                String readLine = logReader.readLine();
                if (readLine == null) {
                    Thread.sleep(NO_DATA_SLEEP_MS);
                    continue;
                }
                this.consumer.accept(readLine);
            }
        }

        /** Equivalent to {@link #stop()}. */
        @Override
        public void close() {
            stop();
        }
    }

    /**
     * @param logger          logger shared with the tasks created by this service
     * @param executorService executor (qualified with {@code @VirtualThreads}) the tasks are submitted to
     */
    public LogStreamingService(Logger logger, @VirtualThreads ExecutorService executorService) {
        this.logger = logger;
        this.executorService = executorService;
    }

    /**
     * Creates a streaming task and submits it to the executor.
     *
     * @param str      name of the log; used in log messages
     * @param supplier provides the reader once the task starts
     * @param consumer receives the lines read from the log
     * @return the submitted task; call {@link LogStreamingTask#stop()} or
     *         {@link LogStreamingTask#close()} on it to end streaming
     */
    public LogStreamingTask stream(String str, Supplier<LogReader> supplier, Consumer<String> consumer) {
        this.logger.infof("Starting log streamer for log %s", str);
        LogStreamingTask logStreamingTask = new LogStreamingTask(str, supplier, consumer, this.logger);
        this.executorService.submit(logStreamingTask);
        return logStreamingTask;
    }
}
