package io.machinic.stream.spliterator;

import io.machinic.stream.MxStream;
import io.machinic.stream.StreamEventException;
import io.machinic.stream.StreamException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.Spliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/machinic/stream/spliterator/AsyncMapSpliterator.class */
/**
 * Spliterator stage that maps upstream elements on an {@link ExecutorService}.
 *
 * <p>Each upstream element is wrapped in an {@link AsyncTask}, submitted to the executor and its
 * {@link Future} appended to a FIFO queue. Results are always taken from the head of that queue,
 * so they are handed downstream in the order the tasks were submitted.
 *
 * @param <IN>  element type of the upstream spliterator
 * @param <OUT> element type produced by the mapper
 */
public class AsyncMapSpliterator<IN, OUT> extends AbstractSpliterator<IN, OUT> {
    // NOTE(review): the logger is registered under MxStream's name rather than this class's;
    // kept unchanged so existing logging configuration keeps matching.
    private static final Logger logger = LoggerFactory.getLogger(MxStream.class);

    /** Factory for the mapper; kept so that {@link #split(Spliterator)} can hand it to new instances. */
    private final Supplier<Function<? super IN, ? extends OUT>> supplier;
    /** Mapper obtained from {@link #supplier} once, when this instance is constructed. */
    private final Function<? super IN, ? extends OUT> mapper;
    /** Queue-size threshold that drives blocking and refilling in {@link #tryAdvance(Consumer)}. */
    private final int parallelism;
    private final ExecutorService executorService;
    /** Futures of submitted tasks, oldest first. */
    private final Queue<Future<OUT>> queue;

    /**
     * Unit of work submitted to the executor: applies the enclosing spliterator's mapper to one
     * input element.
     */
    public class AsyncTask implements Callable<OUT> {
        private final IN input;

        private AsyncTask(IN input) {
            this.input = input;
        }

        /**
         * Applies the mapper to the captured input.
         *
         * @return the mapped value
         * @throws Exception a {@link StreamEventException} wrapping the mapper's failure, raised
         *                   after the stream's exception handler has been invoked; whatever the
         *                   handler itself throws propagates instead
         */
        @Override
        public OUT call() throws Exception {
            try {
                return AsyncMapSpliterator.this.mapper.apply(this.input);
            } catch (Exception e) {
                // The handler runs first; if it throws, that exception replaces the one below.
                AsyncMapSpliterator.this.getStream().exceptionHandler().onException(e, this.input);
                throw new StreamEventException(this.input, String.format("Event %s failed. Caused by %s", this.input, e.getMessage()), e);
            }
        }
    }

    /**
     * Creates the stage and obtains its mapper from {@code supplier}.
     *
     * @param mxStream        stream passed on to the superclass
     * @param spliterator     upstream spliterator passed on to the superclass
     * @param parallelism     queue-size threshold used by {@link #tryAdvance(Consumer)}
     * @param executorService executor the mapping tasks are submitted to
     * @param supplier        source of the mapper function; called once here
     */
    public AsyncMapSpliterator(MxStream<IN> mxStream, Spliterator<IN> spliterator, int parallelism, ExecutorService executorService, Supplier<Function<? super IN, ? extends OUT>> supplier) {
        super(mxStream, spliterator);
        this.supplier = supplier;
        this.mapper = supplier.get();
        this.parallelism = parallelism;
        this.executorService = executorService;
        // tryAdvance lets the queue grow to parallelism + 1 entries before it stops refilling.
        this.queue = new ArrayDeque<>(parallelism + 2);
    }

    /** Wraps one upstream element in a task bound to this spliterator's mapper. */
    private AsyncTask createTask(IN input) {
        return new AsyncTask(input);
    }

    /** Submits the task to the executor and appends its future to the tail of the queue. */
    private void enqueue(AsyncTask asyncTask) {
        this.queue.add(this.executorService.submit(asyncTask));
    }

    /** Removes and returns the head future only if it has already completed; otherwise {@code null}. */
    private Future<OUT> nonBlockingDequeue() {
        Future<OUT> head = this.queue.peek();
        if (head == null || !head.isDone()) {
            return null;
        }
        return this.queue.poll();
    }

    /** Removes and returns the head future regardless of completion, or {@code null} if the queue is empty. */
    private Future<OUT> dequeue() {
        return this.queue.poll();
    }

    /**
     * Returns the number of futures currently queued.
     *
     * <p>When {@code isParallel()} is true the size is read while holding the queue's monitor.
     * NOTE(review): the other queue accesses in this class do not take that lock.
     */
    protected int getQueueSize() {
        if (!isParallel()) {
            return this.queue.size();
        }
        synchronized (this.queue) {
            return this.queue.size();
        }
    }

    /**
     * Hands at most one result to {@code consumer}, then refills the task queue from upstream.
     *
     * <p>A future is taken from the head of the queue if it is already done, or, blocking on its
     * result, when at least {@code parallelism} tasks are queued. If upstream reports no further
     * elements and no future was taken earlier in this call, the head future is awaited as well,
     * so the remaining tasks are drained by waiting on them rather than by returning repeatedly
     * without progress.
     *
     * <p>A task that failed with a {@link StreamEventException} is logged at debug level and
     * skipped, and a call made while the queue is still filling may take no future at all; in
     * both cases this method can return {@code true} without invoking {@code consumer}.
     * Conversely, the call that hands over the last queued result returns {@code false}.
     *
     * @param consumer receiver of the mapped value
     * @return {@code true} if upstream yielded an element during this call or futures remain
     *         queued afterwards
     * @throws StreamException  if a task failed with anything other than a
     *                          {@link StreamEventException}, or if {@code consumer} throws
     * @throws RuntimeException wrapping {@link InterruptedException} if the calling thread is
     *                          interrupted while waiting for a result
     */
    @Override
    public boolean tryAdvance(Consumer<? super OUT> consumer) {
        Future<OUT> head = nonBlockingDequeue();
        if (head == null && getQueueSize() >= this.parallelism) {
            head = dequeue();
        }
        boolean dequeued = head != null;
        if (dequeued) {
            deliver(head, consumer);
        }

        // Pull from upstream until it has nothing more or more than `parallelism` tasks are queued.
        boolean sourceAdvanced;
        do {
            sourceAdvanced = this.previousSpliterator.tryAdvance(element -> enqueue(createTask(element)));
        } while (sourceAdvanced && getQueueSize() <= this.parallelism);

        // Upstream is finished and this call has not consumed a future yet: wait for the oldest
        // outstanding task instead of reporting "more to come" and making the caller spin.
        if (!sourceAdvanced && !dequeued) {
            Future<OUT> pending = dequeue();
            if (pending != null) {
                deliver(pending, consumer);
            }
        }
        return sourceAdvanced || getQueueSize() != 0;
    }

    /** Waits for the future and passes its value to the consumer, translating failures. */
    private void deliver(Future<OUT> future, Consumer<? super OUT> consumer) {
        try {
            consumer.accept(future.get());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // StreamEventException must be tested first: such failures are skipped, not rethrown.
            if (cause instanceof StreamEventException) {
                logger.debug("asyncMap operation skipping message due to caught exception {}", cause.getMessage());
                return;
            }
            if (cause instanceof StreamException) {
                throw (StreamException) cause;
            }
            throw new StreamException(String.format("mapAsync failed. Caused by %s", e.getMessage()), e);
        } catch (Exception e) {
            throw new StreamException(String.format("mapAsync failed. Caused by %s", e.getMessage()), e);
        }
    }

    /**
     * Creates a sibling stage over {@code spliterator} that shares this instance's stream,
     * parallelism, executor and mapper supplier but has its own queue and mapper instance.
     */
    @Override
    public AbstractSpliterator<IN, OUT> split(Spliterator<IN> spliterator) {
        // NOTE(review): raw construction kept from the original; the diamond form needs the
        // inherited `stream` field to be declared as MxStream<IN> - confirm in AbstractSpliterator.
        return new AsyncMapSpliterator(this.stream, spliterator, this.parallelism, this.executorService, this.supplier);
    }
}
