package io.debezium.server.kinesis;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;

@Named("kinesis")
@Dependent
/* loaded from: input_file:io/debezium/server/kinesis/KinesisChangeConsumer.class */
public class KinesisChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final String PROP_PREFIX = "debezium.sink.kinesis.";
    private static final String PROP_REGION_NAME = "debezium.sink.kinesis.region";
    private static final String PROP_ENDPOINT_NAME = "debezium.sink.kinesis.endpoint";
    private static final String PROP_CREDENTIALS_PROFILE = "debezium.sink.kinesis.credentials.profile";
    private static final String PROP_BATCH_SIZE = "debezium.sink.kinesis.batch.size";
    private static final String PROP_RETRIES = "debezium.sink.kinesis.default.retries";
    private static final int DEFAULT_RETRY_COUNT = 5;
    private static final int MAX_BATCH_SIZE = 500;
    private String region;
    private Optional<String> endpointOverride;
    private Optional<String> credentialsProfile;
    private Integer batchSize;
    private Integer maxRetries;

    @ConfigProperty(name = "debezium.sink.kinesis.null.key", defaultValue = "default")
    String nullKey;
    private KinesisClient client = null;

    @Inject
    @CustomConsumerBuilder
    Instance<KinesisClient> customClient;
    private static final Logger LOGGER = LoggerFactory.getLogger(KinesisChangeConsumer.class);
    private static final Duration RETRY_INTERVAL = Duration.ofSeconds(1);

    @PostConstruct
    void connect() {
        Config config = ConfigProvider.getConfig();
        this.batchSize = (Integer) config.getOptionalValue(PROP_BATCH_SIZE, Integer.class).orElse(Integer.valueOf(MAX_BATCH_SIZE));
        this.maxRetries = (Integer) config.getOptionalValue(PROP_RETRIES, Integer.class).orElse(Integer.valueOf(DEFAULT_RETRY_COUNT));
        if (this.batchSize.intValue() <= 0) {
            throw new DebeziumException("Batch size must be greater than 0");
        }
        if (this.batchSize.intValue() > MAX_BATCH_SIZE) {
            throw new DebeziumException("Batch size must be less than or equal to MAX_BATCH_SIZE");
        }
        if (this.customClient.isResolvable()) {
            this.client = (KinesisClient) this.customClient.get();
            LOGGER.info("Obtained custom configured KinesisClient '{}'", this.client);
            return;
        }
        this.region = (String) config.getValue(PROP_REGION_NAME, String.class);
        this.endpointOverride = config.getOptionalValue(PROP_ENDPOINT_NAME, String.class);
        this.credentialsProfile = config.getOptionalValue(PROP_CREDENTIALS_PROFILE, String.class);
        KinesisClientBuilder region = KinesisClient.builder().region(Region.of(this.region));
        this.endpointOverride.ifPresent(str -> {
            region.endpointOverride(URI.create(str));
        });
        this.credentialsProfile.ifPresent(str2 -> {
            region.credentialsProvider(ProfileCredentialsProvider.create(str2));
        });
        this.client = (KinesisClient) region.build();
        LOGGER.info("Using default KinesisClient '{}'", this.client);
    }

    @PreDestroy
    void close() {
        try {
            this.client.close();
        } catch (Exception e) {
            LOGGER.warn("Exception while closing Kinesis client: {}", e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        if (list.isEmpty()) {
            recordCommitter.markBatchFinished();
            return;
        }
        new ArrayList();
        for (List list2 : ((Map) list.stream().collect(Collectors.groupingBy(changeEvent -> {
            return changeEvent.destination();
        }))).values()) {
            int i = 0;
            while (true) {
                int i2 = i;
                if (i2 < list2.size()) {
                    List<ChangeEvent> subList = list2.subList(i2, Math.min(i2 + this.batchSize.intValue(), list2.size()));
                    ArrayList arrayList = new ArrayList();
                    String destination = ((ChangeEvent) subList.get(0)).destination();
                    for (ChangeEvent changeEvent2 : subList) {
                        Object value = changeEvent2.value();
                        if (value == null) {
                            value = "";
                        }
                        arrayList.add((PutRecordsRequestEntry) PutRecordsRequestEntry.builder().partitionKey(changeEvent2.key() != null ? getString(changeEvent2.key()) : this.nullKey).data(SdkBytes.fromByteArray(getBytes(value))).build());
                    }
                    boolean z = true;
                    int i3 = 0;
                    ArrayList arrayList2 = arrayList;
                    while (z) {
                        if (i3 >= this.maxRetries.intValue()) {
                            throw new DebeziumException("Exceeded maximum number of attempts to publish event");
                        }
                        try {
                            PutRecordsResponse recordsSent = recordsSent(arrayList2, destination);
                            i3++;
                            if (recordsSent.failedRecordCount().intValue() > 0) {
                                LOGGER.warn("Failed to send {} number of records, retrying", recordsSent.failedRecordCount());
                                Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause();
                                List records = recordsSent.records();
                                ArrayList arrayList3 = new ArrayList();
                                for (int i4 = 0; i4 < records.size(); i4++) {
                                    if (((PutRecordsResultEntry) records.get(i4)).errorCode() != null) {
                                        arrayList3.add(arrayList2.get(i4));
                                    }
                                }
                                arrayList2 = arrayList3;
                            } else {
                                z = false;
                                i3 = 0;
                            }
                        } catch (KinesisException e) {
                            LOGGER.warn("Failed to send record to {}", destination, e);
                            i3++;
                            Metronome.sleeper(RETRY_INTERVAL, Clock.SYSTEM).pause();
                        }
                    }
                    Iterator it = subList.iterator();
                    while (it.hasNext()) {
                        recordCommitter.markProcessed((ChangeEvent) it.next());
                    }
                    i = i2 + this.batchSize.intValue();
                }
            }
        }
        recordCommitter.markBatchFinished();
    }

    private PutRecordsResponse recordsSent(List<PutRecordsRequestEntry> list, String str) {
        PutRecordsResponse putRecords = this.client.putRecords((PutRecordsRequest) PutRecordsRequest.builder().streamName(this.streamNameMapper.map(str)).records(list).build());
        LOGGER.trace("Response Receieved: " + String.valueOf(putRecords));
        return putRecords;
    }
}
