package com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao;

import com.github.j5ik2o.pekko.persistence.dynamodb.journal.JournalPluginContext;
import com.github.j5ik2o.pekko.persistence.dynamodb.journal.JournalRow;
import com.github.j5ik2o.pekko.persistence.dynamodb.journal.serialization.FlowPersistentReprSerializer;
import com.github.j5ik2o.pekko.persistence.dynamodb.metrics.MetricsReporter;
import com.github.j5ik2o.pekko.persistence.dynamodb.model.PersistenceId;
import com.github.j5ik2o.pekko.persistence.dynamodb.model.SequenceNumber;
import com.github.j5ik2o.pekko.persistence.dynamodb.utils.LoggingSupport;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Attributes$;
import org.apache.pekko.stream.Attributes$LogLevels$;
import org.apache.pekko.stream.KillSwitches$;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.OverflowStrategy;
import org.apache.pekko.stream.QueueOfferResult;
import org.apache.pekko.stream.QueueOfferResult$Dropped$;
import org.apache.pekko.stream.QueueOfferResult$Enqueued$;
import org.apache.pekko.stream.QueueOfferResult$Failure$;
import org.apache.pekko.stream.QueueOfferResult$QueueClosed$;
import org.apache.pekko.stream.SystemMaterializer$;
import org.apache.pekko.stream.UniqueKillSwitch;
import org.apache.pekko.stream.scaladsl.Flow;
import org.apache.pekko.stream.scaladsl.Flow$;
import org.apache.pekko.stream.scaladsl.Keep$;
import org.apache.pekko.stream.scaladsl.Sink$;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.apache.pekko.stream.scaladsl.SourceQueueWithComplete;
import org.slf4j.Logger;
import scala.Int$;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Vector;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;

/* compiled from: WriteJournalDaoImpl.scala */
/* loaded from: input_file:com/github/j5ik2o/pekko/persistence/dynamodb/journal/dao/WriteJournalDaoImpl.class */
public final class WriteJournalDaoImpl implements JournalDaoWithUpdates, DaoSupport, LoggingSupport {
    private Logger logger;
    private final JournalPluginContext pluginContext;
    private final JournalRowWriteDriver journalRowDriver;
    private final FlowPersistentReprSerializer<JournalRow> serializer;
    private final ExecutionContext ec;
    private final Materializer mat;
    private final int queueBufferSize;
    private final int queueParallelism;
    private final int writeParallelism;
    private final Attributes logLevels;
    private final OverflowStrategy queueOverflowStrategy;
    private final Seq<Tuple2<SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<JournalRow>>>, UniqueKillSwitch>> putQueues;
    private final Seq<Tuple2<SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<PersistenceIdWithSeqNr>>>, UniqueKillSwitch>> deleteQueues;

    public WriteJournalDaoImpl(JournalPluginContext journalPluginContext, JournalRowWriteDriver journalRowWriteDriver, FlowPersistentReprSerializer<JournalRow> flowPersistentReprSerializer, ExecutionContext executionContext, ActorSystem actorSystem) {
        this.pluginContext = journalPluginContext;
        this.journalRowDriver = journalRowWriteDriver;
        this.serializer = flowPersistentReprSerializer;
        this.ec = executionContext;
        LoggingSupport.$init$(this);
        this.mat = SystemMaterializer$.MODULE$.apply(actorSystem).materializer();
        this.queueBufferSize = journalPluginContext.m7pluginConfig().queueEnable() ? journalPluginContext.m7pluginConfig().queueBufferSize() : 0;
        this.queueParallelism = journalPluginContext.m7pluginConfig().queueEnable() ? journalPluginContext.m7pluginConfig().queueParallelism() : 0;
        this.writeParallelism = journalPluginContext.m7pluginConfig().writeParallelism();
        this.logLevels = Attributes$.MODULE$.logLevels(Attributes$LogLevels$.MODULE$.Debug(), Attributes$LogLevels$.MODULE$.Debug(), Attributes$LogLevels$.MODULE$.Error());
        this.queueOverflowStrategy = journalPluginContext.m7pluginConfig().queueOverflowStrategy();
        this.putQueues = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), this.queueParallelism).map(obj -> {
            return $init$$$anonfun$1(BoxesRunTime.unboxToInt(obj));
        });
        this.deleteQueues = RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), this.queueParallelism).map(obj2 -> {
            return $init$$$anonfun$2(BoxesRunTime.unboxToInt(obj2));
        });
        Statics.releaseFence();
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.DaoSupport
    public /* bridge */ /* synthetic */ Source getMessagesAsJournalRow(PersistenceId persistenceId, SequenceNumber sequenceNumber, SequenceNumber sequenceNumber2, long j, Option option) {
        Source messagesAsJournalRow;
        messagesAsJournalRow = getMessagesAsJournalRow(persistenceId, sequenceNumber, sequenceNumber2, j, option);
        return messagesAsJournalRow;
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.DaoSupport
    public /* bridge */ /* synthetic */ Option getMessagesAsJournalRow$default$5() {
        Option messagesAsJournalRow$default$5;
        messagesAsJournalRow$default$5 = getMessagesAsJournalRow$default$5();
        return messagesAsJournalRow$default$5;
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.JournalDaoWithReadMessages, com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.DaoSupport
    public /* bridge */ /* synthetic */ Source getMessagesAsPersistentRepr(PersistenceId persistenceId, SequenceNumber sequenceNumber, SequenceNumber sequenceNumber2, long j, Option option) {
        Source messagesAsPersistentRepr;
        messagesAsPersistentRepr = getMessagesAsPersistentRepr(persistenceId, sequenceNumber, sequenceNumber2, j, option);
        return messagesAsPersistentRepr;
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.JournalDaoWithReadMessages, com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.DaoSupport
    public /* bridge */ /* synthetic */ Option getMessagesAsPersistentRepr$default$5() {
        Option messagesAsPersistentRepr$default$5;
        messagesAsPersistentRepr$default$5 = getMessagesAsPersistentRepr$default$5();
        return messagesAsPersistentRepr$default$5;
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.JournalDaoWithReadMessages, com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.DaoSupport
    public /* bridge */ /* synthetic */ Source getMessagesAsPersistentReprWithBatch(String str, long j, long j2, int i, Option option) {
        Source messagesAsPersistentReprWithBatch;
        messagesAsPersistentReprWithBatch = getMessagesAsPersistentReprWithBatch(str, j, j2, i, option);
        return messagesAsPersistentReprWithBatch;
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$github$j5ik2o$pekko$persistence$dynamodb$utils$LoggingSupport$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.DaoSupport
    public JournalRowWriteDriver journalRowDriver() {
        return this.journalRowDriver;
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.DaoSupport
    public FlowPersistentReprSerializer<JournalRow> serializer() {
        return this.serializer;
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.DaoSupport
    public ExecutionContext ec() {
        return this.ec;
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.DaoSupport
    public Option<MetricsReporter> metricsReporter() {
        return this.pluginContext.metricsReporter();
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.DaoSupport
    public Materializer mat() {
        return this.mat;
    }

    private Future<Done> internalPutStream(Promise<Object> promise, Seq<JournalRow> seq) {
        return (Future) (seq.size() == 1 ? (Source) Source$.MODULE$.single(seq.head()).batch(Int$.MODULE$.int2long(this.pluginContext.m7pluginConfig().clientConfig().batchWriteItemLimit()), journalRow -> {
            return (Vector) package$.MODULE$.Vector().apply(ScalaRunTime$.MODULE$.wrapRefArray(new JournalRow[]{journalRow}));
        }, (vector, journalRow2) -> {
            return (Vector) vector.$colon$plus(journalRow2);
        }).flatMapConcat(vector2 -> {
            return vector2.size() == 1 ? Source$.MODULE$.single(vector2.head()).via(journalRowDriver().singlePutJournalRowFlow()) : Source$.MODULE$.single(vector2).via(journalRowDriver().multiPutJournalRowsFlow());
        }) : seq.size() > this.pluginContext.m7pluginConfig().clientConfig().batchWriteItemLimit() ? (Source) Source$.MODULE$.apply(seq.toVector()).grouped(this.pluginContext.m7pluginConfig().clientConfig().batchWriteItemLimit()).via(journalRowDriver().multiPutJournalRowsFlow()).fold(BoxesRunTime.boxToLong(0L), (j, j2) -> {
            return j + j2;
        }) : Source$.MODULE$.single(seq).via(journalRowDriver().multiPutJournalRowsFlow())).map(obj -> {
            return internalPutStream$$anonfun$1(promise, BoxesRunTime.unboxToLong(obj));
        }).recover(new WriteJournalDaoImpl$$anon$1(promise)).runWith(Sink$.MODULE$.ignore(), mat());
    }

    private Tuple2<SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<JournalRow>>>, UniqueKillSwitch> putQueue() {
        Tuple2 tuple2 = (Tuple2) Source$.MODULE$.queue(this.queueBufferSize, this.queueOverflowStrategy).viaMat(KillSwitches$.MODULE$.single(), Keep$.MODULE$.both()).mapAsync(this.writeParallelism, tuple22 -> {
            if (tuple22 != null) {
                return internalPutStream((Promise) tuple22._1(), (Seq) tuple22._2());
            }
            throw new MatchError(tuple22);
        }).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.both()).withAttributes(this.logLevels).run(mat());
        return Tuple2$.MODULE$.apply(((Tuple2) tuple2._1())._1(), ((Tuple2) tuple2._1())._2());
    }

    private int queueIdFrom(PersistenceId persistenceId) {
        return Math.abs(Statics.anyHash(persistenceId.asString())) % this.queueParallelism;
    }

    private SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<JournalRow>>> selectPutQueue(PersistenceId persistenceId) {
        return (SourceQueueWithComplete) ((Tuple2) this.putQueues.apply(queueIdFrom(persistenceId)))._1();
    }

    private Future<Done> internalDeleteStream(Promise<Object> promise, Seq<PersistenceIdWithSeqNr> seq) {
        return (Future) (seq.size() == 1 ? (Source) Source$.MODULE$.single(seq.head()).batch(Int$.MODULE$.int2long(this.pluginContext.m7pluginConfig().clientConfig().batchWriteItemLimit()), persistenceIdWithSeqNr -> {
            return (Vector) package$.MODULE$.Vector().apply(ScalaRunTime$.MODULE$.wrapRefArray(new PersistenceIdWithSeqNr[]{persistenceIdWithSeqNr}));
        }, (vector, persistenceIdWithSeqNr2) -> {
            return (Vector) vector.$colon$plus(persistenceIdWithSeqNr2);
        }).flatMapConcat(vector2 -> {
            return vector2.size() == 1 ? Source$.MODULE$.single(vector2.head()).via(journalRowDriver().singleDeleteJournalRowFlow()) : Source$.MODULE$.single(vector2).via(journalRowDriver().multiDeleteJournalRowsFlow());
        }) : seq.size() > this.pluginContext.m7pluginConfig().clientConfig().batchWriteItemLimit() ? (Source) Source$.MODULE$.apply(seq.toVector()).grouped(this.pluginContext.m7pluginConfig().clientConfig().batchWriteItemLimit()).via(journalRowDriver().multiDeleteJournalRowsFlow()).fold(BoxesRunTime.boxToLong(0L), (j, j2) -> {
            return j + j2;
        }) : Source$.MODULE$.single(seq).via(journalRowDriver().multiDeleteJournalRowsFlow())).map(obj -> {
            return internalDeleteStream$$anonfun$1(promise, BoxesRunTime.unboxToLong(obj));
        }).recover(new WriteJournalDaoImpl$$anon$2(promise)).runWith(Sink$.MODULE$.ignore(), mat());
    }

    private Tuple2<SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<PersistenceIdWithSeqNr>>>, UniqueKillSwitch> deleteQueue() {
        Tuple2 tuple2 = (Tuple2) Source$.MODULE$.queue(this.queueBufferSize, this.queueOverflowStrategy).viaMat(KillSwitches$.MODULE$.single(), Keep$.MODULE$.both()).mapAsync(this.writeParallelism, tuple22 -> {
            if (tuple22 != null) {
                return internalDeleteStream((Promise) tuple22._1(), (Seq) tuple22._2());
            }
            throw new MatchError(tuple22);
        }).toMat(Sink$.MODULE$.ignore(), Keep$.MODULE$.both()).withAttributes(this.logLevels).run(mat());
        return Tuple2$.MODULE$.apply(((Tuple2) tuple2._1())._1(), ((Tuple2) tuple2._1())._2());
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.WriteJournalDao
    public void dispose() {
        this.putQueues.foreach(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            ((UniqueKillSwitch) tuple2._2()).shutdown();
        });
        this.deleteQueues.foreach(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError(tuple22);
            }
            ((UniqueKillSwitch) tuple22._2()).shutdown();
        });
    }

    private SourceQueueWithComplete<Tuple2<Promise<Object>, Seq<PersistenceIdWithSeqNr>>> selectDeleteQueue(PersistenceId persistenceId) {
        return (SourceQueueWithComplete) ((Tuple2) this.deleteQueues.apply(queueIdFrom(persistenceId)))._1();
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.JournalDaoWithUpdates
    public Source<BoxedUnit, NotUsed> updateMessage(JournalRow journalRow) {
        return journalRowDriver().updateMessage(journalRow);
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.WriteJournalDao
    public Source<Object, NotUsed> deleteMessages(PersistenceId persistenceId, SequenceNumber sequenceNumber) {
        return journalRowDriver().getJournalRows(persistenceId, sequenceNumber, false).flatMapConcat(seq -> {
            return this.pluginContext.m7pluginConfig().softDeleted() ? putMessages((Seq) seq.map(journalRow -> {
                return journalRow.withDeleted();
            })) : deleteBy(persistenceId, (Seq) seq.map(journalRow2 -> {
                return journalRow2.sequenceNumber();
            }));
        }).withAttributes(this.logLevels);
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.WriteJournalDao
    public Source<Object, NotUsed> putMessages(Seq<JournalRow> seq) {
        return seq.isEmpty() ? Source$.MODULE$.single(BoxesRunTime.boxToLong(0L)) : this.pluginContext.m7pluginConfig().queueEnable() ? Source$.MODULE$.single(seq).via(requestPutJournalRows()) : Source$.MODULE$.single(seq).via(requestPutJournalRowsPassThrough());
    }

    @Override // com.github.j5ik2o.pekko.persistence.dynamodb.journal.dao.WriteJournalDao
    public Source<Option<Object>, NotUsed> highestSequenceNr(PersistenceId persistenceId, SequenceNumber sequenceNumber) {
        return journalRowDriver().highestSequenceNr(persistenceId, Some$.MODULE$.apply(sequenceNumber), journalRowDriver().highestSequenceNr$default$3());
    }

    private Flow<Seq<JournalRow>, Object, NotUsed> requestPutJournalRowsPassThrough() {
        return Flow$.MODULE$.apply().mapAsync(this.writeParallelism, seq -> {
            Promise<Object> apply = Promise$.MODULE$.apply();
            return internalPutStream(apply, seq).flatMap(done -> {
                return apply.future();
            }, ec());
        });
    }

    private Flow<Seq<JournalRow>, Object, NotUsed> requestPutJournalRows() {
        return Flow$.MODULE$.apply().mapAsync(1, seq -> {
            Promise apply = Promise$.MODULE$.apply();
            return selectPutQueue(((JournalRow) seq.head()).persistenceId()).offer(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((Promise) Predef$.MODULE$.ArrowAssoc(apply), seq)).flatMap(queueOfferResult -> {
                if (QueueOfferResult$Enqueued$.MODULE$.equals(queueOfferResult)) {
                    return apply.future();
                }
                if (queueOfferResult instanceof QueueOfferResult.Failure) {
                    return Future$.MODULE$.failed(new Exception("Failed to write journal row batch", QueueOfferResult$Failure$.MODULE$.unapply((QueueOfferResult.Failure) queueOfferResult)._1()));
                }
                if (QueueOfferResult$Dropped$.MODULE$.equals(queueOfferResult)) {
                    return Future$.MODULE$.failed(new Exception(new StringBuilder(129).append("Failed to enqueue journal row batch write, the queue buffer was full (").append(this.queueBufferSize).append(" elements) please check the jdbc-journal.bufferSize setting").toString()));
                }
                if (QueueOfferResult$QueueClosed$.MODULE$.equals(queueOfferResult)) {
                    return Future$.MODULE$.failed(new Exception("Failed to enqueue journal row batch write, the queue was closed"));
                }
                throw new MatchError(queueOfferResult);
            }, ec());
        }).withAttributes(this.logLevels);
    }

    private Source<Object, NotUsed> deleteBy(PersistenceId persistenceId, Seq<SequenceNumber> seq) {
        return seq.isEmpty() ? Source$.MODULE$.empty() : this.pluginContext.m7pluginConfig().queueEnable() ? Source$.MODULE$.single(seq.map(sequenceNumber -> {
            return PersistenceIdWithSeqNr$.MODULE$.apply(persistenceId, sequenceNumber);
        })).via(requestDeleteJournalRows()) : Source$.MODULE$.single(seq.map(sequenceNumber2 -> {
            return PersistenceIdWithSeqNr$.MODULE$.apply(persistenceId, sequenceNumber2);
        })).via(requestDeleteJournalRowsPassThrough());
    }

    private Flow<Seq<PersistenceIdWithSeqNr>, Object, NotUsed> requestDeleteJournalRowsPassThrough() {
        return Flow$.MODULE$.apply().mapAsync(this.writeParallelism, seq -> {
            Promise<Object> apply = Promise$.MODULE$.apply();
            return internalDeleteStream(apply, seq).flatMap(done -> {
                return apply.future();
            }, ec());
        });
    }

    private Flow<Seq<PersistenceIdWithSeqNr>, Object, NotUsed> requestDeleteJournalRows() {
        return Flow$.MODULE$.apply().mapAsync(this.writeParallelism, seq -> {
            Promise apply = Promise$.MODULE$.apply();
            return selectDeleteQueue(((PersistenceIdWithSeqNr) seq.head()).persistenceId()).offer(Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((Promise) Predef$.MODULE$.ArrowAssoc(apply), seq)).flatMap(queueOfferResult -> {
                if (QueueOfferResult$Enqueued$.MODULE$.equals(queueOfferResult)) {
                    return apply.future();
                }
                if (queueOfferResult instanceof QueueOfferResult.Failure) {
                    return Future$.MODULE$.failed(new Exception("Failed to write journal row batch", QueueOfferResult$Failure$.MODULE$.unapply((QueueOfferResult.Failure) queueOfferResult)._1()));
                }
                if (QueueOfferResult$Dropped$.MODULE$.equals(queueOfferResult)) {
                    return Future$.MODULE$.failed(new Exception(new StringBuilder(129).append("Failed to enqueue journal row batch write, the queue buffer was full (").append(this.queueBufferSize).append(" elements) please check the jdbc-journal.bufferSize setting").toString()));
                }
                if (QueueOfferResult$QueueClosed$.MODULE$.equals(queueOfferResult)) {
                    return Future$.MODULE$.failed(new Exception("Failed to enqueue journal row batch write, the queue was closed"));
                }
                throw new MatchError(queueOfferResult);
            }, ec());
        }).withAttributes(this.logLevels);
    }

    private final /* synthetic */ Tuple2 $init$$$anonfun$1(int i) {
        return putQueue();
    }

    private final /* synthetic */ Tuple2 $init$$$anonfun$2(int i) {
        return deleteQueue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final /* synthetic */ Promise internalPutStream$$anonfun$1(Promise promise, long j) {
        return promise.success(BoxesRunTime.boxToLong(j));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final /* synthetic */ Promise internalDeleteStream$$anonfun$1(Promise promise, long j) {
        return promise.success(BoxesRunTime.boxToLong(j));
    }
}
