package io.vertx.ext.mongo.impl;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.EventExecutor;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/vertx/ext/mongo/impl/PublisherAdapter.class */
/**
 * Adapts a Reactive Streams {@link Publisher} to a Vert.x {@link ReadStream}.
 *
 * <p>The adapter subscribes to the publisher lazily, the first time a non-null item handler is
 * installed through {@link #handler(Handler)}, and cancels that subscription when the handler is
 * cleared. Items are requested from the publisher {@code batchSize} at a time. Every publisher
 * signal (item, error, completion) is written to an {@link InboundMessageQueue} and then
 * dispatched to the matching handler through the supplied context.
 *
 * <p>Two locks are involved: the adapter's own monitor guards the handler references, the current
 * subscriber and the locally tracked demand, while {@code lock} serialises the publisher signals
 * and whatever the queue runs through {@code syncExec}. Calls into the subscriber are made
 * outside the adapter monitor wherever the original code structure allows it.
 *
 * @param <T> the type of item emitted by the publisher
 */
public class PublisherAdapter<T> implements ReadStream<T> {
    /** Sentinel written to the queue by {@code onComplete()}; it is routed to the end handler. */
    private static final Object END = new Object();

    private final ContextInternal context;
    private final Publisher<T> publisher;
    private final int batchSize;
    private Handler<T> handler;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;
    private PublisherAdapter<T>.Subscriber subscriber;
    /** Demand recorded by pause/fetch; used to seed a subscriber created later by handler(). */
    private long demand;
    private final Lock lock = new ReentrantLock();
    /** Executor that runs tasks immediately on the calling thread while holding {@code lock}. */
    private final EventExecutor syncExec = new EventExecutor() {
        public boolean inThread() {
            return true;
        }

        public void execute(Runnable runnable) {
            PublisherAdapter.this.lock.lock();
            try {
                runnable.run();
            } finally {
                PublisherAdapter.this.lock.unlock();
            }
        }
    };

    /**
     * Reactive Streams subscriber that funnels every publisher signal into the inherited message
     * queue and tops up the upstream request one batch at a time.
     *
     * <p>NOTE(review): the decompiler widened this class from {@code private} to {@code public};
     * the visibility is left untouched here.
     */
    public class Subscriber extends InboundMessageQueue<Object> implements org.reactivestreams.Subscriber<T> {
        // Written under the adapter monitor, read while holding `lock`; volatile bridges the two.
        private volatile Subscription subscription;
        // Set under the adapter monitor once cancel() has been called.
        private boolean cancelled;
        private boolean paused;
        // Number of requested items not yet received; starts at the batch requested on subscribe.
        private int inflight;

        public Subscriber() {
            // NOTE(review): presumably (producer executor, consumer executor) - confirm against
            // the InboundMessageQueue constructor.
            super(PublisherAdapter.this.syncExec, PublisherAdapter.this.context.executor());
            this.inflight = PublisherAdapter.this.batchSize;
        }

        /** Queue callback: clears the paused flag and requests a batch if none is outstanding. */
        protected void handleResume() {
            this.paused = false;
            if (this.inflight == 0) {
                requestNextBatch();
            }
        }

        /** Queue callback: stops {@link #onNext} from requesting further batches. */
        protected void handlePause() {
            this.paused = true;
        }

        /**
         * Routes one queued message to its handler: the {@code END} sentinel goes to the end
         * handler (with a {@code null} event), a {@link Throwable} to the exception handler and
         * anything else to the item handler. The handler is picked under the adapter monitor and
         * invoked outside it; nothing happens when no matching handler is installed.
         *
         * @param obj the queued item, error or {@code END} sentinel
         */
        @SuppressWarnings("unchecked")
        protected void handleMessage(Object obj) {
            Object event = obj;
            Handler<Object> target;
            // The casts are unchecked but safe: each handler only ever receives the kind of
            // event it was selected for.
            synchronized (PublisherAdapter.this) {
                if (obj == PublisherAdapter.END) {
                    event = null;
                    target = (Handler<Object>) (Handler<?>) PublisherAdapter.this.endHandler;
                } else if (obj instanceof Throwable) {
                    target = (Handler<Object>) (Handler<?>) PublisherAdapter.this.exceptionHandler;
                } else {
                    target = (Handler<Object>) (Handler<?>) PublisherAdapter.this.handler;
                }
            }
            if (target != null) {
                PublisherAdapter.this.context.dispatch(event, target);
            }
        }

        /**
         * Stores the subscription and requests the first batch. If {@link #cancel()} already ran
         * (the publisher delivered the subscription late), the subscription is cancelled
         * immediately instead of being left active with no way to cancel it.
         */
        public void onSubscribe(Subscription subscription) {
            boolean alreadyCancelled;
            synchronized (PublisherAdapter.this) {
                this.subscription = subscription;
                alreadyCancelled = this.cancelled;
            }
            if (alreadyCancelled) {
                subscription.cancel();
                return;
            }
            subscription.request(PublisherAdapter.this.batchSize);
        }

        /**
         * Cancels the upstream subscription. Safe to call before {@code onSubscribe}: the
         * cancellation is remembered and applied when the subscription arrives.
         */
        void cancel() {
            Subscription current;
            synchronized (PublisherAdapter.this) {
                this.cancelled = true;
                current = this.subscription;
            }
            if (current != null) {
                current.cancel();
            }
        }

        /** Queues the item and, once the current batch is used up and not paused, requests another. */
        public void onNext(T t) {
            PublisherAdapter.this.lock.lock();
            try {
                this.inflight--;
                write(t);
                if (this.inflight == 0 && !this.paused) {
                    requestNextBatch();
                }
            } finally {
                PublisherAdapter.this.lock.unlock();
            }
        }

        /** Queues the failure so that it reaches the exception handler in order with the items. */
        public void onError(Throwable th) {
            PublisherAdapter.this.lock.lock();
            try {
                write(th);
            } finally {
                PublisherAdapter.this.lock.unlock();
            }
        }

        /** Queues the {@code END} sentinel so that the end handler runs after all queued items. */
        public void onComplete() {
            PublisherAdapter.this.lock.lock();
            try {
                write(PublisherAdapter.END);
            } finally {
                PublisherAdapter.this.lock.unlock();
            }
        }

        /** Accounts for and requests one more batch of {@code batchSize} items from upstream. */
        private void requestNextBatch() {
            this.inflight += PublisherAdapter.this.batchSize;
            this.subscription.request(PublisherAdapter.this.batchSize);
        }
    }

    /**
     * Creates an adapter; nothing is subscribed until an item handler is installed.
     *
     * @param context   the context used to dispatch events; must be a {@link ContextInternal}
     * @param publisher the publisher to adapt
     * @param i         the number of items requested from the publisher per batch
     * @throws NullPointerException if {@code context} or {@code publisher} is {@code null}
     * @throws ClassCastException   if {@code context} is not a {@link ContextInternal}
     */
    public PublisherAdapter(Context context, Publisher<T> publisher, int i) {
        Objects.requireNonNull(context, "context is null");
        Objects.requireNonNull(publisher, "publisher is null");
        this.context = (ContextInternal) context;
        this.publisher = publisher;
        this.batchSize = i;
        this.demand = Long.MAX_VALUE;
    }

    /** Sets the handler that receives publisher errors; {@code null} removes it. */
    public synchronized ReadStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /** Sets the handler notified on completion; {@code null} removes it. */
    public synchronized ReadStream<T> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    /**
     * Installs or clears the item handler.
     *
     * <p>A non-null handler subscribes to the publisher if there is no current subscriber;
     * otherwise only the handler reference is replaced. A {@code null} handler cancels the
     * current subscription (if any) and resets the recorded demand to {@link Long#MAX_VALUE}.
     *
     * @param handler the item handler, or {@code null} to stop consuming
     * @return this stream
     */
    public ReadStream<T> handler(Handler<T> handler) {
        if (handler == null) {
            PublisherAdapter<T>.Subscriber previous;
            synchronized (this) {
                this.handler = null;
                previous = this.subscriber;
                this.subscriber = null;
                this.demand = Long.MAX_VALUE;
            }
            // Cancel outside the monitor so the publisher is never called while it is held.
            if (previous != null) {
                previous.cancel();
            }
            return this;
        }
        synchronized (this) {
            this.handler = handler;
            if (this.subscriber != null) {
                return this;
            }
            PublisherAdapter<T>.Subscriber created = new Subscriber();
            this.subscriber = created;
            long initialDemand = this.demand;
            // Reset the queue's demand first, then add ours: the result is then exactly the
            // demand recorded before subscription, whatever demand a new queue starts with.
            // (Previously a finite demand was added on top of the queue's initial demand.)
            created.pause();
            if (initialDemand > 0) {
                created.fetch(initialDemand);
            }
            this.publisher.subscribe(created);
        }
        return this;
    }

    /** Sets the recorded demand to zero and pauses the current subscriber, if any. */
    public ReadStream<T> pause() {
        PublisherAdapter<T>.Subscriber current;
        synchronized (this) {
            this.demand = 0L;
            current = this.subscriber;
        }
        if (current != null) {
            current.pause();
        }
        return this;
    }

    /** Equivalent to {@code fetch(Long.MAX_VALUE)}. */
    public ReadStream<T> resume() {
        return fetch(Long.MAX_VALUE);
    }

    /**
     * Adds {@code j} to the demand.
     *
     * <p>The recorded demand saturates at {@link Long#MAX_VALUE}. When a subscriber exists, only
     * the increment {@code j} is forwarded to it: the subscriber's queue keeps its own running
     * demand, so forwarding the accumulated total (as the previous code did) counted earlier
     * fetches twice. NOTE(review): this relies on {@code InboundMessageQueue.fetch} being
     * additive - confirm against the Vert.x version in use.
     *
     * <p>The method is intentionally not {@code synchronized}: only the bookkeeping runs under
     * the monitor, and the subscriber is called after it has been released, as in
     * {@link #pause()}.
     *
     * @param j the number of additional items to request; {@code 0} is a no-op
     * @return this stream
     * @throws IllegalArgumentException if {@code j} is negative
     */
    public ReadStream<T> fetch(long j) {
        if (j < 0) {
            throw new IllegalArgumentException();
        }
        if (j == 0) {
            return this;
        }
        PublisherAdapter<T>.Subscriber current;
        synchronized (this) {
            long total = this.demand + j;
            // A negative sum means the addition overflowed: saturate instead.
            this.demand = total < 0 ? Long.MAX_VALUE : total;
            current = this.subscriber;
        }
        if (current != null) {
            current.fetch(j);
        }
        return this;
    }

    /*
     * Decompiler artifact: the compiler-generated bridge for exceptionHandler(Handler), renamed
     * by JADX to avoid a signature clash. Kept so the class's member set is unchanged.
     */
    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m10exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
