package modelengine.fit.http.client.support;

import java.lang.reflect.Type;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import modelengine.fit.http.client.HttpClassicClientRequest;
import modelengine.fit.http.client.HttpClassicClientResponse;
import modelengine.fit.http.entity.TextEvent;
import modelengine.fitframework.flowable.Choir;
import modelengine.fitframework.flowable.Subscriber;
import modelengine.fitframework.flowable.Subscription;
import modelengine.fitframework.flowable.choir.AbstractChoir;
import modelengine.fitframework.flowable.subscription.AbstractSubscription;
import modelengine.fitframework.flowable.util.worker.Worker;
import modelengine.fitframework.flowable.util.worker.WorkerObserver;
import modelengine.fitframework.inspection.Nonnull;
import modelengine.fitframework.util.LockUtils;
import modelengine.fitframework.util.ObjectUtils;
import modelengine.fitframework.util.StringUtils;

/* loaded from: input_file:FIT-INF/shared/fit-http-classic-3.5.0-SNAPSHOT.jar:modelengine/fit/http/client/support/TextStreamChoir.class */
public class TextStreamChoir<T> extends AbstractChoir<T> implements Choir<T> {
    private final HttpClassicClientRequest request;
    private final Type responseType;

    /* loaded from: input_file:FIT-INF/shared/fit-http-classic-3.5.0-SNAPSHOT.jar:modelengine/fit/http/client/support/TextStreamChoir$TextStreamSubscription.class */
    private static class TextStreamSubscription<T> extends AbstractSubscription implements WorkerObserver<T> {
        private static final int HTTP_SUCCESS_CODE_MIN = 200;
        private static final int HTTP_SUCCESS_CODE_MAX = 300;
        private final Subscriber<T> subscriber;
        private final HttpClassicClientRequest request;
        private final Type responseType;
        private final AtomicBoolean requested = new AtomicBoolean();
        private final AtomicBoolean completed = new AtomicBoolean();
        private final AtomicLong counter = new AtomicLong();
        private final Lock lock = LockUtils.newReentrantLock();
        private final Queue<T> buffer = new ArrayDeque();
        private volatile Exception error;

        TextStreamSubscription(Subscriber<T> subscriber, HttpClassicClientRequest httpClassicClientRequest, Type type) {
            this.subscriber = subscriber;
            this.request = httpClassicClientRequest;
            this.responseType = type;
        }

        protected void request0(long j) {
            long addAndGet = this.counter.addAndGet(j);
            if (this.requested.compareAndSet(false, true)) {
                exchange();
            }
            synchronized (this.lock) {
                for (int i = 0; i < addAndGet; i++) {
                    if (this.buffer.isEmpty()) {
                        handleBufferIsEmpty();
                        return;
                    } else {
                        this.subscriber.consume(this.buffer.remove());
                        this.counter.decrementAndGet();
                    }
                }
            }
        }

        private void exchange() {
            try {
                HttpClassicClientResponse<T> exchange = this.request.exchange(this.responseType);
                try {
                    if (!ObjectUtils.between(Integer.valueOf(exchange.statusCode()), Integer.valueOf(HTTP_SUCCESS_CODE_MIN), Integer.valueOf(HTTP_SUCCESS_CODE_MAX), true, false)) {
                        throw new IllegalStateException(StringUtils.format("Failed to exchange text event stream. [uri={0}, statusCode={1}, reason={2}]", new Object[]{this.request.requestUri(), Integer.valueOf(exchange.statusCode()), exchange.reasonPhrase()}));
                    }
                    Worker.create(this, exchange.textEventStreamEntity().orElseThrow(() -> {
                        return new IllegalStateException("No text event stream entity.");
                    }).stream().map(this::convert)).run();
                    if (exchange != null) {
                        exchange.close();
                    }
                } finally {
                }
            } catch (Exception e) {
                onWorkerFailed(e);
            }
        }

        private void handleBufferIsEmpty() {
            if (this.completed.get()) {
                if (this.error != null) {
                    this.subscriber.fail(this.error);
                } else {
                    this.subscriber.complete();
                }
            }
        }

        private T convert(TextEvent textEvent) {
            return this.responseType == TextEvent.class ? (T) ObjectUtils.cast(textEvent) : (T) ObjectUtils.cast(textEvent.data());
        }

        public void onWorkerSubscribed(Subscription subscription) {
            subscription.request(Long.MAX_VALUE);
        }

        public void onWorkerConsumed(T t, long j) {
            synchronized (this.lock) {
                this.buffer.add(t);
                if (this.counter.get() > 0) {
                    this.subscriber.consume(this.buffer.remove());
                    this.counter.decrementAndGet();
                }
            }
        }

        public void onWorkerFailed(Exception exc) {
            this.completed.set(true);
            this.error = exc;
            synchronized (this.lock) {
                if (this.buffer.isEmpty()) {
                    handleBufferIsEmpty();
                }
            }
        }

        public void onWorkerCompleted() {
            this.completed.set(true);
            synchronized (this.lock) {
                if (this.buffer.isEmpty()) {
                    handleBufferIsEmpty();
                }
            }
        }
    }

    public TextStreamChoir(HttpClassicClientRequest httpClassicClientRequest, Type type) {
        this.request = httpClassicClientRequest;
        this.responseType = type;
    }

    protected void subscribe0(@Nonnull Subscriber<T> subscriber) {
        subscriber.onSubscribed(new TextStreamSubscription(subscriber, this.request, this.responseType));
    }
}
