package io.debezium.connector.spanner;

import io.debezium.bean.StandardBeanNames;
import io.debezium.config.Configuration;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.spanner.config.SpannerTableFilter;
import io.debezium.connector.spanner.context.source.SourceInfoFactory;
import io.debezium.connector.spanner.context.source.SpannerSourceTaskContext;
import io.debezium.connector.spanner.db.DaoFactory;
import io.debezium.connector.spanner.db.DatabaseClientFactory;
import io.debezium.connector.spanner.db.SpannerChangeStreamFactory;
import io.debezium.connector.spanner.db.metadata.SchemaRegistry;
import io.debezium.connector.spanner.db.stream.ChangeStream;
import io.debezium.connector.spanner.kafka.KafkaAdminClientFactory;
import io.debezium.connector.spanner.kafka.KafkaPartitionInfoProvider;
import io.debezium.connector.spanner.metrics.SpannerChangeEventSourceMetricsFactory;
import io.debezium.connector.spanner.metrics.SpannerMeter;
import io.debezium.connector.spanner.processor.SourceRecordUtils;
import io.debezium.connector.spanner.processor.SpannerEventDispatcher;
import io.debezium.connector.spanner.processor.heartbeat.SpannerHeartbeatFactory;
import io.debezium.connector.spanner.processor.metadata.SpannerEventMetadataProvider;
import io.debezium.connector.spanner.schema.KafkaSpannerSchema;
import io.debezium.connector.spanner.schema.KafkaSpannerTableSchemaFactory;
import io.debezium.connector.spanner.task.LowWatermarkHolder;
import io.debezium.connector.spanner.task.PartitionOffsetProvider;
import io.debezium.connector.spanner.task.SynchronizationTaskContext;
import io.debezium.connector.spanner.task.SynchronizedPartitionManager;
import io.debezium.connector.spanner.task.TaskUid;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.processors.PostProcessorRegistryServiceProvider;
import io.debezium.schema.DefaultTopicNamingStrategy;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotterService;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/SpannerConnectorTask.class */
/**
 * Source task of the Spanner connector.
 *
 * <p>{@link #m8start(Configuration)} wires together the change event queue, metrics, schema,
 * change stream, event dispatcher and synchronization context; {@link #doPoll()} drains the queue
 * into Kafka Connect {@link SourceRecord}s; {@link #doStop()} releases the components again.
 *
 * <p>Several collaborators are handed lambdas that read fields of this task which are only
 * assigned later in {@code m8start}. Those lambdas must stay lambdas (not bound method
 * references), because a bound method reference would evaluate the still-null field immediately.
 */
public class SpannerConnectorTask extends SpannerBaseSourceTask {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpannerConnectorTask.class);
    private static final String CONTEXT_NAME = "spanner-connector-task";
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private volatile SynchronizationTaskContext synchronizationTaskContext;
    private volatile String taskUid;
    private volatile SpannerMeter spannerMeter;
    private volatile LowWatermarkHolder lowWatermarkHolder;
    private volatile KafkaAdminClientFactory adminClientFactory;
    private volatile ChangeStream changeStream;
    private volatile SpannerEventDispatcher dispatcher;
    private volatile KafkaSpannerSchema schema;
    private volatile boolean beganPolling = false;

    /**
     * Builds and starts every component of the task and returns the started coordinator.
     *
     * <p>Decompiler notes: this method was renamed from {@code start} (it was merged with a bridge
     * method) and its access modifier was changed from {@code protected}.
     *
     * <p>The statement order matters: fields are assigned step by step, and later constructors
     * receive the previously created components.
     *
     * @param configuration the raw connector configuration for this task
     * @return the coordinator that was created and started by this call
     */
    public SpannerChangeEventSourceCoordinator m8start(Configuration configuration) {
        SpannerConnectorConfig spannerConnectorConfig = new SpannerConnectorConfig(configuration);
        this.taskUid = TaskUid.generateTaskUid(spannerConnectorConfig.getConnectorName(), spannerConnectorConfig.getTaskId());
        LOGGER.info("Starting task with uid: {}", this.taskUid);
        DatabaseClientFactory databaseClientFactory = getDatabaseClientFactory(spannerConnectorConfig);
        DaoFactory daoFactory = new DaoFactory(databaseClientFactory);
        // spannerMeter is assigned further down; the supplier reads the field lazily.
        SpannerSourceTaskContext spannerSourceTaskContext = new SpannerSourceTaskContext(spannerConnectorConfig,
                () -> this.spannerMeter.getCapturedTables());
        this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
                .pollInterval(spannerConnectorConfig.getPollInterval())
                .maxBatchSize(spannerConnectorConfig.getMaxBatchSize())
                .maxQueueSize(spannerConnectorConfig.getMaxQueueSize())
                .maxQueueSizeInBytes(spannerConnectorConfig.getMaxQueueSizeInBytes())
                .loggingContextSupplier(() -> spannerSourceTaskContext.configureLoggingContext(CONTEXT_NAME))
                .build();
        SpannerErrorHandler spannerErrorHandler = new SpannerErrorHandler(this, this.queue);
        // lowWatermarkHolder is assigned further down; the supplier reads the field lazily.
        this.spannerMeter = new SpannerMeter(this, spannerConnectorConfig, spannerErrorHandler,
                () -> this.lowWatermarkHolder.getLowWatermark());
        SchemaNameAdjuster schemaNameAdjuster = spannerConnectorConfig.schemaNameAdjuster();
        SpannerTableFilter spannerTableFilter = new SpannerTableFilter(spannerConnectorConfig);
        SpannerEventMetadataProvider spannerEventMetadataProvider = new SpannerEventMetadataProvider();
        DefaultTopicNamingStrategy topicNamingStrategy = DefaultTopicNamingStrategy.create(spannerConnectorConfig);
        // schema is assigned on the next statement; the callback reads the field lazily.
        SchemaRegistry schemaRegistry = new SchemaRegistry(spannerConnectorConfig.changeStreamName(), daoFactory.getSchemaDao(),
                () -> this.schema.resetCache());
        this.schema = new KafkaSpannerSchema(new KafkaSpannerTableSchemaFactory(topicNamingStrategy, schemaNameAdjuster, schemaRegistry,
                spannerConnectorConfig.getSourceInfoStructMaker().schema()));
        SpannerHeartbeatFactory spannerHeartbeatFactory = new SpannerHeartbeatFactory(spannerConnectorConfig, topicNamingStrategy, schemaNameAdjuster);
        PartitionOffsetProvider partitionOffsetProvider = new PartitionOffsetProvider(this.context.offsetStorageReader(),
                this.spannerMeter.getMetricsEventPublisher());
        // synchronizationTaskContext is assigned further down; the callback reads the field lazily.
        SynchronizedPartitionManager synchronizedPartitionManager = new SynchronizedPartitionManager(
                taskStateChangeEvent -> this.synchronizationTaskContext.publishEvent(taskStateChangeEvent));
        this.changeStream = new SpannerChangeStreamFactory(this.taskUid, daoFactory, this.spannerMeter.getMetricsEventPublisher(),
                spannerConnectorConfig.getConnectorName(), databaseClientFactory)
                .getStream(spannerConnectorConfig.changeStreamName(), spannerConnectorConfig.getHeartbeatInterval(),
                        spannerConnectorConfig.getMaxMissedHeartbeats());
        this.lowWatermarkHolder = new LowWatermarkHolder();
        SourceInfoFactory sourceInfoFactory = new SourceInfoFactory(spannerConnectorConfig, this.lowWatermarkHolder);
        this.adminClientFactory = new KafkaAdminClientFactory(spannerConnectorConfig);
        this.dispatcher = new SpannerEventDispatcher(spannerConnectorConfig, topicNamingStrategy, this.schema, this.queue, spannerTableFilter,
                new SpannerChangeEventCreator(), spannerEventMetadataProvider, spannerHeartbeatFactory, schemaNameAdjuster, schemaRegistry,
                sourceInfoFactory, new KafkaPartitionInfoProvider(this.adminClientFactory.getAdminClient()));
        this.synchronizationTaskContext = new SynchronizationTaskContext(this, spannerConnectorConfig, spannerErrorHandler,
                partitionOffsetProvider, this.changeStream, this.dispatcher, this.adminClientFactory, schemaRegistry, this::finish,
                this.spannerMeter.getMetricsEventPublisher(), this.lowWatermarkHolder);
        SpannerChangeEventSourceFactory spannerChangeEventSourceFactory = new SpannerChangeEventSourceFactory(spannerConnectorConfig,
                this.dispatcher, spannerErrorHandler, schemaRegistry, this.spannerMeter, this.changeStream, sourceInfoFactory,
                synchronizedPartitionManager);
        // The dispatcher was assigned just above, so a bound method reference is safe here.
        NotificationService notificationService = new NotificationService(getNotificationChannels(), spannerConnectorConfig,
                SchemaFactory.get(), this.dispatcher::enqueueNotification);
        spannerConnectorConfig.getBeanRegistry().add(StandardBeanNames.CONFIGURATION, configuration);
        spannerConnectorConfig.getBeanRegistry().add("ConnectorConfig", spannerConnectorConfig);
        spannerConnectorConfig.getBeanRegistry().add("Schema", this.schema);
        registerServiceProviders(spannerConnectorConfig.getServiceRegistry());
        this.coordinator = new SpannerChangeEventSourceCoordinator(getInitialOffsets(), spannerErrorHandler, SpannerConnector.class,
                spannerConnectorConfig, spannerChangeEventSourceFactory, new SpannerChangeEventSourceMetricsFactory(this.spannerMeter),
                this.dispatcher, this.schema, notificationService,
                spannerConnectorConfig.getServiceRegistry().tryGetService(SnapshotterService.class));
        this.spannerMeter.start();
        this.coordinator.start(spannerSourceTaskContext, this.queue, spannerEventMetadataProvider);
        this.synchronizationTaskContext.init();
        LOGGER.info("Finished starting task {}", this.taskUid);
        return this.coordinator;
    }

    /**
     * Creates the database client factory for the given connector configuration.
     *
     * @param spannerConnectorConfig the connector configuration
     * @return a new {@link DatabaseClientFactory}
     */
    DatabaseClientFactory getDatabaseClientFactory(SpannerConnectorConfig spannerConnectorConfig) {
        return new DatabaseClientFactory(spannerConnectorConfig);
    }

    /**
     * Polls the change event queue and converts the polled events into source records, stamping
     * each record with the poll time taken right after the queue poll returned.
     *
     * @return the records of the polled events, possibly empty
     * @throws InterruptedException if the queue poll is interrupted
     */
    protected List<SourceRecord> doPoll() throws InterruptedException {
        if (!this.beganPolling) {
            this.beganPolling = true;
            LOGGER.info("Task {}, began polling", this.taskUid);
        }
        List<DataChangeEvent> events = this.queue.poll();
        long pollTimestampMillis = Instant.now().toEpochMilli();
        List<SourceRecord> records = events.stream()
                .map(DataChangeEvent::getRecord)
                .map(sourceRecord -> SourceRecordUtils.addPollTimestamp(sourceRecord, pollTimestampMillis))
                .collect(Collectors.toList());
        if (!records.isEmpty()) {
            LOGGER.debug("Records sent to Kafka: {}", records);
        }
        return records;
    }

    /**
     * Forwards a sent record to the metrics event publisher for latency logging.
     *
     * @param sourceRecord the record that was sent
     */
    @Override // io.debezium.connector.spanner.SpannerBaseSourceTask
    protected void onRecordSent(SourceRecord sourceRecord) {
        this.spannerMeter.getMetricsEventPublisher().logLatency(sourceRecord);
    }

    /**
     * Stops the task's components in a fixed order: change stream, synchronization context,
     * dispatcher, admin client factory, metrics.
     *
     * <p>A component whose field is still {@code null} is skipped. The fields are assigned one by
     * one in {@link #m8start(Configuration)}, so a start that failed partway leaves the later ones
     * unset; without the guards the first unset field would raise a {@link NullPointerException}
     * and the remaining, already created components would never be released.
     */
    protected void doStop() {
        ChangeStream stream = this.changeStream;
        if (stream != null) {
            LOGGER.info("Stopping task {}, changeStream", this.taskUid);
            stream.stop();
        }
        SynchronizationTaskContext syncContext = this.synchronizationTaskContext;
        if (syncContext != null) {
            LOGGER.info("Stopping task {}, synchronizationTaskContext", this.taskUid);
            syncContext.destroy();
        }
        SpannerEventDispatcher eventDispatcher = this.dispatcher;
        if (eventDispatcher != null) {
            LOGGER.info("Stopping task {}, dispatcher", this.taskUid);
            eventDispatcher.destroy();
        }
        KafkaAdminClientFactory clientFactory = this.adminClientFactory;
        if (clientFactory != null) {
            LOGGER.info("Stopping task {}, adminClientFactory", this.taskUid);
            clientFactory.close();
        }
        SpannerMeter meter = this.spannerMeter;
        if (meter != null) {
            LOGGER.info("Stopping task {}, spannerMeter", this.taskUid);
            meter.shutdown();
        }
        LOGGER.info("Task {} was stopped", this.taskUid);
    }

    /**
     * Reports a {@link ConnectException} ("finished work") to the queue as a producer exception.
     * NOTE(review): presumably this makes a later poll fail and ends the task; confirm against
     * {@code ChangeEventQueue#producerException}.
     */
    public void finish() {
        this.queue.producerException(new ConnectException("Task " + this.taskUid + " finished work"));
    }

    /**
     * Reports a {@link RetriableException} ("will be restarted") to the queue as a producer
     * exception. NOTE(review): presumably the retriable type triggers a task restart; confirm
     * against the base task's error handling.
     */
    public void restart() {
        this.queue.producerException(new RetriableException("Task " + this.taskUid + " will be restarted"));
    }

    /**
     * Returns the task uid generated in {@link #m8start(Configuration)}.
     *
     * @return the task uid, or {@code null} if the task has not been started yet
     */
    public String getTaskUid() {
        return this.taskUid;
    }

    /**
     * Registers the post-processor registry service provider with the given registry.
     *
     * @param serviceRegistry the registry to register the provider with
     */
    protected void registerServiceProviders(ServiceRegistry serviceRegistry) {
        serviceRegistry.registerServiceProvider(new PostProcessorRegistryServiceProvider());
    }
}
