package io.debezium.connector.spanner.task;

import com.google.cloud.Timestamp;
import io.debezium.connector.spanner.SpannerPartition;
import io.debezium.connector.spanner.context.offset.PartitionOffset;
import io.debezium.connector.spanner.kafka.internal.model.PartitionState;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.OffsetReceivingTimeMetricEvent;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/PartitionOffsetProvider.class */
public class PartitionOffsetProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(PartitionOffsetProvider.class);
    private final OffsetStorageReader offsetStorageReader;
    private final MetricsEventPublisher metricsEventPublisher;
    private final ExecutorService executor = Executors.newCachedThreadPool();

    /* loaded from: input_file:io/debezium/connector/spanner/task/PartitionOffsetProvider$ExecutorServiceCallable.class */
    public static class ExecutorServiceCallable implements Callable<Map<String, ?>> {
        private OffsetStorageReader offsetStorageReader;
        private Map<String, String> spannerPartition;

        public ExecutorServiceCallable(OffsetStorageReader offsetStorageReader, Map<String, String> map) {
            this.offsetStorageReader = offsetStorageReader;
            this.spannerPartition = map;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Map<String, ?> call() throws Exception {
            try {
                return this.offsetStorageReader.offset(this.spannerPartition);
            } catch (Exception e) {
                PartitionOffsetProvider.LOGGER.error("Offsetstoragereader throwing exception", e);
                throw e;
            }
        }
    }

    public PartitionOffsetProvider(OffsetStorageReader offsetStorageReader, MetricsEventPublisher metricsEventPublisher) {
        this.offsetStorageReader = offsetStorageReader;
        this.metricsEventPublisher = metricsEventPublisher;
    }

    public Timestamp getOffset(PartitionState partitionState) {
        Map<String, ?> retrieveOffsetMap = retrieveOffsetMap(new SpannerPartition(partitionState.getToken()).getSourcePartition());
        if (retrieveOffsetMap == null) {
            LOGGER.warn("Token {} returning start timestamp because no offset was retrieved", partitionState);
            return partitionState.getStartTimestamp();
        }
        LOGGER.info("Successfully retrieved offset {} for token {}", retrieveOffsetMap, partitionState);
        return PartitionOffset.extractOffset(retrieveOffsetMap);
    }

    public Map<String, String> getOffsetMap(PartitionState partitionState) {
        Map retrieveOffsetMap = retrieveOffsetMap(new SpannerPartition(partitionState.getToken()).getSourcePartition());
        return retrieveOffsetMap == null ? Map.of() : retrieveOffsetMap;
    }

    public Map<String, Timestamp> getOffsets(Collection<String> collection) {
        Instant now = Instant.now();
        Map offsets = this.offsetStorageReader.offsets((List) collection.stream().map(str -> {
            return new SpannerPartition(str).getSourcePartition();
        }).collect(Collectors.toList()));
        if (offsets == null) {
            return Map.of();
        }
        this.metricsEventPublisher.publishMetricEvent(OffsetReceivingTimeMetricEvent.from(now));
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : offsets.entrySet()) {
            hashMap.put(SpannerPartition.extractToken((Map) entry.getKey()), PartitionOffset.extractOffset((Map) entry.getValue()));
        }
        return hashMap;
    }

    private Map<String, ?> retrieveOffsetMap(Map<String, String> map) {
        Instant now = Instant.now();
        Map<String, ?> map2 = null;
        Future submit = this.executor.submit(new ExecutorServiceCallable(this.offsetStorageReader, map));
        try {
            try {
                map2 = (Map) submit.get(5L, TimeUnit.SECONDS);
                submit.cancel(true);
            } catch (InterruptedException e) {
                LOGGER.error("Token {},interrupting PartitionOffsetProvider", map, e);
                Thread.currentThread().interrupt();
                submit.cancel(true);
            } catch (ExecutionException e2) {
                LOGGER.error("Token {}, failed to retrieve offset {}:{}", new Object[]{map, e2.toString(), e2.getStackTrace()});
                submit.cancel(true);
            } catch (TimeoutException e3) {
                LOGGER.error("Token {}, failed to retrieve offset in time", map, e3);
                submit.cancel(true);
            }
            this.metricsEventPublisher.publishMetricEvent(OffsetReceivingTimeMetricEvent.from(now));
            return map2;
        } catch (Throwable th) {
            submit.cancel(true);
            throw th;
        }
    }
}
