package io.debezium.connector.spanner.db.stream;

import io.debezium.connector.spanner.db.dao.ChangeStreamDao;
import io.debezium.connector.spanner.db.dao.ChangeStreamResultSet;
import io.debezium.connector.spanner.db.mapper.ChangeStreamRecordMapper;
import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.db.model.event.ChangeStreamEvent;
import io.debezium.connector.spanner.db.model.event.ChildPartitionsEvent;
import io.debezium.connector.spanner.db.model.event.FinishPartitionEvent;
import io.debezium.connector.spanner.db.model.event.HeartbeatEvent;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.DelayChangeStreamEventsMetricEvent;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/db/stream/SpannerChangeStreamService.class */
/**
 * Streams change stream records for a single partition and forwards the mapped events to a
 * {@link ChangeStreamEventConsumer}.
 *
 * <p>For every row returned by the change stream query the row is mapped to a list of
 * {@link ChangeStreamEvent}s, each event is handed to the consumer, and (for non-heartbeat
 * batches) the time spent waiting for that row is published as a metric.
 */
public class SpannerChangeStreamService {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpannerChangeStreamService.class);

    /** Heartbeats whose record timestamp is older than this many milliseconds are logged as a warning. */
    private static final long STALE_HEARTBEAT_THRESHOLD_MILLIS = 60000L;

    private final ChangeStreamDao changeStreamDao;
    private final ChangeStreamRecordMapper changeStreamRecordMapper;
    /** Heartbeat interval; converted to milliseconds when the stream query is issued. */
    private final Duration heartbeatMillis;
    private final MetricsEventPublisher metricsEventPublisher;
    /** Identifier of the owning task; used only as a prefix in log messages. */
    private final String taskUid;

    /**
     * Creates the service.
     *
     * @param str                      task identifier used in log messages
     * @param changeStreamDao          DAO used to open the change stream query
     * @param changeStreamRecordMapper mapper turning the current result set row into events
     * @param duration                 heartbeat interval passed (in milliseconds) to the stream query
     * @param metricsEventPublisher    publisher receiving the per-row delay metric
     */
    public SpannerChangeStreamService(String str, ChangeStreamDao changeStreamDao, ChangeStreamRecordMapper changeStreamRecordMapper, Duration duration, MetricsEventPublisher metricsEventPublisher) {
        this.changeStreamDao = changeStreamDao;
        this.changeStreamRecordMapper = changeStreamRecordMapper;
        this.heartbeatMillis = duration;
        this.metricsEventPublisher = metricsEventPublisher;
        this.taskUid = str;
    }

    /**
     * Streams the given partition from its start timestamp to its end timestamp.
     *
     * <p>Calls {@code partitionEventListener.onRun} before querying, forwards every mapped event
     * to {@code changeStreamEventConsumer}, and once streaming ends (normally or because the
     * thread was interrupted) calls {@code partitionEventListener.onFinish} and emits a
     * {@link FinishPartitionEvent}. The result set is always closed, including when streaming
     * fails with an exception.
     *
     * @param partition                 partition to stream
     * @param changeStreamEventConsumer receiver of every event read from the stream
     * @param partitionEventListener    notified when streaming starts and finishes
     * @throws InterruptedException if the consumer is interrupted while accepting the final
     *                              {@link FinishPartitionEvent}; an interruption during streaming
     *                              itself is caught and the thread's interrupt flag is restored
     * @throws Exception            any other failure raised while querying, mapping, consuming
     *                              or closing the result set
     */
    public void getEvents(Partition partition, ChangeStreamEventConsumer changeStreamEventConsumer, PartitionEventListener partitionEventListener) throws InterruptedException, Exception {
        String token = partition.getToken();
        partitionEventListener.onRun(partition);
        LOGGER.info("Task: {}, Streaming {} from {} to {}", this.taskUid, token, partition.getStartTimestamp(), partition.getEndTimestamp());
        try {
            ChangeStreamResultSet resultSet = this.changeStreamDao.streamQuery(token, partition.getStartTimestamp(), partition.getEndTimestamp(), this.heartbeatMillis.toMillis());
            try {
                long waitStart = now();
                while (resultSet.next()) {
                    // Time spent blocked waiting for this row to arrive.
                    long delayMillis = now() - waitStart;
                    List<ChangeStreamEvent> events = this.changeStreamRecordMapper.toChangeStreamEvents(partition, resultSet, resultSet.getMetadata());
                    LOGGER.debug("Task: {}, Events receive from stream: {}", this.taskUid, events);
                    if (startsWithHeartbeat(events)) {
                        warnIfHeartbeatIsStale((HeartbeatEvent) events.get(0));
                    }
                    processEvents(partition, events, changeStreamEventConsumer);
                    // Heartbeat batches are excluded from the delay metric.
                    if (!events.isEmpty() && !startsWithHeartbeat(events)) {
                        this.metricsEventPublisher.publishMetricEvent(new DelayChangeStreamEventsMetricEvent((int) delayMillis));
                    }
                    waitStart = now();
                }
            } finally {
                // Release the result set on every exit path, not only after a clean end of stream.
                if (resultSet != null) {
                    resultSet.close();
                }
            }
        } catch (InterruptedException e) {
            LOGGER.info("task {}, Interrupting streaming partition task with token {}", this.taskUid, partition.getToken());
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
        }
        partitionEventListener.onFinish(partition);
        LOGGER.info("Task {}, Finished consuming partition {}", this.taskUid, partition);
        changeStreamEventConsumer.acceptChangeStreamEvent(new FinishPartitionEvent(partition));
    }

    /** Returns the current wall-clock time in epoch milliseconds. */
    private long now() {
        return Instant.now().toEpochMilli();
    }

    /** Returns {@code true} when the list is non-empty and its first element is a {@link HeartbeatEvent}. */
    private static boolean startsWithHeartbeat(List<ChangeStreamEvent> events) {
        return !events.isEmpty() && (events.get(0) instanceof HeartbeatEvent);
    }

    /** Logs a warning when the heartbeat's record timestamp lags the local clock by more than the threshold. */
    private void warnIfHeartbeatIsStale(HeartbeatEvent heartbeatEvent) {
        long lagMillis = System.currentTimeMillis() - heartbeatEvent.getRecordTimestamp().toSqlTimestamp().toInstant().toEpochMilli();
        if (lagMillis > STALE_HEARTBEAT_THRESHOLD_MILLIS) {
            LOGGER.warn("Task: {}, heartbeat has very old timestamp, lag: {}, token: {}, event: {}", this.taskUid, lagMillis, heartbeatEvent.getMetadata().getPartitionToken(), heartbeatEvent);
        }
    }

    /**
     * Forwards every event to the consumer in list order, logging child-partition events at
     * INFO level and every event at DEBUG level.
     *
     * @param partition                 partition the events were read from (used for logging)
     * @param list                      events mapped from one result set row
     * @param changeStreamEventConsumer receiver of the events
     * @throws InterruptedException if the consumer is interrupted while accepting an event
     */
    private void processEvents(Partition partition, List<ChangeStreamEvent> list, ChangeStreamEventConsumer changeStreamEventConsumer) throws InterruptedException {
        for (ChangeStreamEvent changeStreamEvent : list) {
            if (changeStreamEvent instanceof ChildPartitionsEvent) {
                LOGGER.info("Task: {}, Received child partition from partition {}:{}", this.taskUid, partition.getToken(), changeStreamEvent);
            }
            LOGGER.debug("Task: {}, Received record from partition {}: {}", this.taskUid, partition.getToken(), changeStreamEvent);
            changeStreamEventConsumer.acceptChangeStreamEvent(changeStreamEvent);
        }
    }
}
