package io.vertx.cassandra.impl;

import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Row;
import io.vertx.cassandra.CassandraRowStream;
import io.vertx.cassandra.ResultSet;
import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.EventExecutor;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import java.util.Iterator;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/* loaded from: input_file:io/vertx/cassandra/impl/CassandraRowStreamImpl.class */
public class CassandraRowStreamImpl implements CassandraRowStream {
    /** Sentinel written to the queue after the last page; {@code Queue.handleMessage} maps it to the end handler. */
    private static final Object DONE = new Object();

    /** Context used to emit rows, failures and the end event to the registered handlers. */
    private final ContextInternal context;

    /** Queue that buffers rows between the page fetches and the row handler. */
    private final Queue internalQueue;

    /** Row callback; written and read under this object's monitor. */
    private Handler<Row> rowHandler;

    /** Failure callback; written and read under this object's monitor. */
    private Handler<Throwable> exceptionHandler;

    /** End-of-stream callback; written and read under this object's monitor. */
    private Handler<Void> endHandler;

    /** Execution info of the initial result set, then of the page of the most recently handled row. */
    private ExecutionInfo executionInfo;

    /** Column definitions of the initial result set, then of the page of the most recently handled row. */
    private ColumnDefinitions columnDefinitions;

    /** Held while a page is transferred into the queue and while a task submitted to {@link #executor} runs. */
    private final Lock lock = new ReentrantLock();

    /**
     * Executor passed to the queue as its first constructor argument. It always
     * reports being "in thread" and runs each task synchronously on the calling
     * thread while holding {@link #lock}.
     */
    private final EventExecutor executor = new EventExecutor() {
        @Override
        public boolean inThread() {
            return true;
        }

        @Override
        public void execute(Runnable task) {
            lock.lock();
            try {
                task.run();
            } finally {
                lock.unlock();
            }
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/cassandra/impl/CassandraRowStreamImpl$Queue.class */
    /**
     * Queue that pushes the rows of each result-set page towards the stream's
     * handlers and fetches the following page once the current one is written.
     *
     * <p>Messages are one of: a {@link StreamItem} (one row), a {@link Throwable}
     * (page fetch failure) or the {@code DONE} sentinel (no more pages).
     * {@code handlePause}, {@code handleResume} and {@code handleMessage} are not
     * called anywhere in this file; they are presumably callbacks invoked by
     * {@code InboundMessageQueue} — confirm against the base class.
     */
    public class Queue extends InboundMessageQueue<Object> {
        /**
         * Page that was fetched while the queue was paused and is waiting to be
         * transferred by {@link #handleResume()}; {@code null} when nothing is parked.
         */
        private ResultSet resultSet;

        /** {@code true} between {@link #handlePause()} and the next {@link #handleResume()}. */
        private boolean paused;

        /**
         * @param contextInternal context whose executor is passed to the base queue
         *                        alongside the enclosing stream's lock-holding executor
         */
        public Queue(ContextInternal contextInternal) {
            super(CassandraRowStreamImpl.this.executor, contextInternal.executor());
        }

        /**
         * Starts streaming by transferring the first page.
         *
         * @param resultSet the initial result set
         */
        void init(ResultSet resultSet) {
            transfer(resultSet);
        }

        /**
         * Writes every row of the current page to the queue, then either requests
         * the next page or writes the {@code DONE} sentinel.
         *
         * @param resultSet the result set whose current page is transferred
         */
        private void transfer(ResultSet resultSet) {
            Iterable<Row> currentPage = resultSet.currentPage();
            CassandraRowStreamImpl.this.lock.lock();
            try {
                for (Row row : currentPage) {
                    write(new StreamItem(resultSet, row));
                }
                if (!resultSet.hasMorePages()) {
                    write(CassandraRowStreamImpl.DONE);
                    return;
                }
                resultSet.fetchNextPage().onComplete((nextPage, failure) -> {
                    if (failure != null) {
                        write(failure);
                        return;
                    }
                    // Park the page before looking at the flag, exactly as before,
                    // so that a resume arriving now can still pick it up.
                    this.resultSet = nextPage;
                    if (this.paused) {
                        return;
                    }
                    // Not paused: this callback transfers the page itself, so it must
                    // not stay parked. Leaving the reference behind would make a later
                    // handleResume() transfer the same page again, which can fetch the
                    // following page twice or write DONE twice.
                    this.resultSet = null;
                    transfer(nextPage);
                });
            } finally {
                CassandraRowStreamImpl.this.lock.unlock();
            }
        }

        /** Clears the paused flag and transfers the parked page, if there is one. */
        protected void handleResume() {
            this.paused = false;
            ResultSet parked = this.resultSet;
            this.resultSet = null;
            if (parked != null) {
                transfer(parked);
            }
        }

        /** Marks the queue as paused so the next fetched page is parked instead of transferred. */
        protected void handlePause() {
            this.paused = true;
        }

        /**
         * Dispatches one queued message to the matching handler of the enclosing
         * stream. Each handler is read under the stream's monitor and invoked
         * through {@code context.emit}; a missing handler drops the message.
         * Messages of any other type are ignored.
         *
         * @param obj a {@link StreamItem}, a {@link Throwable} or the {@code DONE} sentinel
         */
        protected void handleMessage(Object obj) {
            if (obj == CassandraRowStreamImpl.DONE) {
                Handler<Void> onEnd;
                synchronized (CassandraRowStreamImpl.this) {
                    onEnd = CassandraRowStreamImpl.this.endHandler;
                }
                if (onEnd != null) {
                    CassandraRowStreamImpl.this.context.emit(null, onEnd);
                }
                return;
            }
            if (obj instanceof StreamItem) {
                StreamItem item = (StreamItem) obj;
                Handler<Row> onRow;
                synchronized (CassandraRowStreamImpl.this) {
                    onRow = CassandraRowStreamImpl.this.rowHandler;
                }
                // Expose the metadata of the page this row came from, even when
                // no row handler is registered.
                CassandraRowStreamImpl.this.executionInfo = item.executionInfo;
                CassandraRowStreamImpl.this.columnDefinitions = item.columnDefinitions;
                if (onRow != null) {
                    CassandraRowStreamImpl.this.context.emit(item.row, onRow);
                }
                return;
            }
            if (obj instanceof Throwable) {
                Throwable failure = (Throwable) obj;
                Handler<Throwable> onFailure;
                synchronized (CassandraRowStreamImpl.this) {
                    onFailure = CassandraRowStreamImpl.this.exceptionHandler;
                }
                if (onFailure != null) {
                    CassandraRowStreamImpl.this.context.emit(failure, onFailure);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/cassandra/impl/CassandraRowStreamImpl$StreamItem.class */
    /**
     * Immutable queue message pairing one row with the execution info and
     * column definitions of the result set it was read from.
     */
    public static class StreamItem {
        /** Execution info captured from the source result set at construction time. */
        public final ExecutionInfo executionInfo;
        /** Column definitions captured from the source result set at construction time. */
        public final ColumnDefinitions columnDefinitions;
        /** The row carried by this message. */
        public final Row row;

        /**
         * @param source result set the row belongs to; its metadata is read immediately
         * @param current the row to carry
         */
        StreamItem(ResultSet source, Row current) {
            row = current;
            executionInfo = source.getExecutionInfo();
            columnDefinitions = source.getColumnDefinitions();
        }
    }

    public CassandraRowStreamImpl(Context context) {
        Queue queue = new Queue((ContextInternal) context);
        queue.pause();
        this.context = (ContextInternal) context;
        this.internalQueue = queue;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records the metadata of the initial result set and hands it to the
     * internal queue, which starts transferring its first page.
     *
     * @param resultSet the initial result set of the query
     */
    public void init(ResultSet resultSet) {
        executionInfo = resultSet.getExecutionInfo();
        columnDefinitions = resultSet.getColumnDefinitions();
        internalQueue.init(resultSet);
    }

    /**
     * Sets the handler that receives page fetch failures.
     *
     * @param handler the failure handler, or {@code null} to drop failures
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    public CassandraRowStream exceptionHandler(Handler<Throwable> handler) {
        synchronized (this) {
            exceptionHandler = handler;
        }
        return this;
    }

    /**
     * Sets the row handler and adjusts the flow: a non-null handler resumes the
     * stream, a {@code null} handler pauses it.
     *
     * @param handler the row handler, or {@code null} to stop receiving rows
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    public CassandraRowStream handler(Handler<Row> handler) {
        synchronized (this) {
            rowHandler = handler;
        }
        if (handler != null) {
            mo2resume();
            return this;
        }
        mo3pause();
        return this;
    }

    /**
     * Sets the handler that is notified once the last page has been delivered.
     *
     * @param handler the end handler, or {@code null} for none
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    public CassandraRowStream endHandler(Handler<Void> handler) {
        synchronized (this) {
            endHandler = handler;
        }
        return this;
    }

    /**
     * Pauses the internal queue.
     *
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: pause */
    public CassandraRowStream mo3pause() {
        internalQueue.pause();
        return this;
    }

    /**
     * Resumes the stream by requesting {@link Long#MAX_VALUE} items.
     *
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: resume */
    public CassandraRowStream mo2resume() {
        final long unbounded = Long.MAX_VALUE;
        return mo1fetch(unbounded);
    }

    /**
     * Forwards a demand of {@code j} items to the internal queue.
     *
     * @param j the amount passed to the queue's {@code fetch}
     * @return this stream
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: fetch */
    public CassandraRowStream mo1fetch(long j) {
        internalQueue.fetch(j);
        return this;
    }

    /**
     * @return the execution info last recorded by this stream: that of the
     *         initial result set, or of the page of the most recently handled row
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    public ExecutionInfo executionInfo() {
        return executionInfo;
    }

    /**
     * @return the column definitions last recorded by this stream: those of the
     *         initial result set, or of the page of the most recently handled row
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    public ColumnDefinitions columnDefinitions() {
        return columnDefinitions;
    }

    /*
     * Raw-typed variant of endHandler returning ReadStream: casts the argument
     * to Handler<Void> and delegates to endHandler(Handler<Void>).
     * NOTE(review): marked bridge/synthetic by the decompiler, so this is
     * presumably compiler-generated and absent from the original source.
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: endHandler */
    public /* bridge */ /* synthetic */ ReadStream mo0endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    /*
     * Raw-typed variant of handler returning ReadStream: casts the argument to
     * Handler<Row> and delegates to handler(Handler<Row>), which also resumes
     * or pauses the stream depending on whether the handler is null.
     * NOTE(review): marked bridge/synthetic by the decompiler, so this is
     * presumably compiler-generated and absent from the original source.
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: handler */
    public /* bridge */ /* synthetic */ ReadStream mo4handler(Handler handler) {
        return handler((Handler<Row>) handler);
    }

    /*
     * Raw-typed variant of exceptionHandler returning ReadStream: casts the
     * argument to Handler<Throwable> and delegates to
     * exceptionHandler(Handler<Throwable>).
     * NOTE(review): marked bridge/synthetic by the decompiler, so this is
     * presumably compiler-generated and absent from the original source.
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ ReadStream mo5exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    /*
     * Raw-typed variant of exceptionHandler returning StreamBase: casts the
     * argument to Handler<Throwable> and delegates to
     * exceptionHandler(Handler<Throwable>).
     * NOTE(review): marked bridge/synthetic by the decompiler, so this is
     * presumably compiler-generated and absent from the original source.
     */
    @Override // io.vertx.cassandra.CassandraRowStream
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo6exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
