package com.github.j5ik2o.pekko.persistence.dynamodb.client.v2;

import com.github.j5ik2o.pekko.persistence.dynamodb.client.StreamSupport;
import com.github.j5ik2o.pekko.persistence.dynamodb.config.BackoffConfig;
import com.github.j5ik2o.pekko.persistence.dynamodb.context.PluginContext;
import com.github.j5ik2o.pekko.persistence.dynamodb.utils.DispatcherUtils$;
import com.github.j5ik2o.pekko.persistence.dynamodb.utils.DispatcherUtils$ApplyV2DispatcherOps$;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.function.Function;
import org.apache.pekko.stream.Graph;
import org.apache.pekko.stream.javadsl.Flow$;
import org.apache.pekko.stream.scaladsl.Concat$;
import org.apache.pekko.stream.scaladsl.Flow;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Vector;
import scala.compat.java8.OptionConverters$;
import scala.compat.java8.OptionConverters$RichOptionalGeneric$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.PutItemResponse;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemResponse;

/* compiled from: StreamWriteClient.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005Eb\u0001B\b\u0011\u0005\u0005B\u0001\u0002\f\u0001\u0003\u0006\u0004%\t!\f\u0005\ti\u0001\u0011\t\u0011)A\u0005]!AQ\u0007\u0001BC\u0002\u0013\u0005a\u0007\u0003\u0005H\u0001\t\u0005\t\u0015!\u00038\u0011!A\u0005A!b\u0001\n\u0003I\u0005\u0002\u0003(\u0001\u0005\u0003\u0005\u000b\u0011\u0002&\t\u0011=\u0003!Q1A\u0005\u0002AC\u0001b\u0016\u0001\u0003\u0002\u0003\u0006I!\u0015\u0005\u00061\u0002!\t!\u0017\u0005\u0006A\u0002!\t!\u0019\u0005\u0006y\u0002!\t! \u0005\u0007\u0003\u0017\u0001A\u0011A?\t\u000f\u00055\u0001\u0001\"\u0001\u0002\u0010!9\u0011q\u0004\u0001\u0005\u0002\u0005\u0005\"!E*ue\u0016\fWn\u0016:ji\u0016\u001cE.[3oi*\u0011\u0011CE\u0001\u0003mJR!a\u0005\u000b\u0002\r\rd\u0017.\u001a8u\u0015\t)b#\u0001\u0005es:\fWn\u001c3c\u0015\t9\u0002$A\u0006qKJ\u001c\u0018n\u001d;f]\u000e,'BA\r\u001b\u0003\u0015\u0001Xm[6p\u0015\tYB$\u0001\u0004kk%\\'g\u001c\u0006\u0003;y\taaZ5uQV\u0014'\"A\u0010\u0002\u0007\r|Wn\u0001\u0001\u0014\u0007\u0001\u0011\u0003\u0006\u0005\u0002$M5\tAEC\u0001&\u0003\u0015\u00198-\u00197b\u0013\t9CE\u0001\u0004B]f\u0014VM\u001a\t\u0003S)j\u0011AE\u0005\u0003WI\u0011Qb\u0015;sK\u0006l7+\u001e9q_J$\u0018!\u00049mk\u001eLgnQ8oi\u0016DH/F\u0001/!\ty#'D\u00011\u0015\t\tD#A\u0004d_:$X\r\u001f;\n\u0005M\u0002$!\u0004)mk\u001eLgnQ8oi\u0016DH/\u0001\bqYV<\u0017N\\\"p]R,\u0007\u0010\u001e\u0011\u0002\u0017\u0005\u001c\u0018P\\2DY&,g\u000e^\u000b\u0002oA\u00191\u0005\u000f\u001e\n\u0005e\"#AB(qi&|g\u000e\u0005\u0002<\u000b6\tAH\u0003\u0002\u0016{)\u0011ahP\u0001\tg\u0016\u0014h/[2fg*\u0011\u0001)Q\u0001\u0007C^\u001c8\u000fZ6\u000b\u0005\t\u001b\u0015AB1nCj|gNC\u0001E\u0003!\u0019xN\u001a;xCJ,\u0017B\u0001$=\u0005M!\u0015P\\1n_\u0012\u0013\u0017i]=oG\u000ec\u0017.\u001a8u\u00031\t7/\u001f8d\u00072LWM\u001c;!\u0003)\u0019\u0018P\\2DY&,g\u000e^\u000b\u0002\u0015B\u00191\u0005O&\u0011\u0005mb\u0015BA'=\u00059!\u0015P\\1n_\u0012\u00137\t\\5f]R\f1b]=oG\u000ec\u0017.\u001a8uA\u0005\u0011rO]5uK\n\u000b7m[8gM\u000e{gNZ5h+
\u0005\t\u0006C\u0001*V\u001b\u0005\u0019&B\u0001+\u0015\u0003\u0019\u0019wN\u001c4jO&\u0011ak\u0015\u0002\u000e\u0005\u0006\u001c7n\u001c4g\u0007>tg-[4\u0002']\u0014\u0018\u000e^3CC\u000e\\wN\u001a4D_:4\u0017n\u001a\u0011\u0002\rqJg.\u001b;?)\u0015QF,\u00180`!\tY\u0006!D\u0001\u0011\u0011\u0015a\u0013\u00021\u0001/\u0011\u0015)\u0014\u00021\u00018\u0011\u0015A\u0015\u00021\u0001K\u0011\u0015y\u0015\u00021\u0001R\u00039!W\r\\3uK&#X-\u001c$m_^,\u0012A\u0019\t\u0006G6|W\u000f_\u0007\u0002I*\u0011QMZ\u0001\tg\u000e\fG.\u00193tY*\u0011q\r[\u0001\u0007gR\u0014X-Y7\u000b\u0005eI'B\u00016l\u0003\u0019\t\u0007/Y2iK*\tA.A\u0002pe\u001eL!A\u001c3\u0003\t\u0019cwn\u001e\t\u0003aNl\u0011!\u001d\u0006\u0003er\nQ!\\8eK2L!\u0001^9\u0003#\u0011+G.\u001a;f\u0013R,WNU3rk\u0016\u001cH\u000f\u0005\u0002qm&\u0011q/\u001d\u0002\u0013\t\u0016dW\r^3Ji\u0016l'+Z:q_:\u001cX\r\u0005\u0002zu6\t\u0001.\u0003\u0002|Q\n9aj\u001c;Vg\u0016$\u0017A\u00052bi\u000eDwK]5uK&#X-\u001c$m_^,\u0012A \t\u0007G6|\u0018Q\u0001=\u0011\u0007A\f\t!C\u0002\u0002\u0004E\u0014QCQ1uG\"<&/\u001b;f\u0013R,WNU3rk\u0016\u001cH\u000fE\u0002q\u0003\u000fI1!!\u0003r\u0005Y\u0011\u0015\r^2i/JLG/Z%uK6\u0014Vm\u001d9p]N,\u0017a\u0007:fGV\u00148/\u001b<f\u0005\u0006$8\r[,sSR,\u0017\n^3n\r2|w/A\u0006qkRLE/Z7GY><XCAA\t!\u001d\u0019W.a\u0005\u0002\u001aa\u00042\u0001]A\u000b\u0013\r\t9\"\u001d\u0002\u000f!V$\u0018\n^3n%\u0016\fX/Z:u!\r\u0001\u00181D\u0005\u0004\u0003;\t(a\u0004)vi&#X-\u001c*fgB|gn]3\u0002\u001dU\u0004H-\u0019;f\u0013R,WN\u00127poV\u0011\u00111\u0005\t\bG6\f)#a\u000by!\r\u0001\u0018qE\u0005\u0004\u0003S\t(!E+qI\u0006$X-\u0013;f[J+\u0017/^3tiB\u0019\u0001/!\f\n\u0007\u0005=\u0012O\u0001\nVa\u0012\fG/Z%uK6\u0014Vm\u001d9p]N,\u0007")
/* loaded from: input_file:com/github/j5ik2o/pekko/persistence/dynamodb/client/v2/StreamWriteClient.class */
/**
 * Stream-based write client for DynamoDB (AWS SDK v2), exposing each write operation as a
 * Pekko Streams {@link Flow}.
 *
 * <p>Every flow is built from exactly one of the two optional SDK clients handed to the
 * constructor: the asynchronous client (requests go through {@code mapAsync} with parallelism 1)
 * or the synchronous client (requests go through {@code map}, after which the flow is passed to
 * {@code withV2Dispatcher} together with the plugin configuration). Any other combination of the
 * two options makes the flow factory methods throw {@link IllegalStateException}.
 *
 * <p>NOTE(review): this file is decompiler output of a Scala class; names such as
 * {@code loop$1} and {@code $anonfun$...} are compiler-generated and are kept for compatibility.
 */
public final class StreamWriteClient implements StreamSupport {
    private final PluginContext pluginContext;
    private final Option<DynamoDbAsyncClient> asyncClient;
    private final Option<DynamoDbClient> syncClient;
    private final BackoffConfig writeBackoffConfig;

    /**
     * Forwards to the {@link StreamSupport} implementation of {@code flowWithBackoffSettings}.
     *
     * @param backoffConfig back-off configuration handed to {@link StreamSupport}
     * @param flow the flow to wrap
     * @return whatever {@code StreamSupport.flowWithBackoffSettings$} returns for the given flow
     */
    public <In, Out> Flow<In, Out, NotUsed> flowWithBackoffSettings(BackoffConfig backoffConfig, Flow<In, Out, NotUsed> flow) {
        return StreamSupport.flowWithBackoffSettings$(this, backoffConfig, flow);
    }

    /** @return the plugin context given at construction time */
    public PluginContext pluginContext() {
        return this.pluginContext;
    }

    /** @return the optional asynchronous DynamoDB client given at construction time */
    public Option<DynamoDbAsyncClient> asyncClient() {
        return this.asyncClient;
    }

    /** @return the optional synchronous DynamoDB client given at construction time */
    public Option<DynamoDbClient> syncClient() {
        return this.syncClient;
    }

    /** @return the back-off configuration applied to every write flow of this client */
    public BackoffConfig writeBackoffConfig() {
        return this.writeBackoffConfig;
    }

    /**
     * Builds a flow that issues {@code DeleteItem} calls, logged under the name
     * {@code "deleteItemFlow"}.
     *
     * @return the delete flow wrapped with the write back-off settings
     * @throws IllegalStateException if not exactly one of the async/sync clients is defined
     */
    public Flow<DeleteItemRequest, DeleteItemResponse, NotUsed> deleteItemFlow() {
        return this.<DeleteItemRequest, DeleteItemResponse>clientFlow(
                "deleteItemFlow", DynamoDbAsyncClient::deleteItem, DynamoDbClient::deleteItem);
    }

    /**
     * Builds a flow that issues {@code BatchWriteItem} calls, logged under the name
     * {@code "batchWriteItemFlow"}.
     *
     * @return the batch-write flow wrapped with the write back-off settings
     * @throws IllegalStateException if not exactly one of the async/sync clients is defined
     */
    public Flow<BatchWriteItemRequest, BatchWriteItemResponse, NotUsed> batchWriteItemFlow() {
        return this.<BatchWriteItemRequest, BatchWriteItemResponse>clientFlow(
                "batchWriteItemFlow", DynamoDbAsyncClient::batchWriteItem, DynamoDbClient::batchWriteItem);
    }

    /**
     * Builds a batch-write flow that re-submits unprocessed items of the configured table until
     * none remain, emitting every intermediate response in order.
     *
     * @return the recursive flow, started with an empty accumulator of previous responses
     */
    public Flow<BatchWriteItemRequest, BatchWriteItemResponse, NotUsed> recursiveBatchWriteItemFlow() {
        return loop$1(Source$.MODULE$.empty());
    }

    /**
     * Builds a flow that issues {@code PutItem} calls, logged under the name
     * {@code "putItemFlow"}.
     *
     * @return the put flow wrapped with the write back-off settings
     * @throws IllegalStateException if not exactly one of the async/sync clients is defined
     */
    public Flow<PutItemRequest, PutItemResponse, NotUsed> putItemFlow() {
        return this.<PutItemRequest, PutItemResponse>clientFlow(
                "putItemFlow", DynamoDbAsyncClient::putItem, DynamoDbClient::putItem);
    }

    /**
     * Builds a flow that issues {@code UpdateItem} calls, logged under the name
     * {@code "updateItemFlow"}.
     *
     * @return the update flow wrapped with the write back-off settings
     * @throws IllegalStateException if not exactly one of the async/sync clients is defined
     */
    public Flow<UpdateItemRequest, UpdateItemResponse, NotUsed> updateItemFlow() {
        return this.<UpdateItemRequest, UpdateItemResponse>clientFlow(
                "updateItemFlow", DynamoDbAsyncClient::updateItem, DynamoDbClient::updateItem);
    }

    /**
     * Shared implementation of the four single-operation flows.
     *
     * <p>Selects the client exactly as the original pattern match did: {@code (Some, None)} uses
     * the async client, {@code (None, Some)} uses the sync client, everything else is rejected.
     * The {@code instanceof Some} / {@code None$.equals} checks are kept (rather than
     * {@code isDefined}) so that a {@code null} option still ends in the
     * {@link IllegalStateException} branch instead of a {@link NullPointerException}.
     *
     * @param name stage name passed to {@code Flow.log}
     * @param asyncCall invokes the operation on the asynchronous client
     * @param syncCall invokes the operation on the synchronous client
     * @return the logged flow wrapped with the write back-off settings
     * @throws IllegalStateException if not exactly one of the async/sync clients is defined
     */
    // Casts are needed because Scala's Repr-typed combinators (map, log) may not surface their
    // precise Flow type to Java; the element types are fixed by the lambdas below.
    @SuppressWarnings("unchecked")
    private <Req, Res> Flow<Req, Res, NotUsed> clientFlow(
            String name,
            java.util.function.BiFunction<DynamoDbAsyncClient, Req, CompletableFuture<Res>> asyncCall,
            java.util.function.BiFunction<DynamoDbClient, Req, Res> syncCall) {
        final Option<DynamoDbAsyncClient> async = asyncClient();
        final Option<DynamoDbClient> sync = syncClient();
        final Flow<Req, Res, NotUsed> base;
        if (async instanceof Some && None$.MODULE$.equals(sync)) {
            final DynamoDbAsyncClient client = ((Some<DynamoDbAsyncClient>) async).value();
            // Parallelism 1: at most one in-flight request per materialized flow.
            base = Flow$.MODULE$.<Req>create()
                    .mapAsync(1, request -> asyncCall.apply(client, request))
                    .asScala();
        } else if (None$.MODULE$.equals(async) && sync instanceof Some) {
            final DynamoDbClient client = ((Some<DynamoDbClient>) sync).value();
            // The synchronous call runs inside map; the resulting flow is handed to
            // withV2Dispatcher together with the plugin configuration.
            base = (Flow<Req, Res, NotUsed>) DispatcherUtils$ApplyV2DispatcherOps$.MODULE$.withV2Dispatcher$extension(
                    DispatcherUtils$.MODULE$.ApplyV2DispatcherOps(
                            (Flow<Req, Res, NotUsed>) org.apache.pekko.stream.scaladsl.Flow$.MODULE$.<Req>apply()
                                    .map(request -> syncCall.apply(client, request))),
                    pluginContext().pluginConfig());
        } else {
            throw new IllegalStateException("invalid state");
        }
        // Scala default arguments of Flow.log are materialized through the generated getters.
        final Function1<Res, Object> extract = base.log$default$2();
        return flowWithBackoffSettings(
                writeBackoffConfig(),
                (Flow<Req, Res, NotUsed>) base.log(name, extract, base.log$default$3(name, extract)));
    }

    /** Compiler-generated fan-in strategy: concatenates {@code i} inputs in order. */
    public static final /* synthetic */ Graph $anonfun$recursiveBatchWriteItemFlow$8(int i) {
        return Concat$.MODULE$.apply(i);
    }

    /** Compiler-generated fan-in strategy: concatenates {@code i} inputs in order. */
    public static final /* synthetic */ Graph $anonfun$recursiveBatchWriteItemFlow$9(int i) {
        return Concat$.MODULE$.apply(i);
    }

    /**
     * Recursive step of {@link #recursiveBatchWriteItemFlow()}.
     *
     * <p>For every incoming request: sends it through {@link #batchWriteItemFlow()}; fails the
     * stream with an {@link IOException} ("statusCode: N" plus ", text" when a status text is
     * present) if the HTTP response is not successful; otherwise looks up the unprocessed items
     * for the configured table name. When there are some, a copy of the request restricted to
     * those items is sent through this method again, with the current response appended to
     * {@code source}. When there are none, {@code source} followed by the current response is
     * emitted.
     *
     * <p>NOTE(review): the body is kept exactly as emitted by the decompiler (raw types
     * included); re-deriving precise generics should be done against the original Scala source.
     *
     * @param source responses accumulated by earlier iterations, emitted before the current one
     * @return the flow performing one iteration (and, transitively, all following ones)
     */
    private final Flow loop$1(Source source) {
        return org.apache.pekko.stream.scaladsl.Flow$.MODULE$.apply().flatMapConcat(batchWriteItemRequest -> {
            return Source$.MODULE$.single(batchWriteItemRequest).via(this.batchWriteItemFlow()).flatMapConcat(batchWriteItemResponse -> {
                if (!batchWriteItemResponse.sdkHttpResponse().isSuccessful()) {
                    // Non-2xx response: surface status code and optional status text as a stream failure.
                    return Source$.MODULE$.failed(new IOException(new StringBuilder(12).append("statusCode: ").append(batchWriteItemResponse.sdkHttpResponse().statusCode()).append(OptionConverters$RichOptionalGeneric$.MODULE$.asScala$extension(OptionConverters$.MODULE$.RichOptionalGeneric(batchWriteItemResponse.sdkHttpResponse().statusText())).fold(() -> {
                        return "";
                    }, str -> {
                        return new StringBuilder(2).append(", ").append(str).toString();
                    })).toString()));
                }
                // Unprocessed write requests for the configured table; empty when the map is null
                // or has no entry for that table. Entries for other tables are not looked at.
                Vector vector = (Vector) Option$.MODULE$.apply(batchWriteItemResponse.unprocessedItems()).map(map -> {
                    return CollectionConverters$.MODULE$.MapHasAsScala(map).asScala().toMap($less$colon$less$.MODULE$.refl());
                }).map(map2 -> {
                    return map2.map(tuple2 -> {
                        if (tuple2 == null) {
                            throw new MatchError(tuple2);
                        }
                        return new Tuple2((String) tuple2._1(), CollectionConverters$.MODULE$.ListHasAsScala((List) tuple2._2()).asScala().toVector());
                    });
                }).flatMap(map3 -> {
                    return map3.get(this.pluginContext().pluginConfig().tableName());
                }).getOrElse(() -> {
                    return package$.MODULE$.Vector().empty();
                });
                // Non-empty: retry only the leftovers and remember this response; empty: emit
                // all remembered responses followed by this one.
                return vector.nonEmpty() ? Source$.MODULE$.single((BatchWriteItemRequest) batchWriteItemRequest.toBuilder().requestItems(CollectionConverters$.MODULE$.MapHasAsJava((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(this.pluginContext().pluginConfig().tableName()), CollectionConverters$.MODULE$.SeqHasAsJava(vector).asJava())}))).asJava()).build()).via(this.loop$1(Source$.MODULE$.combine(source, Source$.MODULE$.single(batchWriteItemResponse), Nil$.MODULE$, obj -> {
                    return $anonfun$recursiveBatchWriteItemFlow$8(BoxesRunTime.unboxToInt(obj));
                }))) : Source$.MODULE$.combine(source, Source$.MODULE$.single(batchWriteItemResponse), Nil$.MODULE$, obj2 -> {
                    return $anonfun$recursiveBatchWriteItemFlow$9(BoxesRunTime.unboxToInt(obj2));
                });
            });
        });
    }

    /**
     * Creates the client.
     *
     * @param pluginContext plugin context; its plugin configuration supplies the table name and
     *     is passed to {@code withV2Dispatcher} on the synchronous path
     * @param option optional asynchronous DynamoDB client
     * @param option2 optional synchronous DynamoDB client
     * @param backoffConfig back-off configuration applied to every write flow
     */
    public StreamWriteClient(PluginContext pluginContext, Option<DynamoDbAsyncClient> option, Option<DynamoDbClient> option2, BackoffConfig backoffConfig) {
        this.pluginContext = pluginContext;
        this.asyncClient = option;
        this.syncClient = option2;
        this.writeBackoffConfig = backoffConfig;
        // Scala trait initializer of StreamSupport.
        StreamSupport.$init$(this);
    }
}
