package io.debezium.connector.common;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.doc.FixFor;
import io.debezium.function.LogPositionValidator;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.spi.snapshot.Snapshotter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/debezium/connector/common/BaseSourceTaskSnapshotModesValidationTest.class */
public class BaseSourceTaskSnapshotModesValidationTest {
    private final MyBaseSourceTask baseSourceTask = new MyBaseSourceTask();

    /* loaded from: input_file:io/debezium/connector/common/BaseSourceTaskSnapshotModesValidationTest$MyBaseSourceTask.class */
    public static class MyBaseSourceTask extends BaseSourceTask<Partition, OffsetContext> {
        final List<SourceRecord> records = new ArrayList();
        final AtomicInteger startCount = new AtomicInteger();
        final AtomicInteger stopCount = new AtomicInteger();
        final ChangeEventSourceCoordinator<Partition, OffsetContext> coordinator = (ChangeEventSourceCoordinator) Mockito.mock(ChangeEventSourceCoordinator.class);

        protected ChangeEventSourceCoordinator<Partition, OffsetContext> start(Configuration configuration) {
            this.startCount.incrementAndGet();
            return this.coordinator;
        }

        protected List<SourceRecord> doPoll() {
            return this.records;
        }

        protected void resetErrorHandlerRetriesIfNeeded(List<SourceRecord> list) {
        }

        protected void doStop() {
            this.stopCount.incrementAndGet();
        }

        protected Iterable<Field> getAllConfigurationFields() {
            return List.of(Field.create("f1"));
        }

        public String version() {
            return "1.0";
        }
    }

    @Before
    public void setup() {
        this.baseSourceTask.initialize((SourceTaskContext) Mockito.mock(SourceTaskContext.class));
    }

    @Test
    @FixFor({"DBZ-7780"})
    public void whenSnapshotModePermitsSchemaOrDataAndSnapshotIsNotCompletedOnConnectorRestartsValidateMustPass() {
        CommonConnectorConfig commonConnectorConfig = (CommonConnectorConfig) Mockito.mock(CommonConnectorConfig.class);
        LogPositionValidator logPositionValidator = (partition, offsetContext, commonConnectorConfig2) -> {
            return true;
        };
        Partition partition2 = (Partition) Mockito.mock(Partition.class);
        OffsetContext offsetContext2 = (OffsetContext) Mockito.mock(OffsetContext.class);
        Mockito.when(Boolean.valueOf(offsetContext2.isInitialSnapshotRunning())).thenReturn(true);
        Offsets of = Offsets.of(partition2, offsetContext2);
        DatabaseSchema databaseSchema = (DatabaseSchema) Mockito.mock(DatabaseSchema.class);
        Snapshotter snapshotter = (Snapshotter) Mockito.mock(Snapshotter.class);
        Mockito.when(Boolean.valueOf(snapshotter.shouldSnapshotData(true, true))).thenReturn(false);
        Mockito.when(Boolean.valueOf(snapshotter.shouldSnapshotSchema(true, true))).thenReturn(true);
        Assertions.assertThatCode(() -> {
            this.baseSourceTask.validateAndLoadSchemaHistory(commonConnectorConfig, logPositionValidator, of, databaseSchema, snapshotter);
        }).doesNotThrowAnyException();
    }

    @Test
    public void whenSnapshotModeNotPermitsSchemaAndDataAndSnapshotIsNotCompletedOnConnectorRestartsExceptionWillBeThrown() {
        CommonConnectorConfig commonConnectorConfig = (CommonConnectorConfig) Mockito.mock(CommonConnectorConfig.class);
        LogPositionValidator logPositionValidator = (partition, offsetContext, commonConnectorConfig2) -> {
            return true;
        };
        Partition partition2 = (Partition) Mockito.mock(Partition.class);
        OffsetContext offsetContext2 = (OffsetContext) Mockito.mock(OffsetContext.class);
        Mockito.when(Boolean.valueOf(offsetContext2.isInitialSnapshotRunning())).thenReturn(true);
        Offsets of = Offsets.of(partition2, offsetContext2);
        DatabaseSchema databaseSchema = (DatabaseSchema) Mockito.mock(DatabaseSchema.class);
        Snapshotter snapshotter = (Snapshotter) Mockito.mock(Snapshotter.class);
        Mockito.when(Boolean.valueOf(snapshotter.shouldSnapshotData(true, true))).thenReturn(false);
        Mockito.when(Boolean.valueOf(snapshotter.shouldSnapshotSchema(true, true))).thenReturn(false);
        Assertions.assertThatCode(() -> {
            this.baseSourceTask.validateAndLoadSchemaHistory(commonConnectorConfig, logPositionValidator, of, databaseSchema, snapshotter);
        }).isInstanceOf(DebeziumException.class).hasMessage("The connector previously stopped while taking a snapshot, but now the connector is configured to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.");
    }

    @Test
    public void whenNoOffsetExistsAndSnapshotPermitsSchemaRecoveryAnExceptionWillBeThrown() {
        CommonConnectorConfig commonConnectorConfig = (CommonConnectorConfig) Mockito.mock(CommonConnectorConfig.class);
        LogPositionValidator logPositionValidator = (partition, offsetContext, commonConnectorConfig2) -> {
            return true;
        };
        Offsets of = Offsets.of((Partition) Mockito.mock(Partition.class), (OffsetContext) null);
        DatabaseSchema databaseSchema = (DatabaseSchema) Mockito.mock(DatabaseSchema.class);
        Snapshotter snapshotter = (Snapshotter) Mockito.mock(Snapshotter.class);
        Mockito.when(Boolean.valueOf(snapshotter.shouldSnapshotOnSchemaError())).thenReturn(true);
        Assertions.assertThatThrownBy(() -> {
            this.baseSourceTask.validateAndLoadSchemaHistory(commonConnectorConfig, logPositionValidator, of, databaseSchema, snapshotter);
        }).isInstanceOf(DebeziumException.class).hasMessage("Could not find existing redo log information while attempting schema only recovery snapshot");
    }

    @Test
    public void whenNoOffsetExistsAndDatabaseIsHistorizedThenSchemaStorageIsInitialized() {
        CommonConnectorConfig commonConnectorConfig = (CommonConnectorConfig) Mockito.mock(CommonConnectorConfig.class);
        LogPositionValidator logPositionValidator = (partition, offsetContext, commonConnectorConfig2) -> {
            return true;
        };
        Offsets of = Offsets.of((Partition) Mockito.mock(Partition.class), (OffsetContext) null);
        DatabaseSchema databaseSchema = (HistorizedDatabaseSchema) Mockito.mock(HistorizedDatabaseSchema.class);
        Mockito.when(Boolean.valueOf(databaseSchema.isHistorized())).thenReturn(true);
        Mockito.when(databaseSchema.getSchemaHistory()).thenReturn((SchemaHistory) Mockito.mock(SchemaHistory.class));
        this.baseSourceTask.validateAndLoadSchemaHistory(commonConnectorConfig, logPositionValidator, of, databaseSchema, (Snapshotter) Mockito.mock(Snapshotter.class));
        ((HistorizedDatabaseSchema) Mockito.verify(databaseSchema)).initializeStorage();
    }

    @Test
    public void whenCompletedSnapshotExistsAndHistoryNotExistsAndSnapshotOnSchemaErrorThenSchemaStorageIsInitialized() {
        CommonConnectorConfig commonConnectorConfig = (CommonConnectorConfig) Mockito.mock(CommonConnectorConfig.class);
        LogPositionValidator logPositionValidator = (partition, offsetContext, commonConnectorConfig2) -> {
            return true;
        };
        Partition partition2 = (Partition) Mockito.mock(Partition.class);
        OffsetContext offsetContext2 = (OffsetContext) Mockito.mock(OffsetContext.class);
        Mockito.when(Boolean.valueOf(offsetContext2.isInitialSnapshotRunning())).thenReturn(false);
        Offsets of = Offsets.of(partition2, offsetContext2);
        DatabaseSchema databaseSchema = (HistorizedDatabaseSchema) Mockito.mock(HistorizedDatabaseSchema.class);
        Mockito.when(Boolean.valueOf(databaseSchema.isHistorized())).thenReturn(true);
        Mockito.when(databaseSchema.getSchemaHistory()).thenReturn((SchemaHistory) Mockito.mock(SchemaHistory.class));
        Snapshotter snapshotter = (Snapshotter) Mockito.mock(Snapshotter.class);
        Mockito.when(Boolean.valueOf(snapshotter.shouldSnapshotOnSchemaError())).thenReturn(true);
        this.baseSourceTask.validateAndLoadSchemaHistory(commonConnectorConfig, logPositionValidator, of, databaseSchema, snapshotter);
        ((HistorizedDatabaseSchema) Mockito.verify(databaseSchema)).initializeStorage();
    }

    @Test
    public void whenCompletedSnapshotExistsAndHistoryNotExistsAndSnapshotOnSchemaErrorIsFalseThenAnExceptionWillBeThrown() {
        CommonConnectorConfig commonConnectorConfig = (CommonConnectorConfig) Mockito.mock(CommonConnectorConfig.class);
        LogPositionValidator logPositionValidator = (partition, offsetContext, commonConnectorConfig2) -> {
            return true;
        };
        Partition partition2 = (Partition) Mockito.mock(Partition.class);
        OffsetContext offsetContext2 = (OffsetContext) Mockito.mock(OffsetContext.class);
        Mockito.when(Boolean.valueOf(offsetContext2.isInitialSnapshotRunning())).thenReturn(false);
        Offsets of = Offsets.of(partition2, offsetContext2);
        HistorizedDatabaseSchema historizedDatabaseSchema = (HistorizedDatabaseSchema) Mockito.mock(HistorizedDatabaseSchema.class);
        Mockito.when(Boolean.valueOf(historizedDatabaseSchema.isHistorized())).thenReturn(true);
        Mockito.when(historizedDatabaseSchema.getSchemaHistory()).thenReturn((SchemaHistory) Mockito.mock(SchemaHistory.class));
        Snapshotter snapshotter = (Snapshotter) Mockito.mock(Snapshotter.class);
        Mockito.when(Boolean.valueOf(snapshotter.shouldSnapshotOnSchemaError())).thenReturn(false);
        Assertions.assertThatThrownBy(() -> {
            this.baseSourceTask.validateAndLoadSchemaHistory(commonConnectorConfig, logPositionValidator, of, historizedDatabaseSchema, snapshotter);
        }).isInstanceOf(DebeziumException.class).hasMessage("The db history topic is missing. You may attempt to recover it by reconfiguring the connector to recovery.");
    }

    @Test
    public void whenCompletedSnapshotExistsAndStoredOffsetPositionIsPresentOnDbLogThenSchemaWillBeRecovered() {
        CommonConnectorConfig commonConnectorConfig = (CommonConnectorConfig) Mockito.mock(CommonConnectorConfig.class);
        Mockito.when(Boolean.valueOf(commonConnectorConfig.isLogPositionCheckEnabled())).thenReturn(true);
        LogPositionValidator logPositionValidator = (partition, offsetContext, commonConnectorConfig2) -> {
            return true;
        };
        Partition partition2 = (Partition) Mockito.mock(Partition.class);
        OffsetContext offsetContext2 = (OffsetContext) Mockito.mock(OffsetContext.class);
        Mockito.when(Boolean.valueOf(offsetContext2.isInitialSnapshotRunning())).thenReturn(false);
        Offsets of = Offsets.of(partition2, offsetContext2);
        DatabaseSchema databaseSchema = (HistorizedDatabaseSchema) Mockito.mock(HistorizedDatabaseSchema.class);
        Mockito.when(Boolean.valueOf(databaseSchema.isHistorized())).thenReturn(true);
        SchemaHistory schemaHistory = (SchemaHistory) Mockito.mock(SchemaHistory.class);
        Mockito.when(databaseSchema.getSchemaHistory()).thenReturn(schemaHistory);
        Mockito.when(Boolean.valueOf(schemaHistory.exists())).thenReturn(true);
        this.baseSourceTask.validateAndLoadSchemaHistory(commonConnectorConfig, logPositionValidator, of, databaseSchema, (Snapshotter) Mockito.mock(Snapshotter.class));
        ((HistorizedDatabaseSchema) Mockito.verify(databaseSchema)).recover(partition2, offsetContext2);
    }

    @Test
    public void whenCompletedSnapshotExistsAndStoredOffsetPositionIsNotPresentOnDbLogThenAWarnShouldBeLogged() {
        LogInterceptor logInterceptor = new LogInterceptor((Class<?>) BaseSourceTask.class);
        CommonConnectorConfig commonConnectorConfig = (CommonConnectorConfig) Mockito.mock(CommonConnectorConfig.class);
        Mockito.when(Boolean.valueOf(commonConnectorConfig.isLogPositionCheckEnabled())).thenReturn(true);
        LogPositionValidator logPositionValidator = (partition, offsetContext, commonConnectorConfig2) -> {
            return false;
        };
        Partition partition2 = (Partition) Mockito.mock(Partition.class);
        OffsetContext offsetContext2 = (OffsetContext) Mockito.mock(OffsetContext.class);
        Mockito.when(Boolean.valueOf(offsetContext2.isInitialSnapshotRunning())).thenReturn(false);
        Offsets of = Offsets.of(partition2, offsetContext2);
        DatabaseSchema databaseSchema = (HistorizedDatabaseSchema) Mockito.mock(HistorizedDatabaseSchema.class);
        Mockito.when(Boolean.valueOf(databaseSchema.isHistorized())).thenReturn(true);
        SchemaHistory schemaHistory = (SchemaHistory) Mockito.mock(SchemaHistory.class);
        Mockito.when(databaseSchema.getSchemaHistory()).thenReturn(schemaHistory);
        Mockito.when(Boolean.valueOf(schemaHistory.exists())).thenReturn(true);
        this.baseSourceTask.validateAndLoadSchemaHistory(commonConnectorConfig, logPositionValidator, of, databaseSchema, (Snapshotter) Mockito.mock(Snapshotter.class));
        Assertions.assertThat(logInterceptor.containsWarnMessage("The connector is trying to read redo log starting at " + String.valueOf(offsetContext2) + ", but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed if you want to recover. If not the connector will streaming from the last available position in the log")).isTrue();
    }

    @Test
    public void whenCompletedSnapshotExistsAndStoredOffsetPositionIsNotPresentOnDbLogAndSnapshotOnDataErrorThenOffsetWillBeReset() {
        CommonConnectorConfig commonConnectorConfig = (CommonConnectorConfig) Mockito.mock(CommonConnectorConfig.class);
        Mockito.when(Boolean.valueOf(commonConnectorConfig.isLogPositionCheckEnabled())).thenReturn(true);
        LogPositionValidator logPositionValidator = (partition, offsetContext, commonConnectorConfig2) -> {
            return false;
        };
        Partition partition2 = (Partition) Mockito.mock(Partition.class);
        OffsetContext offsetContext2 = (OffsetContext) Mockito.mock(OffsetContext.class);
        Mockito.when(Boolean.valueOf(offsetContext2.isInitialSnapshotRunning())).thenReturn(false);
        Offsets of = Offsets.of(partition2, offsetContext2);
        DatabaseSchema databaseSchema = (HistorizedDatabaseSchema) Mockito.mock(HistorizedDatabaseSchema.class);
        Mockito.when(Boolean.valueOf(databaseSchema.isHistorized())).thenReturn(true);
        Mockito.when(databaseSchema.getSchemaHistory()).thenReturn((SchemaHistory) Mockito.mock(SchemaHistory.class));
        Mockito.when(Boolean.valueOf(databaseSchema.getSchemaHistory().exists())).thenReturn(true);
        Snapshotter snapshotter = (Snapshotter) Mockito.mock(Snapshotter.class);
        Mockito.when(Boolean.valueOf(snapshotter.shouldSnapshotOnDataError())).thenReturn(true);
        this.baseSourceTask.validateAndLoadSchemaHistory(commonConnectorConfig, logPositionValidator, of, databaseSchema, snapshotter);
        Assertions.assertThat(of.getTheOnlyOffset()).isNull();
    }
}
