package io.debezium.connector.common;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.data.Envelope;
import io.debezium.function.LogPositionValidator;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.notification.channels.NotificationChannel;
import io.debezium.pipeline.signal.channels.SignalChannelReader;
import io.debezium.pipeline.signal.channels.process.SignalChannelWriter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.processors.PostProcessorRegistryServiceProvider;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.service.spi.ServiceRegistry;
import io.debezium.snapshot.SnapshotLockProvider;
import io.debezium.snapshot.SnapshotQueryProvider;
import io.debezium.snapshot.SnapshotterServiceProvider;
import io.debezium.spi.snapshot.Snapshotter;
import io.debezium.util.Clock;
import io.debezium.util.ElapsedTimeStrategy;
import io.debezium.util.Loggings;
import io.debezium.util.Metronome;
import io.debezium.util.Strings;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/common/BaseSourceTask.class */
public abstract class BaseSourceTask<P extends Partition, O extends OffsetContext> extends SourceTask {
    /** Logger shared by every task instance. */
    private static final Logger LOGGER = LoggerFactory.getLogger(BaseSourceTask.class);
    /** Initial period (5 seconds) handed to the exponential strategy that throttles the statistics log line. */
    private static final Duration INITIAL_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.SECONDS.toMillis(5));
    /** Upper bound (1 hour) handed to the exponential strategy that throttles the statistics log line. */
    private static final Duration MAX_POLL_PERIOD_IN_MILLIS = Duration.ofMillis(TimeUnit.HOURS.toMillis(1));
    /** Configuration built from the properties passed to {@code start(Map)}; reused when the task is restarted. */
    private Configuration config;
    /** Lazily populated cache of the signal channel readers; {@code null} until first requested. */
    private List<SignalChannelReader> signalChannels;
    /** Back-off consulted while the task is in the RESTARTING state; {@code null} when no restart is pending. */
    private volatile ElapsedTimeStrategy restartDelay;
    /** Coordinator returned by {@code start(Configuration)}; {@code null} before start and after stop. */
    protected ChangeEventSourceCoordinator<P, O> coordinator;
    /** Wait between restart attempts, read from {@code CommonConnectorConfig.RETRIABLE_RESTART_WAIT}. */
    private Duration retriableRestartWait;
    /** Number of records polled since the statistics line was last written. */
    private int previousOutputBatchSize;
    /** Current lifecycle state of the task. */
    private final AtomicReference<State> state = new AtomicReference<>(State.INITIAL);
    /** Guards lifecycle transitions and access to {@link #lastOffsets}. */
    private final ReentrantLock stateLock = new ReentrantLock();
    /** Source partition mapped to the last source offset seen by {@code commitRecord}; drained by {@code commit}. */
    private final Map<Map<String, ?>, Map<String, ?>> lastOffsets = new HashMap<>();
    /** Clock used for the statistics timing; must be initialised before the fields below that read it. */
    private final Clock clock = Clock.system();
    /** Service loader from which the signal channel readers are instantiated on demand. */
    private final ServiceLoader<SignalChannelReader> availableSignalChannels = ServiceLoader.load(SignalChannelReader.class);
    /** Becomes {@code true} once a previous offset has been reported at INFO level; later reports use DEBUG. */
    private boolean offsetLoadedInPast = false;
    /** Decides when the next statistics line may be written. */
    private final ElapsedTimeStrategy pollOutputDelay = ElapsedTimeStrategy.exponential(this.clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);
    /** Instant at which the statistics line was last written (initially the construction time). */
    private Instant previousOutputInstant = this.clock.currentTimeAsInstant();
    /** Notification channels discovered through {@link ServiceLoader} when the task is constructed. */
    private final List<NotificationChannel> notificationChannels = StreamSupport.stream(ServiceLoader.load(NotificationChannel.class).spliterator(), false).collect(Collectors.toList());

    /* loaded from: input_file:io/debezium/connector/common/BaseSourceTask$State.class */
    /**
     * Lifecycle states of the task as tracked by this class.
     */
    public enum State {
        /** A retriable failure occurred; polling re-attempts the start once the restart delay has elapsed. */
        RESTARTING,
        /** The connector-specific start returned a coordinator and polling delegates to the subclass. */
        RUNNING,
        /** Value before the first start, and the value set at the beginning of every {@code start(Map)} call. */
        INITIAL,
        /** Set by a regular (non-restart) stop. */
        STOPPED
    }

    /**
     * Checks every stored partition offset against the schema history and the server log position, then
     * either initialises the schema history storage, recovers the history, or resets the offset.
     *
     * <p>Processing stops at the first partition that has no offset, whose schema history is missing, or
     * whose log position is gone while the snapshotter allows a data snapshot.
     *
     * @param commonConnectorConfig connector configuration; decides whether the log position check runs
     * @param logPositionValidator  validator for stored positions, may be {@code null}
     * @param offsets               previously stored offsets per partition
     * @param databaseSchema        schema whose history is initialised or recovered when it is historized
     * @param snapshotter           snapshotter consulted for the recovery decisions
     * @throws DebeziumException if recovery is required but the snapshotter does not permit it
     */
    protected void validateAndLoadSchemaHistory(CommonConnectorConfig commonConnectorConfig, LogPositionValidator logPositionValidator, Offsets<P, O> offsets, DatabaseSchema databaseSchema, Snapshotter snapshotter) {
        for (Map.Entry<P, O> entry : offsets) {
            final P partition = entry.getKey();
            final O offset = entry.getValue();

            // No stored offset: the connector has never run for this partition.
            if (offset == null) {
                if (snapshotter.shouldSnapshotOnSchemaError()) {
                    throw new DebeziumException("Could not find existing redo log information while attempting schema only recovery snapshot");
                }
                LOGGER.info("Connector started for the first time.");
                if (databaseSchema.isHistorized()) {
                    ((HistorizedDatabaseSchema) databaseSchema).initializeStorage();
                }
                return;
            }

            // A snapshot was interrupted: it can only be resumed when snapshots are still permitted.
            if (offset.isInitialSnapshotRunning()) {
                final boolean snapshotPermitted = snapshotter.shouldSnapshotData(true, true) || snapshotter.shouldSnapshotSchema(true, true);
                if (!snapshotPermitted) {
                    throw new DebeziumException("The connector previously stopped while taking a snapshot, but now the connector is configured to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.");
                }
                continue;
            }

            // Historized schema without stored history: recover through a schema snapshot or fail.
            if (databaseSchema.isHistorized() && !((HistorizedDatabaseSchema) databaseSchema).historyExists()) {
                LOGGER.warn("Database schema history was not found but was expected");
                if (!snapshotter.shouldSnapshotOnSchemaError()) {
                    throw new DebeziumException("The db history topic is missing. You may attempt to recover it by reconfiguring the connector to recovery.");
                }
                LOGGER.info("The db-history topic is missing but we are in {} snapshot mode. Attempting to snapshot the current schema and then begin reading the redo log from the last recorded offset.", snapshotter.name());
                if (databaseSchema.isHistorized()) {
                    ((HistorizedDatabaseSchema) databaseSchema).initializeStorage();
                }
                return;
            }

            // Stored position no longer on the server: re-snapshot if allowed, otherwise only warn.
            if (commonConnectorConfig.isLogPositionCheckEnabled() && !isLogPositionAvailable(logPositionValidator, partition, offset, commonConnectorConfig)) {
                LOGGER.warn("Last recorded offset is no longer available on the server.");
                if (snapshotter.shouldSnapshotOnDataError()) {
                    LOGGER.info("The last recorded offset is no longer available but we are in {} snapshot mode. Attempting to snapshot data to fill the gap.", snapshotter.name());
                    offsets.resetOffset(offsets.getTheOnlyPartition());
                    return;
                }
                LOGGER.warn("The connector is trying to read redo log starting at " + String.valueOf(offset) + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed if you want to recover. If not the connector will streaming from the last available position in the log");
            }

            if (databaseSchema.isHistorized()) {
                ((HistorizedDatabaseSchema) databaseSchema).recover(partition, offset);
            }
        }
    }

    /**
     * Asks the validator whether the stored position can still be read from the source.
     *
     * @param logPositionValidator  validator to delegate to, may be {@code null}
     * @param partition             partition the offset belongs to
     * @param offsetContext         stored offset to validate
     * @param commonConnectorConfig connector configuration passed through to the validator
     * @return the validator's verdict, or {@code true} (after a warning) when no validator is supplied
     */
    public boolean isLogPositionAvailable(LogPositionValidator logPositionValidator, Partition partition, OffsetContext offsetContext, CommonConnectorConfig commonConnectorConfig) {
        if (logPositionValidator == null) {
            LOGGER.warn("Current JDBC connection implementation is not providing a log position validator implementation. The check will always be 'true'");
            return true;
        }
        return logPositionValidator.validate(partition, offsetContext, commonConnectorConfig);
    }

    /**
     * Validates the supplied properties, logs them with sensitive options masked and starts the
     * connector-specific coordinator. A {@link RetriableException} thrown while starting puts the task
     * into the RESTARTING state so that polling re-attempts the start later.
     *
     * @param map raw task properties
     * @throws ConnectException if the task context is missing or the configuration is invalid
     */
    @Override // org.apache.kafka.connect.source.SourceTask, org.apache.kafka.connect.connector.Task
    public final void start(Map<String, String> map) {
        if (this.context == null) {
            throw new ConnectException("Unexpected null context");
        }

        this.stateLock.lock();
        try {
            setTaskState(State.INITIAL);
            this.config = Configuration.from(map);
            this.retriableRestartWait = this.config.getDuration(CommonConnectorConfig.RETRIABLE_RESTART_WAIT, ChronoUnit.MILLIS);
            // A fresh start cancels any restart that was pending.
            this.restartDelay = null;

            final boolean configurationValid = this.config.validateAndRecord(getAllConfigurationFields(), LOGGER::error);
            if (!configurationValid) {
                throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
            }

            if (LOGGER.isInfoEnabled()) {
                final StringBuilder banner = new StringBuilder("Starting " + getClass().getSimpleName() + " with configuration:");
                withMaskedSensitiveOptions(this.config).forEach((key, value) -> banner.append("\n   ").append(key).append(" = ").append(value));
                banner.append(StringUtils.LF);
                LOGGER.info(banner.toString());
            }

            try {
                this.coordinator = start(this.config);
                setTaskState(State.RUNNING);
            } catch (RetriableException e) {
                LOGGER.warn("Failed to start connector, will re-attempt during polling.", e);
                this.restartDelay = ElapsedTimeStrategy.constant(Clock.system(), this.retriableRestartWait);
                setTaskState(State.RESTARTING);
            }
        } finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Returns the signal channel readers found through the service loader. The readers are instantiated
     * on the first call and the same list is returned afterwards.
     *
     * @return the cached list of signal channel readers
     */
    public List<SignalChannelReader> getAvailableSignalChannels() {
        if (this.signalChannels != null) {
            return this.signalChannels;
        }
        this.signalChannels = this.availableSignalChannels.stream()
                .map(ServiceLoader.Provider::get)
                .collect(Collectors.toList());
        return this.signalChannels;
    }

    /**
     * Finds the first available signal channel that also implements {@link SignalChannelWriter}.
     *
     * @return the first channel that is a {@link SignalChannelWriter}, or an empty optional when none is
     */
    public Optional<? extends SignalChannelWriter> getAvailableSignalChannelWriter() {
        return getAvailableSignalChannels().stream()
                .filter(SignalChannelWriter.class::isInstance)
                .map(SignalChannelWriter.class::cast)
                .findFirst();
    }

    /**
     * Produces the view of the configuration that is written to the log when the task starts.
     * This implementation delegates to {@code Configuration.withMaskedPasswords()}.
     *
     * @param configuration the task configuration
     * @return the configuration returned by {@code withMaskedPasswords()}
     */
    protected Configuration withMaskedSensitiveOptions(Configuration configuration) {
        return configuration.withMaskedPasswords();
    }

    /**
     * Connector-specific start hook. It is invoked from {@code start(Map)} and again from polling when a
     * restart is attempted; a {@code RetriableException} thrown here is treated as a retriable failure.
     *
     * @param configuration the validated task configuration
     * @return the coordinator that this class stores and later stops
     */
    protected abstract ChangeEventSourceCoordinator<P, O> start(Configuration configuration);

    /**
     * Returns the next batch of records from the subclass, starting or restarting the task first when
     * necessary. A {@link RetriableException} stops the task in restart mode and is then rethrown.
     *
     * @return the polled records, or an empty list while the task is not running
     * @throws InterruptedException if the polling thread is interrupted
     */
    @Override // org.apache.kafka.connect.source.SourceTask
    public final List<SourceRecord> poll() throws InterruptedException {
        try {
            if (startIfNeededAndPossible()) {
                final List<SourceRecord> batch = doPoll();
                logStatistics(batch);
                resetErrorHandlerRetriesIfNeeded(batch);
                return batch;
            }
            return Collections.emptyList();
        } catch (RetriableException e) {
            stop(true);
            throw e;
        }
    }

    /**
     * Logs throughput information about a polled batch. At DEBUG level the number of records per topic
     * is logged for every batch; at INFO level a summary line is written whenever the poll output delay
     * has elapsed.
     *
     * <p>Nothing happens, and the running counter is not updated, when the batch is {@code null} or
     * empty or when INFO logging is disabled.
     *
     * @param list the records returned by the last poll, may be {@code null}
     */
    protected void logStatistics(List<SourceRecord> list) {
        if (list == null || !LOGGER.isInfoEnabled()) {
            return;
        }
        final int batchSize = list.size();
        if (batchSize <= 0) {
            return;
        }

        if (LOGGER.isDebugEnabled()) {
            // LinkedHashMap keeps the topics in the order in which they first appear in the batch.
            final Map<String, Integer> recordsPerTopic = new LinkedHashMap<>();
            for (SourceRecord sourceRecord : list) {
                recordsPerTopic.merge(sourceRecord.topic(), 1, Integer::sum);
            }
            for (Map.Entry<String, Integer> entry : recordsPerTopic.entrySet()) {
                LOGGER.debug("Sending {} records to topic {}", entry.getValue(), entry.getKey());
            }
        }

        final SourceRecord lastRecord = list.get(batchSize - 1);
        this.previousOutputBatchSize += batchSize;
        if (this.pollOutputDelay.hasElapsed()) {
            final Instant now = this.clock.currentTime();
            LOGGER.info("{} records sent during previous {}, last recorded offset of {} partition is {}", this.previousOutputBatchSize, Strings.duration(Duration.between(this.previousOutputInstant, now).toMillis()), lastRecord.sourcePartition(), lastRecord.sourceOffset());
            this.previousOutputInstant = now;
            this.previousOutputBatchSize = 0;
        }
    }

    /**
     * Remembers the most recent offset seen for a source partition so that the next commit can pass it
     * to the coordinator.
     *
     * @param map  the source partition
     * @param map2 the source offset last recorded for that partition
     */
    private void updateLastOffset(Map<String, ?> map, Map<String, ?> map2) {
        this.stateLock.lock();
        try {
            this.lastOffsets.put(map, map2);
        } finally {
            // Release in finally so that a failure inside put() cannot leave the lock held.
            this.stateLock.unlock();
        }
    }

    /**
     * Resets the error handler's retry counter when the batch contains at least one change data message,
     * a coordinator is present and its error handler reports a retry count above zero.
     *
     * @param list the records returned by the last poll, may be {@code null}
     */
    protected void resetErrorHandlerRetriesIfNeeded(List<SourceRecord> list) {
        final boolean retriesPending = containsChangeDataMessages(list)
                && this.coordinator != null
                && this.coordinator.getErrorHandler().getRetries() > 0;
        if (retriesPending) {
            this.coordinator.getErrorHandler().resetRetries();
        }
    }

    /**
     * Tells whether any record in the batch has a value schema that is an envelope schema.
     *
     * @param list the records to inspect, may be {@code null}
     * @return {@code true} if at least one record has an envelope value schema; {@code false} for a
     *         {@code null} or empty batch
     */
    protected boolean containsChangeDataMessages(List<SourceRecord> list) {
        if (list == null || list.isEmpty()) {
            return false;
        }
        return list.stream()
                .anyMatch(candidate -> candidate.valueSchema() != null && Envelope.isEnvelopeSchema(candidate.valueSchema()));
    }

    /**
     * Connector-specific polling hook, invoked from {@code poll()} once the task has been determined
     * to be running.
     *
     * @return the records to hand to Kafka Connect; a {@code null} result is tolerated by the helpers
     *         in this class
     * @throws InterruptedException if the polling thread is interrupted
     */
    protected abstract List<SourceRecord> doPoll() throws InterruptedException;

    /**
     * Determines whether polling may proceed, re-attempting the start when a restart is pending.
     *
     * <p>While the restart delay has not elapsed this method pauses for the retriable restart wait
     * (with the state lock held) and reports that the task is not ready.
     *
     * @return {@code true} if the task is running, possibly after a successful restart
     * @throws InterruptedException if the thread is interrupted while pausing
     */
    private boolean startIfNeededAndPossible() throws InterruptedException {
        this.stateLock.lock();
        try {
            final State current = getTaskState();
            if (current == State.RUNNING) {
                return true;
            }
            if (current != State.RESTARTING) {
                return false;
            }

            if (!this.restartDelay.hasElapsed()) {
                LOGGER.info("Awaiting end of restart backoff period after a retriable error");
                Metronome.parker(this.retriableRestartWait, Clock.SYSTEM).pause();
                return false;
            }

            LOGGER.info("Attempting to restart task.");
            this.coordinator = start(this.config);
            LOGGER.info("Successfully restarted task");
            this.restartDelay = null;
            setTaskState(State.RUNNING);
            return true;
        } finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Stops the task without scheduling a restart; delegates to {@code stop(false)}.
     */
    @Override // org.apache.kafka.connect.source.SourceTask, org.apache.kafka.connect.connector.Task
    public final void stop() {
        // false: regular shutdown, the task ends in the STOPPED state rather than RESTARTING.
        stop(false);
    }

    /**
     * Stops the coordinator and the connector-specific resources, then moves the task either to
     * RESTARTING (and arms the restart delay if none is pending) or to STOPPED.
     *
     * @param z {@code true} when stopping because of a retriable error so that a restart is scheduled;
     *          {@code false} for a regular shutdown
     * @throws ConnectException if the thread is interrupted while the coordinator is being stopped
     */
    private void stop(boolean z) {
        this.stateLock.lock();
        try {
            if (z) {
                LOGGER.warn("Going to restart connector after {} sec. after a retriable exception", this.retriableRestartWait.getSeconds());
            } else {
                LOGGER.info("Stopping down connector");
            }
            try {
                if (this.coordinator != null) {
                    this.coordinator.stop();
                    this.coordinator = null;
                }
                doStop();
                if (z) {
                    setTaskState(State.RESTARTING);
                    if (this.restartDelay == null) {
                        this.restartDelay = ElapsedTimeStrategy.constant(Clock.system(), this.retriableRestartWait);
                    }
                } else {
                    setTaskState(State.STOPPED);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so that callers further up the stack can still observe it.
                Thread.currentThread().interrupt();
                LOGGER.error("Interrupted while stopping coordinator", e);
                // Keep the original exception as the cause instead of discarding it.
                throw new ConnectException("Interrupted while stopping coordinator, failing the task", e);
            }
        } finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Connector-specific stop hook, invoked while the task is being stopped, after the coordinator
     * (if any) has been stopped and released.
     */
    protected abstract void doStop();

    /**
     * Records the offset of a record so that the next {@code commit()} can forward it to the coordinator.
     * Records without a source offset are ignored.
     *
     * @param sourceRecord   the record being committed
     * @param recordMetadata metadata for the record; not used by this implementation
     * @throws InterruptedException declared by the overridden method
     */
    @Override // org.apache.kafka.connect.source.SourceTask
    public void commitRecord(SourceRecord sourceRecord, RecordMetadata recordMetadata) throws InterruptedException {
        LOGGER.trace("Committing record {}", Loggings.maybeRedactSensitiveData(sourceRecord));
        final Map<String, ?> offset = sourceRecord.sourceOffset();
        if (offset == null) {
            return;
        }
        updateLastOffset(sourceRecord.sourcePartition(), offset);
    }

    /**
     * Passes the last recorded offset of every partition to the coordinator and forgets each entry once
     * it has been handed over.
     *
     * <p>The commit is skipped, with an INFO message, when the state lock is currently held elsewhere.
     * Nothing is committed while no coordinator is present; the recorded offsets are kept in that case.
     *
     * @throws InterruptedException declared by the overridden method
     */
    @Override // org.apache.kafka.connect.source.SourceTask
    public void commit() throws InterruptedException {
        if (!this.stateLock.tryLock()) {
            LOGGER.info("Couldn't commit processed log positions with the source database due to a concurrent connector shutdown or restart");
            return;
        }
        try {
            if (this.coordinator != null) {
                final Iterator<Map.Entry<Map<String, ?>, Map<String, ?>>> pending = this.lastOffsets.entrySet().iterator();
                while (pending.hasNext()) {
                    final Map.Entry<Map<String, ?>, Map<String, ?>> entry = pending.next();
                    final Map<String, ?> partition = entry.getKey();
                    final Map<String, ?> offset = entry.getValue();
                    // Argument order follows the message: offset first, then partition.
                    LOGGER.debug("Committing offset '{}' for partition '{}'", offset, partition);
                    this.coordinator.commitOffset(partition, offset);
                    // Remove only after the coordinator accepted the offset.
                    pending.remove();
                }
            }
        } finally {
            this.stateLock.unlock();
        }
    }

    /**
     * Supplies the configuration fields against which the task configuration is validated in
     * {@code start(Map)}.
     *
     * @return the fields to validate
     */
    protected abstract Iterable<Field> getAllConfigurationFields();

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Loads the stored offsets for all partitions reported by the provider.
     *
     * <p>The first offset found during the lifetime of this task is logged at INFO level and later ones
     * at DEBUG level; when no partition has an offset, that fact is logged at INFO level.
     *
     * @param provider supplies the partitions to look up
     * @param loader   converts stored offset data into offset contexts
     * @return the loaded offsets wrapped in an {@link Offsets} instance
     */
    public Offsets<P, O> getPreviousOffsets(Partition.Provider<P> provider, OffsetContext.Loader<O> loader) {
        final Set<P> partitions = provider.getPartitions();
        final Map<P, O> offsets = new OffsetReader(this.context.offsetStorageReader(), loader).offsets(partitions);

        boolean anyOffsetFound = false;
        for (P partition : partitions) {
            final O offset = offsets.get(partition);
            if (offset == null) {
                continue;
            }
            anyOffsetFound = true;
            if (!this.offsetLoadedInPast) {
                LOGGER.info("Found previous partition offset {}: {}", partition, offset.getOffset());
                this.offsetLoadedInPast = true;
            } else {
                LOGGER.debug("Found previous partition offset {}: {}", partition, offset.getOffset());
            }
        }

        if (!anyOffsetFound) {
            LOGGER.info("No previous offsets found");
        }
        return Offsets.of(offsets);
    }

    /**
     * Atomically replaces the task state and logs the transition at DEBUG level.
     *
     * @param state the new state
     */
    private void setTaskState(State state) {
        final State previous = this.state.getAndSet(state);
        LOGGER.debug("Setting task state to '{}', previous state was '{}'", state, previous);
    }

    /**
     * Reads the current task state while holding the state lock.
     *
     * @return the current state
     */
    public State getTaskState() {
        final State current;
        this.stateLock.lock();
        try {
            current = this.state.get();
        } finally {
            this.stateLock.unlock();
        }
        return current;
    }

    /**
     * Returns the notification channels that were loaded when this task was constructed.
     * The internal list itself is returned, not a copy.
     *
     * @return the notification channels
     */
    public List<NotificationChannel> getNotificationChannels() {
        return this.notificationChannels;
    }

    /**
     * Registers the service providers common to all connectors with the given registry: the
     * post-processor registry, snapshot lock, snapshot query and snapshotter providers, in that order.
     *
     * @param serviceRegistry the registry that receives the providers
     */
    protected void registerServiceProviders(ServiceRegistry serviceRegistry) {
        serviceRegistry.registerServiceProvider(new PostProcessorRegistryServiceProvider());
        serviceRegistry.registerServiceProvider(new SnapshotLockProvider());
        serviceRegistry.registerServiceProvider(new SnapshotQueryProvider());
        serviceRegistry.registerServiceProvider(new SnapshotterServiceProvider());
    }
}
