package software.amazon.kinesis.checkpoint;

import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kinesis.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.KinesisClientLibDependencyException;
import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.exceptions.ThrottlingException;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.PreparedCheckpointer;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;

/* loaded from: input_file:software/amazon/kinesis/checkpoint/ShardRecordProcessorCheckpointer.class */
/**
 * {@link RecordProcessorCheckpointer} for a single shard lease.
 *
 * <p>Tracks three positions: the last value successfully checkpointed, the largest value the
 * record processor is currently permitted to checkpoint at, and (once set) the sequence number
 * that marks the end of the shard. Checkpoint and prepare-checkpoint requests are validated
 * against the first two and then forwarded to the underlying {@link Checkpointer}.
 *
 * <p>The public {@code checkpoint}/{@code prepareCheckpoint} overloads and the position setters
 * synchronize on this instance. The accessors and the package-private {@code advancePosition}
 * methods do not take the lock themselves.
 */
public class ShardRecordProcessorCheckpointer implements RecordProcessorCheckpointer {
    private static final Logger log = LoggerFactory.getLogger(ShardRecordProcessorCheckpointer.class);

    @NonNull
    private final ShardInfo shardInfo;

    @NonNull
    private final Checkpointer checkpointer;

    /** Last value written through {@link #advancePosition(ExtendedSequenceNumber)}, or the initial value. */
    private ExtendedSequenceNumber lastCheckpointValue;

    /** Upper bound (inclusive) for caller-supplied checkpoint positions. */
    private ExtendedSequenceNumber largestPermittedCheckpointValue;

    /** When non-null, requests for exactly this value are recorded as {@code SHARD_END} instead. */
    private ExtendedSequenceNumber sequenceNumberAtShardEnd;

    /**
     * Creates a checkpointer bound to one shard.
     *
     * @param shardInfo shard whose lease key and concurrency token are used for every checkpoint call
     * @param checkpointer backing store the checkpoints are written to
     * @throws NullPointerException if either argument is {@code null}
     */
    public ShardRecordProcessorCheckpointer(@NonNull ShardInfo shardInfo, @NonNull Checkpointer checkpointer) {
        if (shardInfo == null) {
            throw new NullPointerException("shardInfo is marked non-null but is null");
        }
        if (checkpointer == null) {
            throw new NullPointerException("checkpointer is marked non-null but is null");
        }
        this.shardInfo = shardInfo;
        this.checkpointer = checkpointer;
    }

    /**
     * Checkpoints at the largest permitted checkpoint value.
     */
    @Override
    public synchronized void checkpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
        if (log.isDebugEnabled()) {
            log.debug("Checkpointing {}, token {} at largest permitted value {}",
                    ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), largestPermittedCheckpointValue);
        }
        advancePosition(largestPermittedCheckpointValue);
    }

    /**
     * Checkpoints at the sequence number of {@code record}, sub-sequence number 0.
     *
     * @param record record whose sequence number is checkpointed
     * @throws IllegalArgumentException if {@code record} is {@code null} or its position is out of range
     */
    @Override
    public synchronized void checkpoint(Record record) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
        if (record == null) {
            throw new IllegalArgumentException("Could not checkpoint a null record");
        }
        checkpoint(record.sequenceNumber(), 0L);
    }

    /**
     * Checkpoints at the given sequence number, sub-sequence number 0.
     *
     * @param str sequence number to checkpoint at
     * @throws IllegalArgumentException if the position is out of range
     */
    @Override
    public synchronized void checkpoint(String str) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
        checkpoint(str, 0L);
    }

    /**
     * Checkpoints at the given sequence / sub-sequence number after validating that it lies between
     * the last checkpoint and the largest permitted checkpoint value.
     *
     * @param str sequence number to checkpoint at
     * @param j sub-sequence number; must not be negative
     * @throws IllegalArgumentException if {@code j} is negative or the position is out of range
     */
    @Override
    public synchronized void checkpoint(String str, long j) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
        if (j < 0) {
            throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number " + j);
        }
        ExtendedSequenceNumber requested = new ExtendedSequenceNumber(str, Long.valueOf(j));
        if (isOutsidePermittedRange(requested)) {
            throw new IllegalArgumentException(String.format("Could not checkpoint at extended sequence number %s as it did not fall into acceptable range between the last checkpoint %s and the greatest extended sequence number passed to this record processor %s", requested, lastCheckpointValue, largestPermittedCheckpointValue));
        }
        if (log.isDebugEnabled()) {
            log.debug("Checkpointing {}, token {} at specific extended sequence number {}",
                    ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), requested);
        }
        advancePosition(requested);
    }

    /**
     * Prepares a checkpoint at the largest permitted checkpoint value, without application state.
     *
     * @return handle for the prepared checkpoint
     */
    @Override
    public synchronized PreparedCheckpointer prepareCheckpoint() throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
        return prepareCheckpoint(largestPermittedCheckpointValue.sequenceNumber(), largestPermittedCheckpointValue.subSequenceNumber());
    }

    /**
     * Prepares a checkpoint at the largest permitted checkpoint value, attaching application state.
     *
     * @param bArr application state passed through to the {@link Checkpointer}; may be {@code null}
     * @return handle for the prepared checkpoint
     */
    @Override
    public synchronized PreparedCheckpointer prepareCheckpoint(byte[] bArr) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
        // Carry the sub-sequence number along, exactly as the no-arg overload does. Routing through
        // the (String, byte[]) overload would reset it to 0 and prepare an earlier position than
        // the largest permitted one.
        return prepareCheckpoint(
                largestPermittedCheckpointValue.sequenceNumber(),
                largestPermittedCheckpointValue.subSequenceNumber(),
                bArr);
    }

    /**
     * Prepares a checkpoint at the sequence number of {@code record}, sub-sequence number 0.
     *
     * @param record record whose sequence number is used
     * @param bArr application state passed through to the {@link Checkpointer}; may be {@code null}
     * @return handle for the prepared checkpoint
     * @throws IllegalArgumentException if {@code record} is {@code null} or its position is out of range
     */
    @Override
    public synchronized PreparedCheckpointer prepareCheckpoint(Record record, byte[] bArr) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
        if (record == null) {
            throw new IllegalArgumentException("Could not prepare checkpoint a null record");
        }
        return prepareCheckpoint(record.sequenceNumber(), 0L, bArr);
    }

    /**
     * Prepares a checkpoint at the sequence number of {@code record}, without application state.
     *
     * @param record record whose sequence number is used
     * @return handle for the prepared checkpoint
     * @throws IllegalArgumentException if {@code record} is {@code null} or its position is out of range
     */
    @Override
    public synchronized PreparedCheckpointer prepareCheckpoint(Record record) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
        return prepareCheckpoint(record, (byte[]) null);
    }

    /**
     * Prepares a checkpoint at the given sequence number, sub-sequence number 0, without application state.
     *
     * @param str sequence number to prepare at
     * @return handle for the prepared checkpoint
     */
    @Override
    public synchronized PreparedCheckpointer prepareCheckpoint(String str) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
        return prepareCheckpoint(str, 0L);
    }

    /**
     * Prepares a checkpoint at the given sequence number, sub-sequence number 0, attaching application state.
     *
     * @param str sequence number to prepare at
     * @param bArr application state passed through to the {@link Checkpointer}; may be {@code null}
     * @return handle for the prepared checkpoint
     * @throws IllegalArgumentException if the position is out of range
     */
    @Override
    public synchronized PreparedCheckpointer prepareCheckpoint(String str, byte[] bArr) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
        return prepareCheckpoint(str, 0L, bArr);
    }

    /**
     * Prepares a checkpoint at the given sequence / sub-sequence number, without application state.
     *
     * @param str sequence number to prepare at
     * @param j sub-sequence number; must not be negative
     * @return handle for the prepared checkpoint
     */
    @Override
    public synchronized PreparedCheckpointer prepareCheckpoint(String str, long j) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
        return prepareCheckpoint(str, j, null);
    }

    /**
     * Prepares a checkpoint at the given sequence / sub-sequence number after validating that it
     * lies between the last checkpoint and the largest permitted checkpoint value.
     *
     * @param str sequence number to prepare at
     * @param j sub-sequence number; must not be negative
     * @param bArr application state passed through to the {@link Checkpointer}; may be {@code null}
     * @return handle for the prepared checkpoint
     * @throws IllegalArgumentException if {@code j} is negative or the position is out of range
     */
    @Override
    public synchronized PreparedCheckpointer prepareCheckpoint(String str, long j, byte[] bArr) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException, IllegalArgumentException {
        if (j < 0) {
            throw new IllegalArgumentException("Could not checkpoint at invalid, negative subsequence number " + j);
        }
        ExtendedSequenceNumber requested = new ExtendedSequenceNumber(str, Long.valueOf(j));
        if (isOutsidePermittedRange(requested)) {
            throw new IllegalArgumentException(String.format("Could not prepare checkpoint at extended sequence number %s as it did not fall into acceptable range between the last checkpoint %s and the greatest extended sequence number passed to this record processor %s", requested, lastCheckpointValue, largestPermittedCheckpointValue));
        }
        if (log.isDebugEnabled()) {
            log.debug("Preparing checkpoint {}, token {} at specific extended sequence number {}",
                    ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), requested);
        }
        return doPrepareCheckpoint(requested, bArr);
    }

    /**
     * Sets the value treated as the last successful checkpoint.
     *
     * @param extendedSequenceNumber starting checkpoint position; may be {@code null}
     */
    public synchronized void setInitialCheckpointValue(ExtendedSequenceNumber extendedSequenceNumber) {
        this.lastCheckpointValue = extendedSequenceNumber;
    }

    /**
     * Sets the upper bound for positions callers may checkpoint at.
     *
     * @param extendedSequenceNumber new largest permitted checkpoint value
     */
    public synchronized void largestPermittedCheckpointValue(ExtendedSequenceNumber extendedSequenceNumber) {
        this.largestPermittedCheckpointValue = extendedSequenceNumber;
    }

    /**
     * Sets the sequence number that is translated to {@code SHARD_END} when checkpointed.
     *
     * @param extendedSequenceNumber final sequence number of the shard, or {@code null} to disable the translation
     */
    public synchronized void sequenceNumberAtShardEnd(ExtendedSequenceNumber extendedSequenceNumber) {
        this.sequenceNumberAtShardEnd = extendedSequenceNumber;
    }

    /**
     * Convenience overload that wraps {@code str} in an {@link ExtendedSequenceNumber}.
     *
     * @param str sequence number to advance to
     */
    void advancePosition(String str) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
        advancePosition(new ExtendedSequenceNumber(str));
    }

    /**
     * Writes a checkpoint for this shard and records it as the last checkpoint value.
     *
     * <p>Does nothing when {@code extendedSequenceNumber} is {@code null} or equals the last
     * checkpoint value. A request for the shard-end sequence number is stored as {@code SHARD_END}.
     *
     * @param extendedSequenceNumber position to advance to
     * @throws KinesisClientLibDependencyException also used to wrap any other
     *         {@link KinesisClientLibException} raised by the {@link Checkpointer}
     */
    void advancePosition(ExtendedSequenceNumber extendedSequenceNumber) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
        ExtendedSequenceNumber checkpointToRecord = resolveShardEnd(extendedSequenceNumber);

        // NOTE(review): the duplicate check uses the raw requested value, whereas
        // doPrepareCheckpoint compares the SHARD_END-mapped value. Kept as-is; confirm intent.
        if (extendedSequenceNumber == null || extendedSequenceNumber.equals(lastCheckpointValue)) {
            return;
        }
        try {
            if (log.isDebugEnabled()) {
                log.debug("Setting {}, token {} checkpoint to {}",
                        ShardInfo.getLeaseKey(shardInfo), shardInfo.concurrencyToken(), checkpointToRecord);
            }
            checkpointer.setCheckpoint(ShardInfo.getLeaseKey(shardInfo), checkpointToRecord, shardInfo.concurrencyToken());
            // Only remember the value once the write above has succeeded.
            lastCheckpointValue = checkpointToRecord;
        } catch (InvalidStateException | KinesisClientLibDependencyException | ShutdownException | ThrottlingException e) {
            // Declared exception types pass through untouched.
            throw e;
        } catch (KinesisClientLibException e) {
            log.warn("Caught exception setting checkpoint.", e);
            throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e);
        }
    }

    /**
     * Records a pending checkpoint with the {@link Checkpointer} and returns a handle for it.
     *
     * <p>If the (shard-end adjusted) position equals the last checkpoint value, nothing is written
     * and a {@link DoesNothingPreparedCheckpointer} is returned.
     *
     * @param extendedSequenceNumber validated position to prepare at; must not be {@code null}
     * @param bArr application state passed through to the {@link Checkpointer}; may be {@code null}
     * @return handle for the prepared checkpoint
     */
    private PreparedCheckpointer doPrepareCheckpoint(ExtendedSequenceNumber extendedSequenceNumber, byte[] bArr) throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
        ExtendedSequenceNumber pendingCheckpoint = resolveShardEnd(extendedSequenceNumber);
        if (pendingCheckpoint.equals(lastCheckpointValue)) {
            return new DoesNothingPreparedCheckpointer(pendingCheckpoint);
        }
        try {
            checkpointer.prepareCheckpoint(ShardInfo.getLeaseKey(shardInfo), pendingCheckpoint, shardInfo.concurrencyToken(), bArr);
            return new ShardPreparedCheckpointer(pendingCheckpoint, this);
        } catch (InvalidStateException | KinesisClientLibDependencyException | ShutdownException | ThrottlingException e) {
            // Declared exception types pass through untouched.
            throw e;
        } catch (KinesisClientLibException e) {
            log.warn("Caught exception setting prepareCheckpoint.", e);
            throw new KinesisClientLibDependencyException("Caught exception while prepareCheckpointing", e);
        }
    }

    /**
     * Returns {@code true} when {@code candidate} is before the last checkpoint (if any) or after
     * the largest permitted checkpoint value.
     */
    private boolean isOutsidePermittedRange(ExtendedSequenceNumber candidate) {
        boolean beforeLastCheckpoint = lastCheckpointValue != null && lastCheckpointValue.compareTo(candidate) > 0;
        return beforeLastCheckpoint || candidate.compareTo(largestPermittedCheckpointValue) > 0;
    }

    /**
     * Returns {@code SHARD_END} when {@code requested} equals the configured shard-end sequence
     * number, otherwise {@code requested} unchanged (including {@code null}).
     */
    private ExtendedSequenceNumber resolveShardEnd(ExtendedSequenceNumber requested) {
        if (sequenceNumberAtShardEnd != null && sequenceNumberAtShardEnd.equals(requested)) {
            return ExtendedSequenceNumber.SHARD_END;
        }
        return requested;
    }

    /**
     * @return the backing {@link Checkpointer} supplied at construction
     */
    @Override
    @NonNull
    public Checkpointer checkpointer() {
        return this.checkpointer;
    }

    /**
     * @return the last checkpoint value recorded by this instance, or the initial value if none was written yet
     */
    public ExtendedSequenceNumber lastCheckpointValue() {
        return this.lastCheckpointValue;
    }

    /**
     * @return the current upper bound for caller-supplied checkpoint positions
     */
    public ExtendedSequenceNumber largestPermittedCheckpointValue() {
        return this.largestPermittedCheckpointValue;
    }
}
