package org.noear.solon.data.rx.sql.impl;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import java.util.Collection;
import java.util.function.Function;
import java.util.function.Supplier;
import org.noear.solon.data.rx.sql.RxSqlCommand;
import org.noear.solon.data.rx.sql.RxSqlConfiguration;
import org.noear.solon.data.rx.sql.RxSqlQuerier;
import org.noear.solon.data.rx.sql.bound.RxRowConverter;
import org.noear.solon.data.rx.sql.bound.RxStatementBinder;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/noear/solon/data/rx/sql/impl/SimpleRxSqlQuerier.class */
public class SimpleRxSqlQuerier implements RxSqlQuerier {
    private static final DefaultRxBinder DEFAULT_BINDER = new DefaultRxBinder();
    private final ConnectionFactory dataSource;
    private final String commandText;
    private RxSqlCommand command;

    public SimpleRxSqlQuerier(ConnectionFactory connectionFactory, String str) {
        this.dataSource = connectionFactory;
        this.commandText = str;
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public RxSqlQuerier params(Object... objArr) {
        this.command = new RxSqlCommand(this.dataSource, this.commandText, objArr, DEFAULT_BINDER);
        return this;
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public <S> RxSqlQuerier params(S s, RxStatementBinder<S> rxStatementBinder) {
        this.command = new RxSqlCommand(this.dataSource, this.commandText, s, rxStatementBinder);
        return this;
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public RxSqlQuerier params(Collection<Object[]> collection) {
        this.command = new RxSqlCommand(this.dataSource, this.commandText, (Collection) collection, (RxStatementBinder) DEFAULT_BINDER);
        return this;
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public <S> RxSqlQuerier params(Collection<S> collection, Supplier<RxStatementBinder<S>> supplier) {
        this.command = new RxSqlCommand(this.dataSource, this.commandText, (Collection) collection, (RxStatementBinder) supplier.get());
        return this;
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public <T> Mono<T> queryValue(Class<T> cls) {
        return queryRow((row, rowMetadata) -> {
            return row.get(0);
        });
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public <T> Flux<T> queryValueList(Class<T> cls) {
        return queryRowList((row, rowMetadata) -> {
            return row.get(0);
        });
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public <T> Mono<T> queryRow(Class<T> cls) {
        return queryRow(DefaultRxConverter.getInstance().create(cls));
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public <T> Mono<T> queryRow(RxRowConverter<T> rxRowConverter) {
        return Mono.from(RxSqlConfiguration.doIntercept(this.command, () -> {
            return queryRowDo(rxRowConverter);
        }));
    }

    protected <T> Mono<T> queryRowDo(RxRowConverter<T> rxRowConverter) {
        return Mono.from(getConnection()).flatMapMany(connection -> {
            return buildStatement(connection, this.command, false);
        }).flatMap(result -> {
            rxRowConverter.getClass();
            return result.map(rxRowConverter::convert);
        }).take(1L).singleOrEmpty();
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public <T> Flux<T> queryRowList(Class<T> cls) {
        return queryRowList(DefaultRxConverter.getInstance().create(cls));
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public <T> Flux<T> queryRowList(RxRowConverter<T> rxRowConverter) {
        return Flux.from(RxSqlConfiguration.doIntercept(this.command, () -> {
            return queryRowListDo(rxRowConverter);
        }));
    }

    protected <T> Flux<T> queryRowListDo(RxRowConverter<T> rxRowConverter) {
        return Mono.from(getConnection()).flatMapMany(connection -> {
            return buildStatement(connection, this.command, false);
        }).flatMap(result -> {
            rxRowConverter.getClass();
            return result.map(rxRowConverter::convert);
        });
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public Mono<Long> update() {
        return Mono.from(RxSqlConfiguration.doIntercept(this.command, this::updateDo));
    }

    protected Mono<Long> updateDo() {
        return Mono.from(getConnection()).flatMapMany(connection -> {
            return buildStatement(connection, this.command, false);
        }).flatMap(result -> {
            return result.getRowsUpdated();
        }).take(1L).singleOrEmpty();
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public <T> Mono<T> updateReturnKey(Class<T> cls) {
        return Mono.from(RxSqlConfiguration.doIntercept(this.command, this::updateReturnKeyDo));
    }

    protected Mono updateReturnKeyDo() {
        return Mono.from(getConnection()).flatMapMany(connection -> {
            return buildStatement(connection, this.command, true);
        }).flatMap(result -> {
            return result.map(readable -> {
                return readable.get(0);
            });
        }).take(1L).singleOrEmpty();
    }

    @Override // org.noear.solon.data.rx.sql.RxSqlQuerier
    public Flux<Long> updateBatch() {
        return Flux.from(RxSqlConfiguration.doIntercept(this.command, this::updateBatchDo));
    }

    protected Flux<Long> updateBatchDo() {
        return Mono.from(getConnection()).flatMapMany(connection -> {
            return buildStatement(connection, this.command, false);
        }).flatMap(result -> {
            return result.getRowsUpdated();
        });
    }

    protected Publisher<? extends Result> buildStatement(Connection connection, RxSqlCommand rxSqlCommand, boolean z) {
        Statement createStatement = connection.createStatement(rxSqlCommand.getSql());
        if (z) {
            createStatement.returnGeneratedValues(new String[0]);
        }
        rxSqlCommand.fill(createStatement);
        return createStatement.execute();
    }

    protected Publisher<? extends Connection> getConnection() {
        return this.dataSource.create();
    }
}
