package io.vertx.cassandra.impl;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import io.vertx.cassandra.CassandraClient;
import io.vertx.cassandra.CassandraClientOptions;
import io.vertx.cassandra.CassandraRowStream;
import io.vertx.cassandra.ResultSet;
import io.vertx.cassandra.impl.tracing.QueryRequest;
import io.vertx.cassandra.impl.tracing.RequestTags;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.spi.tracing.SpanKind;
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collector;

/* loaded from: input_file:io/vertx/cassandra/impl/CassandraClientImpl.class */
/**
 * {@link CassandraClient} implementation backed by a DataStax {@link CqlSession}.
 *
 * <p>Clients created with the same name share a single {@code SessionHolder} stored in a Vert.x
 * local map under {@link #HOLDERS_LOCAL_MAP_NAME}. Each client instance increments the holder's
 * reference count on construction and decrements it on {@link #close()}; the instance that brings
 * the count to zero removes the holder and closes the session, if one was ever opened.
 *
 * <p>The session is created lazily, on the first operation that needs it (see
 * {@link #getSession(ContextInternal)}).
 */
public class CassandraClientImpl implements CassandraClient {
    static final String HOLDERS_LOCAL_MAP_NAME = "__vertx.cassandraClient.sessionHolders";
    final VertxInternal vertx;
    private final VertxTracer tracer;
    private final String clientName;
    private final CassandraClientOptions options;
    private final Map<String, SessionHolder> holders;
    private final ContextInternal creatingContext;
    /** Guarded by {@code this}; set once by {@link #raiseCloseFlag()}. */
    private boolean closed;

    /**
     * Creates a client and registers it with the shared holder for the given name.
     *
     * @param vertx the Vert.x instance; must be a {@link VertxInternal}
     * @param str the client name used as key in the shared holders map
     * @param cassandraClientOptions the client options
     * @throws NullPointerException if any argument is {@code null}
     */
    public CassandraClientImpl(Vertx vertx, String str, CassandraClientOptions cassandraClientOptions) {
        Objects.requireNonNull(vertx, "vertx");
        Objects.requireNonNull(str, "clientName");
        Objects.requireNonNull(cassandraClientOptions, "options");
        this.vertx = (VertxInternal) vertx;
        this.tracer = this.vertx.tracer();
        this.clientName = str;
        this.options = cassandraClientOptions;
        this.creatingContext = this.vertx.getOrCreateContext();
        this.holders = vertx.sharedData().getLocalMap(HOLDERS_LOCAL_MAP_NAME);
        // Create the holder for this name, or bump the reference count of the existing one.
        this.holders.compute(str, (name, holder) -> holder == null ? new SessionHolder() : holder.increment());
        // Close this client when the context that created it is closed.
        this.creatingContext.addCloseHook(completion -> close(completion));
    }

    /**
     * @return {@code true} if this client is not closed and the shared session exists and is open
     */
    @Override
    public synchronized boolean isConnected() {
        if (this.closed) {
            return false;
        }
        CqlSession session = this.holders.get(this.clientName).session;
        return session != null && !session.isClosed();
    }

    /** Callback variant of {@link #executeWithFullFetch(String)}. */
    @Override
    public CassandraClient executeWithFullFetch(String str, Handler<AsyncResult<List<Row>>> handler) {
        Util.setHandler(executeWithFullFetch(str), handler);
        return this;
    }

    /**
     * Executes the query and fetches all rows.
     *
     * @param str the query string
     * @return a future completed with every row of the result
     */
    @Override
    public Future<List<Row>> executeWithFullFetch(String str) {
        return executeWithFullFetch(SimpleStatement.newInstance(str));
    }

    /** Callback variant of {@link #executeWithFullFetch(Statement)}. */
    @Override
    public CassandraClient executeWithFullFetch(Statement statement, Handler<AsyncResult<List<Row>>> handler) {
        Util.setHandler(executeWithFullFetch(statement), handler);
        return this;
    }

    /**
     * Executes the statement and fetches all rows.
     *
     * @param statement the statement to execute
     * @return a future completed with every row of the result
     */
    @Override
    public Future<List<Row>> executeWithFullFetch(Statement statement) {
        return execute(statement).flatMap(resultSet -> resultSet.all());
    }

    /** Callback variant of {@link #execute(String)}. */
    @Override
    public CassandraClient execute(String str, Handler<AsyncResult<ResultSet>> handler) {
        Util.setHandler(execute(str), handler);
        return this;
    }

    /**
     * Executes the query.
     *
     * @param str the query string
     * @return a future completed with the result set
     */
    @Override
    public Future<ResultSet> execute(String str) {
        return execute(SimpleStatement.newInstance(str));
    }

    /** Callback variant of {@link #execute(String, Collector)}. */
    @Override
    public <R> CassandraClient execute(String str, Collector<Row, ?, R> collector, Handler<AsyncResult<R>> handler) {
        Util.setHandler(execute(str, collector), handler);
        return this;
    }

    /**
     * Executes the query and reduces the streamed rows with the given collector.
     *
     * @param str the query string
     * @param collector the collector applied to the rows
     * @return a future completed with the collector's result
     */
    @Override
    public <R> Future<R> execute(String str, Collector<Row, ?, R> collector) {
        return execute(SimpleStatement.newInstance(str), collector);
    }

    /** Callback variant of {@link #execute(Statement)}. */
    @Override
    public CassandraClient execute(Statement statement, Handler<AsyncResult<ResultSet>> handler) {
        Util.setHandler(execute(statement), handler);
        return this;
    }

    /**
     * Executes the statement.
     *
     * @param statement the statement to execute
     * @return a future completed with the result set
     */
    @Override
    public Future<ResultSet> execute(Statement statement) {
        return executeInternal(statement).map(rs -> new ResultSetImpl(rs, this.vertx));
    }

    /**
     * Obtains the session and runs the statement on it. When a tracer is configured, a request
     * is reported before execution and the response (or failure cause) on completion.
     */
    private Future<AsyncResultSet> executeInternal(Statement statement) {
        return getSession(this.vertx.getOrCreateContext()).flatMap(session -> {
            Object payload = this.tracer != null ? sendRequest(session, statement) : null;
            Future<AsyncResultSet> result =
                Future.fromCompletionStage(session.executeAsync(statement), this.vertx.getContext());
            if (this.tracer != null) {
                result = result.onComplete(ar -> receiveResponse(payload, ar));
            }
            return result;
        });
    }

    /** Reports the outgoing query to the tracer and returns the tracer's payload for it. */
    private Object sendRequest(CqlSession cqlSession, Statement statement) {
        return this.tracer.sendRequest(
            this.vertx.getContext(),
            SpanKind.RPC,
            this.options.getTracingPolicy(),
            new QueryRequest(cqlSession, statement),
            "Query",
            (key, value) -> {
                // No headers are propagated.
            },
            RequestTags.REQUEST_TAG_EXTRACTOR);
    }

    /** Reports the query outcome to the tracer; only the failure cause (if any) is passed on. */
    private void receiveResponse(Object obj, AsyncResult<AsyncResultSet> asyncResult) {
        this.tracer.receiveResponse(this.vertx.getContext(), null, obj, asyncResult.cause(), TagExtractor.empty());
    }

    /** Callback variant of {@link #execute(Statement, Collector)}. */
    @Override
    public <R> CassandraClient execute(Statement statement, Collector<Row, ?, R> collector, Handler<AsyncResult<R>> handler) {
        Util.setHandler(execute(statement, collector), handler);
        return this;
    }

    /**
     * Executes the statement and reduces the streamed rows with the given collector.
     *
     * @param statement the statement to execute
     * @param collector the collector applied to the rows
     * @return a future completed with the collector's result
     */
    @Override
    public <R> Future<R> execute(Statement statement, Collector<Row, ?, R> collector) {
        return executeAndCollect(statement, collector);
    }

    /**
     * Streams the rows of the statement into the collector's container and completes with the
     * finisher's result when the stream ends, or fails when the stream reports an exception.
     */
    private <C, R> Future<R> executeAndCollect(Statement statement, Collector<Row, C, R> collector) {
        C container = collector.supplier().get();
        BiConsumer<C, Row> accumulator = collector.accumulator();
        Function<C, R> finisher = collector.finisher();
        return queryStream(statement).flatMap(stream -> {
            Promise<R> resultPromise = Promise.promise();
            stream.endHandler(end -> resultPromise.complete(finisher.apply(container)));
            stream.handler(row -> accumulator.accept(container, row));
            stream.exceptionHandler(resultPromise::fail);
            return resultPromise.future();
        });
    }

    /** Callback variant of {@link #prepare(String)}. */
    @Override
    public CassandraClient prepare(String str, Handler<AsyncResult<PreparedStatement>> handler) {
        Util.setHandler(prepare(str), handler);
        return this;
    }

    /**
     * Prepares the query.
     *
     * @param str the query string
     * @return a future completed with the prepared statement
     */
    @Override
    public Future<PreparedStatement> prepare(String str) {
        return getSession(this.vertx.getOrCreateContext())
            .flatMap(session -> Future.fromCompletionStage(session.prepareAsync(str), this.vertx.getContext()));
    }

    /** Callback variant of {@link #prepare(SimpleStatement)}. */
    @Override
    public CassandraClient prepare(SimpleStatement simpleStatement, Handler<AsyncResult<PreparedStatement>> handler) {
        Util.setHandler(prepare(simpleStatement), handler);
        return this;
    }

    /**
     * Prepares the statement.
     *
     * @param simpleStatement the statement to prepare
     * @return a future completed with the prepared statement
     */
    @Override
    public Future<PreparedStatement> prepare(SimpleStatement simpleStatement) {
        return getSession(this.vertx.getOrCreateContext())
            .flatMap(session -> Future.fromCompletionStage(session.prepareAsync(simpleStatement), this.vertx.getContext()));
    }

    /** Callback variant of {@link #queryStream(String)}. */
    @Override
    public CassandraClient queryStream(String str, Handler<AsyncResult<CassandraRowStream>> handler) {
        return queryStream(SimpleStatement.newInstance(str), handler);
    }

    /**
     * Executes the query and exposes the rows as a stream.
     *
     * @param str the query string
     * @return a future completed with the row stream
     */
    @Override
    public Future<CassandraRowStream> queryStream(String str) {
        Promise<CassandraRowStream> promise = Promise.promise();
        queryStream(str, promise);
        return promise.future();
    }

    /** Callback variant of {@link #queryStream(Statement)}. */
    @Override
    public CassandraClient queryStream(Statement statement, Handler<AsyncResult<CassandraRowStream>> handler) {
        Util.setHandler(queryStream(statement), handler);
        return this;
    }

    /**
     * Executes the statement and exposes the rows as a stream.
     *
     * @param statement the statement to execute
     * @return a future completed with the row stream
     */
    @Override
    public Future<CassandraRowStream> queryStream(Statement statement) {
        return executeInternal(statement)
            .map(rs -> new CassandraRowStreamImpl(this.vertx.getContext(), new ResultSetImpl(rs, this.vertx)));
    }

    /**
     * Closes this client. Only the first call has an effect; later calls return a succeeded
     * future. The shared holder's reference count is decremented, and when it reaches zero the
     * holder is removed from the map and its session, if any, is closed asynchronously.
     *
     * @return a future completed when the session is closed, or immediately when there is
     *     nothing to close
     */
    @Override
    public Future<Void> close() {
        ContextInternal context = this.vertx.getOrCreateContext();
        if (!raiseCloseFlag()) {
            return context.succeededFuture();
        }
        // Compare-and-set loop: retry whenever another client changed the holder in between.
        while (true) {
            SessionHolder current = this.holders.get(this.clientName);
            SessionHolder next = current.decrement();
            if (next.refCount == 0) {
                if (this.holders.remove(this.clientName, current)) {
                    if (current.session != null) {
                        return Future.fromCompletionStage(current.session.closeAsync(), context);
                    }
                    // Last reference released before any session was opened: nothing to close.
                    // Looping again here would dereference the entry that was just removed.
                    return context.succeededFuture();
                }
            } else if (this.holders.replace(this.clientName, current, next)) {
                return context.succeededFuture();
            }
        }
    }

    /** Callback variant of {@link #close()}. */
    @Override
    public CassandraClient close(Handler<AsyncResult<Void>> handler) {
        Util.setHandler(close(), handler);
        return this;
    }

    /**
     * @return a future completed with the session's metadata
     */
    @Override
    public Future<Metadata> metadata() {
        return getSession(this.vertx.getOrCreateContext()).map(session -> session.getMetadata());
    }

    /** Callback variant of {@link #metadata()}. */
    @Override
    public void metadata(Handler<AsyncResult<Metadata>> handler) {
        metadata().onComplete(handler);
    }

    /**
     * Marks this client as closed.
     *
     * @return {@code true} if this call performed the transition, {@code false} if the client
     *     was already closed
     */
    private synchronized boolean raiseCloseFlag() {
        if (this.closed) {
            return false;
        }
        this.closed = true;
        return true;
    }

    /**
     * Returns the shared session, connecting first if no session exists yet. The connection is
     * established via {@code executeBlocking} on the holder's connection queue.
     *
     * @param contextInternal the context on which the returned future is created
     * @return a future with the session, or a failed future if this client is closed
     */
    synchronized Future<CqlSession> getSession(ContextInternal contextInternal) {
        if (this.closed) {
            return contextInternal.failedFuture("Client is closed");
        }
        SessionHolder holder = this.holders.get(this.clientName);
        if (holder.session != null) {
            return contextInternal.succeededFuture(holder.session);
        }
        return contextInternal.executeBlocking(this::connect, holder.connectionQueue);
    }

    /**
     * Builds a session and publishes it in the shared holder. Completes the promise with the
     * already-published session when one exists, and fails it when the holder has been removed
     * (all clients with this name were closed) before or while connecting.
     */
    private void connect(Promise<CqlSession> promise) {
        SessionHolder holder = this.holders.get(this.clientName);
        if (holder == null) {
            promise.fail("Client closed while connecting");
            return;
        }
        if (holder.session != null) {
            promise.complete(holder.session);
            return;
        }
        CqlSession newSession = this.options.dataStaxClusterBuilder().build();
        // Publish only if the holder is still registered; a null result means it was removed.
        SessionHolder updated = this.holders.compute(
            this.clientName,
            (name, existing) -> existing == null ? null : existing.connected(newSession));
        if (updated != null) {
            promise.complete(updated.session);
            return;
        }
        try {
            newSession.close();
        } catch (Exception ignored) {
            // Best effort: the session was never handed out, and the promise is failed below
            // regardless of whether closing it succeeded.
        }
        promise.fail("Client closed while connecting");
    }
}
