package io.atleon.aws.sqs;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import io.atleon.core.SenderResult;
import java.io.Closeable;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;

/* loaded from: input_file:io/atleon/aws/sqs/AloSqsSender.class */
/**
 * Reactive facade for sending {@link SqsMessage}s (optionally wrapped in {@link Alo}) to SQS
 * queues.
 *
 * <p>The underlying {@link SendResources} are derived from the {@link SqsConfigSource} given at
 * construction and cached by the {@code futureResources} Mono. Emitting on the internal close
 * sink (see {@link #close()}) invalidates that cache, at which point the cached resources are
 * closed.
 *
 * @param <T> type of the message bodies handed to the configured {@link BodySerializer}
 */
public class AloSqsSender<T> implements Closeable {

    /** Common prefix of every configuration key declared by this class. */
    public static final String CONFIG_PREFIX = "sqs.sender.";

    /** Key under which the {@link BodySerializer} implementation is loaded (required). */
    public static final String BODY_SERIALIZER_CONFIG = "sqs.sender.body.serializer";

    /** Key for the sender batch size; {@code 1} is used when the key is absent. */
    public static final String BATCH_SIZE_CONFIG = "sqs.sender.batch.size";

    /**
     * Key for the sender batch duration; {@code SqsSenderOptions.DEFAULT_BATCH_DURATION} is used
     * when the key is absent.
     */
    public static final String BATCH_DURATION_CONFIG = "sqs.sender.batch.duration";

    /** Key for the sender batch prefetch; {@code 32} is used when the key is absent. */
    public static final String BATCH_PREFETCH_CONFIG = "sqs.sender.batch.prefetch";

    /** Key for the max number of requests in flight; {@code 1} is used when the key is absent. */
    public static final String MAX_REQUESTS_IN_FLIGHT_CONFIG = "sqs.sender.max.requests.in.flight";

    private static final Logger LOGGER = LoggerFactory.getLogger(AloSqsSender.class);

    /** Cached sending resources; the cache is invalidated once {@link #closeSink} emits. */
    private final Mono<SendResources<T>> futureResources;

    /** Signals closure; each emission carries the wall-clock millis at which close was requested. */
    private final Sinks.Many<Long> closeSink = Sinks.many().multicast().directBestEffort();

    /**
     * Wires the resource cache: config -> {@link SendResources}, invalidated by the first close
     * signal observed after creation, with the invalidated resources being closed.
     */
    private AloSqsSender(SqsConfigSource sqsConfigSource) {
        this.futureResources = ((Mono) sqsConfigSource.create())
            .map(SendResources::fromConfig)
            .cacheInvalidateWhen(
                resources -> this.closeSink.asFlux().next().then(),
                resources -> resources.close());
    }

    /**
     * Alias for {@link #create(SqsConfigSource)}.
     *
     * @param sqsConfigSource source of the SQS configuration
     * @param <T> message body type
     * @return a new sender
     */
    public static <T> AloSqsSender<T> from(SqsConfigSource sqsConfigSource) {
        return create(sqsConfigSource);
    }

    /**
     * Creates a sender backed by the given configuration source.
     *
     * @param sqsConfigSource source of the SQS configuration
     * @param <T> message body type
     * @return a new sender
     */
    public static <T> AloSqsSender<T> create(SqsConfigSource sqsConfigSource) {
        return new AloSqsSender<>(sqsConfigSource);
    }

    /**
     * Sends each body after converting it to a message with the given creator.
     *
     * @param publisher bodies to send
     * @param sqsMessageCreator converts a body into the {@link SqsMessage} to send
     * @param str second argument forwarded to the underlying {@code SqsSender.send}
     *            (presumably the queue URL - TODO confirm)
     * @return results correlated with the originating bodies
     */
    public Flux<SqsSenderResult<T>> sendBodies(Publisher<T> publisher, SqsMessageCreator<T> sqsMessageCreator, String str) {
        return this.futureResources.flatMapMany(resources -> resources.send(publisher, sqsMessageCreator, str));
    }

    /**
     * Sends a single message.
     *
     * @param sqsMessage message to send
     * @param str second argument forwarded to the underlying {@code SqsSender.send}
     *            (presumably the queue URL - TODO confirm)
     * @return result correlated with the sent message
     */
    public Mono<SqsSenderResult<SqsMessage<T>>> sendMessage(SqsMessage<T> sqsMessage, String str) {
        return this.futureResources.flatMap(resources -> resources.send(sqsMessage, str));
    }

    /**
     * Sends every message emitted by the publisher.
     *
     * @param publisher messages to send
     * @param str second argument forwarded to the underlying {@code SqsSender.send}
     *            (presumably the queue URL - TODO confirm)
     * @return results correlated with the sent messages
     */
    public Flux<SqsSenderResult<SqsMessage<T>>> sendMessages(Publisher<SqsMessage<T>> publisher, String str) {
        return this.futureResources.flatMapMany(resources -> resources.send(publisher, Function.identity(), str));
    }

    /**
     * Function form of {@link #sendAloBodies(Publisher, SqsMessageCreator, String)}, binding the
     * creator and destination up front.
     */
    public Function<Publisher<Alo<T>>, AloFlux<SqsSenderResult<T>>> sendAloBodies(SqsMessageCreator<T> sqsMessageCreator, String str) {
        return publisher -> sendAloBodies(publisher, sqsMessageCreator, str);
    }

    /**
     * Sends Alo-wrapped bodies. Results for which {@code isFailure()} holds are routed through
     * {@code processFailure} using {@code SenderResult.toError}.
     *
     * @param publisher Alo-wrapped bodies to send
     * @param sqsMessageCreator converts a body into the {@link SqsMessage} to send
     * @param str second argument forwarded to the underlying {@code SqsSender.send}
     *            (presumably the queue URL - TODO confirm)
     * @return Alo-wrapped results
     */
    public AloFlux<SqsSenderResult<T>> sendAloBodies(Publisher<Alo<T>> publisher, SqsMessageCreator<T> sqsMessageCreator, String str) {
        AloFlux wrapped = (AloFlux) this.futureResources
            .flatMapMany(resources -> resources.sendAlos(publisher, sqsMessageCreator, str))
            .as(flux -> AloFlux.wrap(flux));
        return wrapped.processFailure(result -> result.isFailure(), result -> SenderResult.toError(result));
    }

    /**
     * Function form of {@link #sendAloMessages(Publisher, String)}, binding the destination up
     * front.
     */
    public Function<Publisher<Alo<SqsMessage<T>>>, AloFlux<SqsSenderResult<SqsMessage<T>>>> sendAloMessages(String str) {
        return publisher -> sendAloMessages(publisher, str);
    }

    /**
     * Sends Alo-wrapped messages. Results for which {@code isFailure()} holds are routed through
     * {@code processFailure} using {@code SenderResult.toError}.
     *
     * @param publisher Alo-wrapped messages to send
     * @param str second argument forwarded to the underlying {@code SqsSender.send}
     *            (presumably the queue URL - TODO confirm)
     * @return Alo-wrapped results
     */
    public AloFlux<SqsSenderResult<SqsMessage<T>>> sendAloMessages(Publisher<Alo<SqsMessage<T>>> publisher, String str) {
        AloFlux wrapped = (AloFlux) this.futureResources
            .flatMapMany(resources -> resources.sendAlos(publisher, Function.identity(), str))
            .as(flux -> AloFlux.wrap(flux));
        return wrapped.processFailure(result -> result.isFailure(), result -> SenderResult.toError(result));
    }

    /**
     * Logs the supplied reason at INFO level and then closes this sender.
     *
     * @param obj reason for closing; only used in the log statement
     */
    public void close(Object obj) {
        LOGGER.info("Closing AloSqsSender due to reason={}", obj);
        close();
    }

    /**
     * Emits the current wall-clock time on the close sink, which is what invalidates the cached
     * resources. The emit result is not inspected.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        long closeRequestedAtMillis = System.currentTimeMillis();
        this.closeSink.tryEmitNext(closeRequestedAtMillis);
    }

    /**
     * Pairs the {@link SqsSender} with the {@link BodySerializer} used to build outbound
     * messages.
     *
     * <p>Decompiler note: JADX reports that this type's access modifier was changed from
     * {@code private}.
     *
     * @param <T> type of the message bodies being serialized
     */
    /* loaded from: input_file:io/atleon/aws/sqs/AloSqsSender$SendResources.class */
    public static final class SendResources<T> {
        private final SqsSender sender;
        private final BodySerializer<T> bodySerializer;

        private SendResources(SqsSender sqsSender, BodySerializer<T> bodySerializer) {
            this.sender = sqsSender;
            this.bodySerializer = bodySerializer;
        }

        /**
         * Builds the resources from configuration: first the sender (with its batching and
         * in-flight options), then the body serializer.
         *
         * @param sqsConfig configuration to read from; must not be {@code null}
         * @param <T> message body type
         * @return the assembled resources
         */
        public static <T> SendResources<T> fromConfig(SqsConfig sqsConfig) {
            Objects.requireNonNull(sqsConfig);
            SqsSender sqsSender = SqsSender.create(
                SqsSenderOptions.newBuilder(sqsConfig::buildClient)
                    .batchSize(sqsConfig.loadInt(BATCH_SIZE_CONFIG).orElse(1).intValue())
                    .batchDuration(sqsConfig.loadDuration(BATCH_DURATION_CONFIG).orElse(SqsSenderOptions.DEFAULT_BATCH_DURATION))
                    .batchPrefetch(sqsConfig.loadInt(BATCH_PREFETCH_CONFIG).orElse(32).intValue())
                    .maxRequestsInFlight(sqsConfig.loadInt(MAX_REQUESTS_IN_FLIGHT_CONFIG).orElse(1).intValue())
                    .build());
            // Loaded after the sender has been created, matching the original evaluation order.
            BodySerializer<T> serializer =
                (BodySerializer) sqsConfig.loadConfiguredOrThrow(BODY_SERIALIZER_CONFIG, BodySerializer.class);
            return new SendResources<>(sqsSender, serializer);
        }

        /** Sends one message, using the message itself as correlation metadata. */
        public Mono<SqsSenderResult<SqsMessage<T>>> send(SqsMessage<T> sqsMessage, String str) {
            SqsSenderMessage outbound = (SqsSenderMessage) toSenderMessage(sqsMessage, Function.identity());
            return this.sender.send(outbound, str);
        }

        /**
         * Converts each item with {@code function}, sends it, and correlates the result with the
         * originating item.
         */
        public <R> Flux<SqsSenderResult<R>> send(Publisher<R> publisher, Function<R, SqsMessage<T>> function, String str) {
            return Flux.from(publisher)
                .map(item -> toSenderMessage(item, function))
                .transform(outbound -> this.sender.send((Publisher) outbound, str));
        }

        /**
         * Alo-aware variant of {@link #send(Publisher, Function, String)}: the Alo travels as
         * correlation metadata and is mapped onto the result once the send completes.
         */
        public <R> Flux<Alo<SqsSenderResult<R>>> sendAlos(Publisher<Alo<R>> publisher, Function<R, SqsMessage<T>> function, String str) {
            return AloFlux.toFlux(publisher)
                .handle(newAloEmitter(function.compose(alo -> alo.get())))
                .transform(outbound -> this.sender.send((Publisher) outbound, str))
                // Swap the Alo out of the result's correlation metadata and wrap the result in it.
                .map(result -> ((Alo) result.correlationMetadata()).map(result::replaceCorrelationMetadata));
        }

        /** Closes the underlying sender. */
        public void close() {
            this.sender.close();
        }

        /**
         * Creates a handler that builds and emits the sender message inside the Alo's own
         * context ({@code Alo.runInContext}).
         */
        private <R> BiConsumer<Alo<R>, SynchronousSink<SqsSenderMessage<Alo<R>>>> newAloEmitter(Function<Alo<R>, SqsMessage<T>> function) {
            return (alo, sink) -> alo.runInContext(() -> sink.next(toSenderMessage(alo, function)));
        }

        /**
         * Builds the outbound message for {@code r}: message fields come from
         * {@code function.apply(r)}, the body is serialized with the configured serializer, and
         * {@code r} itself is attached as correlation metadata.
         */
        private <R> SqsSenderMessage<R> toSenderMessage(R r, Function<R, SqsMessage<T>> function) {
            SqsMessage<T> message = function.apply(r);
            return SqsSenderMessage.newBuilder()
                .messageDeduplicationId(message.messageDeduplicationId().orElse(null))
                .messageGroupId(message.messageGroupId().orElse(null))
                .messageAttributes(message.messageAttributes())
                .messageSystemAttributes(message.messageSystemAttributes())
                .body(this.bodySerializer.serialize(message.body()))
                .delaySeconds(message.senderDelaySeconds().orElse(null))
                .correlationMetadata(r)
                .build();
        }
    }
}
