package io.debezium.connector.spanner.db.metadata;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Dialect;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
import io.debezium.connector.spanner.db.dao.SchemaDao;
import io.debezium.connector.spanner.db.model.schema.ChangeStreamSchema;
import io.debezium.connector.spanner.db.model.schema.Column;
import io.debezium.connector.spanner.db.model.schema.SpannerSchema;
import io.debezium.connector.spanner.db.model.schema.TableSchema;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/db/metadata/SchemaRegistry.class */
public class SchemaRegistry {
    private static final String DATABASE_SCHEMA_NOT_CACHED = "Database schema is not cached";
    private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistry.class);
    private final SchemaDao schemaDao;
    private volatile SpannerSchema spannerSchema;
    private volatile ChangeStreamSchema changeStream;
    private final String streamName;
    private volatile Timestamp timestamp;
    private final Runnable schemaResetTrigger;

    public SchemaRegistry(String str, SchemaDao schemaDao, Runnable runnable) {
        this.schemaDao = schemaDao;
        this.streamName = str;
        this.schemaResetTrigger = runnable;
    }

    public void init(String str) {
        LOGGER.info("Task Uid, initializing schema registry", str);
        forceUpdateSchema(str, null, Timestamp.now(), null);
        LOGGER.info("Task Uid, done initializing schema registry", str);
    }

    public synchronized TableSchema getWatchedTable(TableId tableId) {
        if (this.timestamp == null) {
            throw new IllegalStateException(DATABASE_SCHEMA_NOT_CACHED);
        }
        TableSchema table = this.spannerSchema.getTable(tableId);
        ChangeStreamSchema.Table table2 = this.changeStream.getTable(table.getName());
        return new TableSchema(table.getName(), (List) table.columns().stream().filter(column -> {
            return column.isPrimaryKey() || table2.hasColumn(column.getName());
        }).collect(Collectors.toList()));
    }

    public synchronized void checkSchema(TableId tableId, Timestamp timestamp, List<Column> list) {
        if (validate(tableId, list)) {
            return;
        }
        if (updateSchema(tableId, timestamp, list)) {
            this.schemaResetTrigger.run();
        } else {
            LOGGER.warn("Schema has not been updated");
        }
    }

    private boolean validate(TableId tableId, List<Column> list) {
        if (this.timestamp == null) {
            throw new IllegalStateException(DATABASE_SCHEMA_NOT_CACHED);
        }
        TableSchema table = this.spannerSchema.getTable(tableId);
        TableSchema watchedTable = getWatchedTable(tableId);
        if (table != null) {
            return SchemaValidator.validate(table, watchedTable, list);
        }
        LOGGER.warn("Table not found in registry : {}", tableId.getTableName());
        return false;
    }

    public synchronized boolean updateSchema(TableId tableId, Timestamp timestamp, List<Column> list) {
        if (timestamp.equals(this.timestamp)) {
            return false;
        }
        LOGGER.info("Schema is outdated. Try to update schema registry...");
        forceUpdateSchema("", tableId, timestamp, list);
        LOGGER.info("Schema registry has been updated to date {}", timestamp);
        return true;
    }

    public void updateSchemaFromStaleTimestamp(TableId tableId, Timestamp timestamp, List<Column> list) {
        LOGGER.info("Schema is outdated. Try to update schema registry outside of retention period...");
        SpannerSchema.SpannerSchemaBuilder builder = SpannerSchema.builder();
        Dialect dialect = this.schemaDao.isPostgres() ? Dialect.POSTGRESQL : Dialect.GOOGLE_STANDARD_SQL;
        for (Column column : list) {
            if (column.isPrimaryKey()) {
                builder.addPrimaryColumn(tableId.getTableName(), column.getName());
            }
            builder.addColumn(tableId.getTableName(), column.getName(), column.getType().getType().toString(), column.getOrdinalPosition(), column.isNullable(), dialect);
        }
        SpannerSchema build = builder.build();
        if (this.spannerSchema == null) {
            this.spannerSchema = build;
        } else {
            this.spannerSchema = SchemaMerger.merge(this.spannerSchema, build);
            LOGGER.info("Schema registry has been updated to stale timestamp {}", timestamp);
        }
    }

    void forceUpdateSchema(String str, @Nullable TableId tableId, Timestamp timestamp, @Nullable List<Column> list) {
        try {
            LOGGER.info("Task {}, started updating schema registry", str);
            this.timestamp = timestamp;
            this.changeStream = this.schemaDao.getStream(this.timestamp, this.streamName);
            if (this.changeStream == null) {
                throw new IllegalStateException("Change stream doesn't exist at timestamp: " + this.streamName + ", " + String.valueOf(this.timestamp));
            }
            SpannerSchema schema = this.changeStream.isWatchedAllTables() ? this.schemaDao.getSchema(this.timestamp) : this.schemaDao.getSchema(this.timestamp, this.changeStream.getTables());
            if (this.spannerSchema == null) {
                this.spannerSchema = schema;
            } else {
                this.spannerSchema = SchemaMerger.merge(this.spannerSchema, schema);
                LOGGER.info("Task {} merged schema ", str);
            }
        } catch (Exception e) {
            LOGGER.error("Task {} received exception {} when initializing schema registry", str, e);
            throw e;
        } catch (SpannerException e2) {
            LOGGER.error("Task {} received exception {} when initializing schema registry", str, e2);
            if (!e2.getMessage().contains("has exceeded the maximum timestamp staleness") || !e2.getErrorCode().equals(ErrorCode.FAILED_PRECONDITION)) {
                throw e2;
            }
            updateSchemaFromStaleTimestamp(tableId, this.timestamp, list);
        }
    }

    public Set<TableId> getAllTables() {
        if (this.spannerSchema == null) {
            throw new IllegalStateException("database schema is not cached yet");
        }
        return this.spannerSchema.getAllTables();
    }
}
