package io.github.panghy.javaflow.core;

import io.github.panghy.javaflow.Flow;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

/* loaded from: input_file:io/github/panghy/javaflow/core/PromiseStream.class */
/**
 * Producer side of an in-memory asynchronous stream.
 *
 * <p>Values passed to {@link #send(Object)} are handed directly to the oldest pending
 * {@code nextAsync()} caller if there is one; otherwise they are appended to an internal buffer.
 * The consumer side is exposed through {@link #getFutureStream()}.
 *
 * <p>The buffer is a {@link ConcurrentLinkedQueue}, which does not accept {@code null} elements,
 * and {@code nextAsync()} interprets a {@code null} poll result as "buffer empty"; {@code null}
 * values are therefore not usable as stream elements.
 *
 * <p>NOTE(review): the check-then-enqueue sequences in {@code nextAsync()} / {@code hasNextAsync()}
 * and the poll-then-buffer sequence in {@link #send(Object)} are not atomic with respect to each
 * other. Presumably all calls are made from a single-threaded flow scheduler - confirm before
 * using this class from multiple threads.
 *
 * @param <T> element type carried by the stream
 */
public class PromiseStream<T> {
    /** Values that were sent while no {@code nextAsync()} caller was waiting. */
    private final Queue<T> buffer = new ConcurrentLinkedQueue<>();
    /** Promises of {@code nextAsync()} callers waiting for a value. */
    private final Queue<FlowPromise<T>> nextPromises = new ConcurrentLinkedQueue<>();
    /** Promises of {@code hasNextAsync()} callers waiting for a value or for close. */
    private final Queue<FlowPromise<Boolean>> hasNextPromises = new ConcurrentLinkedQueue<>();
    /** Set exactly once, by the first successful close. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Cause recorded by the close call; written after {@link #closed} has been set. */
    private final AtomicReference<Throwable> closeException = new AtomicReference<>();
    private final FutureStream<T> futureStream = new FutureStreamImpl<>(this);

    /**
     * Consumer-side view of a {@link PromiseStream}. All state lives in the parent stream.
     *
     * @param <E> element type
     */
    private static class FutureStreamImpl<E> implements FutureStream<E> {
        private final PromiseStream<E> parent;

        FutureStreamImpl(PromiseStream<E> promiseStream) {
            this.parent = promiseStream;
        }

        /**
         * Returns a future for the next element.
         *
         * @return an already-completed future if a value is buffered; a failed future carrying the
         *     close cause if the stream is closed and the buffer is empty; otherwise a pending
         *     future that is completed by a later send or failed by a later close
         */
        @Override
        public FlowFuture<E> nextAsync() {
            if (parent.closed.get() && parent.buffer.isEmpty()) {
                return FlowFuture.failed(parent.closeCauseOrDefault());
            }
            E buffered = parent.buffer.poll();
            if (buffered != null) {
                return FlowFuture.completed(buffered);
            }
            FlowFuture<E> pending = new FlowFuture<>();
            FlowPromise<E> promise = pending.getPromise();
            // Re-check the closed flag: the stream may have been closed since the first test.
            if (!parent.closed.get()) {
                parent.nextPromises.add(promise);
                return pending;
            }
            promise.completeExceptionally(parent.closeCauseOrDefault());
            return pending;
        }

        /**
         * Reports whether another element is or will become available.
         *
         * @return a future yielding {@code true} when a value is buffered, {@code false} when the
         *     stream is closed with an empty buffer, or a pending future resolved by the next send
         *     ({@code true}) or close ({@code false})
         */
        @Override
        public FlowFuture<Boolean> hasNextAsync() {
            if (!parent.buffer.isEmpty()) {
                return FlowFuture.completed(true);
            }
            if (parent.closed.get()) {
                return FlowFuture.completed(false);
            }
            FlowFuture<Boolean> pending = new FlowFuture<>();
            FlowPromise<Boolean> promise = pending.getPromise();
            // Same two conditions tested again before registering as a waiter.
            if (!parent.buffer.isEmpty()) {
                promise.complete(true);
            } else if (parent.closed.get()) {
                promise.complete(false);
            } else {
                parent.hasNextPromises.add(promise);
            }
            return pending;
        }

        /**
         * Closes the parent stream with the given cause.
         *
         * @param th cause to record
         * @return an already-completed future
         */
        @Override
        public FlowFuture<Void> closeExceptionally(Throwable th) {
            parent.closeExceptionally(th);
            return FlowFuture.completed(null);
        }

        /**
         * Closes the parent stream.
         *
         * @return an already-completed future
         */
        @Override
        public FlowFuture<Void> close() {
            parent.close();
            return FlowFuture.completed(null);
        }

        /** @return {@code true} once the parent stream has been closed */
        @Override
        public boolean isClosed() {
            return parent.isClosed();
        }

        /**
         * Creates a derived stream whose elements are the results of applying {@code function} to
         * the elements of this stream. An actor pulls from this stream until it reports no more
         * elements (derived stream is closed) or an exception occurs (derived stream is closed
         * with that exception).
         *
         * @param function mapping applied to every element
         * @param <R> element type of the derived stream
         * @return the consumer side of the derived stream
         */
        @Override
        public <R> FlowStream<R> map(Function<? super E, ? extends R> function) {
            PromiseStream<R> mapped = new PromiseStream<>();
            Flow.startActor(() -> {
                while (!mapped.isClosed()) {
                    try {
                        if (!Flow.await(hasNextAsync())) {
                            mapped.close();
                            return null;
                        }
                        mapped.send(function.apply(Flow.await(nextAsync())));
                    } catch (Exception e) {
                        mapped.closeExceptionally(e);
                        return null;
                    }
                }
                return null;
            });
            return mapped.getFutureStream();
        }

        /**
         * Creates a derived stream containing only the elements accepted by {@code predicate}.
         * End-of-stream and failures are propagated to the derived stream the same way as in
         * {@link #map(Function)}.
         *
         * @param predicate test applied to every element
         * @return the consumer side of the derived stream
         */
        @Override
        public FlowStream<E> filter(Predicate<? super E> predicate) {
            PromiseStream<E> filtered = new PromiseStream<>();
            Flow.startActor(() -> {
                while (!filtered.isClosed()) {
                    try {
                        if (!Flow.await(hasNextAsync())) {
                            filtered.close();
                            return null;
                        }
                        E candidate = Flow.await(nextAsync());
                        if (predicate.test(candidate)) {
                            filtered.send(candidate);
                        }
                    } catch (Exception e) {
                        filtered.closeExceptionally(e);
                        return null;
                    }
                }
                return null;
            });
            return filtered.getFutureStream();
        }

        /**
         * Feeds every element of the stream to {@code consumer}.
         *
         * <p>Elements already buffered are consumed synchronously before this method returns. If
         * the stream is still open afterwards, an actor is started that keeps consuming until
         * {@link #hasNextAsync()} yields {@code false}.
         *
         * @param consumer action invoked for each element
         * @return a future completed with {@code null} when the stream ends, or exceptionally when
         *     the consumer throws, an await fails, or the stream was already closed with a
         *     recorded cause when this method was called
         */
        @Override
        public FlowFuture<Void> forEach(Consumer<? super E> consumer) {
            FlowFuture<Void> result = new FlowFuture<>();
            FlowPromise<Void> promise = result.getPromise();

            // NOTE(review): PromiseStream.close() records a StreamClosedException as the cause,
            // so this branch also fires after a normal close and any buffered elements are not
            // delivered to the consumer. Confirm this is intended.
            Throwable recordedCause = parent.closed.get() ? parent.closeException.get() : null;
            if (recordedCause != null) {
                promise.completeExceptionally(recordedCause);
                return result;
            }

            // Drain whatever is already buffered on the caller's stack.
            E buffered;
            while ((buffered = parent.buffer.poll()) != null) {
                try {
                    consumer.accept(buffered);
                } catch (Exception e) {
                    promise.completeExceptionally(e);
                    return result;
                }
            }
            if (parent.closed.get() && parent.buffer.isEmpty()) {
                promise.complete(null);
                return result;
            }

            Flow.startActor(() -> {
                // The whole loop, including the hasNextAsync() await, is guarded so that any
                // failure completes the returned future instead of leaving it pending forever.
                try {
                    while (Flow.await(hasNextAsync())) {
                        consumer.accept(Flow.await(nextAsync()));
                        Flow.await(Flow.yieldF());
                    }
                    promise.complete(null);
                } catch (Exception e) {
                    promise.completeExceptionally(e);
                }
                return null;
            });
            return result;
        }
    }

    /** @return the consumer-side view of this stream (always the same instance) */
    public FutureStream<T> getFutureStream() {
        return this.futureStream;
    }

    /**
     * Publishes a value. The oldest pending {@code nextAsync()} promise receives it directly; if
     * none is waiting the value is buffered. All pending {@code hasNextAsync()} promises are then
     * completed with {@code true}.
     *
     * @param t value to publish
     * @return {@code false} if the stream was already closed (the value is dropped),
     *     {@code true} otherwise
     */
    public boolean send(T t) {
        if (this.closed.get()) {
            return false;
        }
        FlowPromise<T> waiter = this.nextPromises.poll();
        if (waiter != null) {
            waiter.complete(t);
        } else {
            this.buffer.add(t);
        }
        FlowPromise<Boolean> hasNextWaiter;
        while ((hasNextWaiter = this.hasNextPromises.poll()) != null) {
            hasNextWaiter.complete(true);
        }
        return true;
    }

    /**
     * Closes the stream, recording a new {@link StreamClosedException} as the cause.
     *
     * @return {@code true} if this call closed the stream, {@code false} if it was already closed
     */
    public boolean close() {
        return closeExceptionally(new StreamClosedException());
    }

    /**
     * Closes the stream with the given cause. Pending {@code nextAsync()} promises are failed and
     * pending {@code hasNextAsync()} promises are completed with {@code false}. Values already
     * buffered stay in the buffer.
     *
     * @param th cause to record; when {@code null}, pending {@code nextAsync()} promises are
     *     failed with a {@link StreamClosedException}, matching what later {@code nextAsync()}
     *     calls report
     * @return {@code true} if this call closed the stream, {@code false} if it was already closed
     */
    public boolean closeExceptionally(Throwable th) {
        if (!this.closed.compareAndSet(false, true)) {
            return false;
        }
        this.closeException.set(th);
        // Never hand a null cause to a promise; use the same substitute nextAsync() uses.
        Throwable failure = (th != null) ? th : new StreamClosedException();
        FlowPromise<T> waiter;
        while ((waiter = this.nextPromises.poll()) != null) {
            waiter.completeExceptionally(failure);
        }
        FlowPromise<Boolean> hasNextWaiter;
        while ((hasNextWaiter = this.hasNextPromises.poll()) != null) {
            hasNextWaiter.complete(false);
        }
        return true;
    }

    /** @return {@code true} once the stream has been closed */
    public boolean isClosed() {
        return this.closed.get();
    }

    /** Returns the recorded close cause, or a fresh StreamClosedException if none is recorded. */
    private Throwable closeCauseOrDefault() {
        Throwable recorded = this.closeException.get();
        return (recorded != null) ? recorded : new StreamClosedException();
    }
}
