package io.debezium.server.sqs;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Named;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

@Named("sqs")
@Dependent
/* loaded from: input_file:io/debezium/server/sqs/SqsChangeConsumer.class */
public class SqsChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    protected static final String PROP_PREFIX = "debezium.sink.sqs.";
    protected static final String PROP_REGION_NAME = "debezium.sink.sqs.region";
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsChangeConsumer.class);
    private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1);
    private static final int DEFAULT_RETRIES = 5;
    private static final String PROP_ENDPOINT_NAME = "debezium.sink.sqs.endpoint";
    private static final String PROP_QUEUE_URL = "debezium.sink.sqs.queue.url";
    private static final String PROP_CREDENTIALS_PROFILE = "debezium.sink.sqs.credentials.profile";
    private static final String PROP_QUEUE_FIFO_MESSAGE_GROUP_ID = "debezium.sink.sqs.fifo.message.group.id";
    private String queueUrl;
    private String messageGroupId = null;
    private SqsClient client = null;

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        SqsClientBuilder region = SqsClient.builder().region(Region.of((String) config.getValue(PROP_REGION_NAME, String.class)));
        config.getOptionalValue(PROP_ENDPOINT_NAME, String.class).ifPresent(str -> {
            LOGGER.info("Queue Endpoint {}", str);
            region.endpointOverride(URI.create(str));
        });
        config.getOptionalValue(PROP_CREDENTIALS_PROFILE, String.class).ifPresent(str2 -> {
            LOGGER.info("Credentials profile {}", str2);
            region.credentialsProvider(ProfileCredentialsProvider.create(str2));
        });
        this.client = (SqsClient) region.build();
        this.queueUrl = (String) config.getValue(PROP_QUEUE_URL, String.class);
        LOGGER.info("Queue Url {}", this.queueUrl);
        if (this.queueUrl.endsWith(".fifo")) {
            this.messageGroupId = (String) config.getOptionalValue(PROP_QUEUE_FIFO_MESSAGE_GROUP_ID, String.class).orElse("cdc-group");
        }
    }

    @PreDestroy
    void close() {
        try {
            this.client.close();
        } catch (Exception e) {
            LOGGER.warn("Exception while closing Sqs client", e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        for (ChangeEvent<Object, Object> changeEvent : list) {
            LOGGER.trace("Received event '{}'", changeEvent);
            int i = 0;
            while (!recordSent(changeEvent)) {
                i++;
                if (i >= DEFAULT_RETRIES) {
                    throw new DebeziumException("Exceeded maximum number of attempts to publish event " + String.valueOf(changeEvent));
                }
                Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause();
            }
            recordCommitter.markProcessed(changeEvent);
        }
        recordCommitter.markBatchFinished();
    }

    private boolean recordSent(ChangeEvent<Object, Object> changeEvent) {
        Object value = changeEvent.value();
        if (value == null) {
            value = "";
        }
        LOGGER.info(changeEvent.toString());
        SendMessageRequest.Builder messageBody = SendMessageRequest.builder().queueUrl(this.queueUrl).messageBody(value.toString());
        if (this.messageGroupId != null) {
            messageBody.messageGroupId(this.messageGroupId);
        }
        try {
            this.client.sendMessage((SendMessageRequest) messageBody.build());
            return true;
        } catch (SdkClientException e) {
            LOGGER.error("Failed to send record to {}", changeEvent.destination(), e);
            return false;
        }
    }
}
