package io.debezium.connector.jdbc;

import io.debezium.connector.jdbc.JdbcSinkConnectorConfig;
import io.debezium.connector.jdbc.dialect.DatabaseDialect;
import io.debezium.connector.jdbc.relational.TableDescriptor;
import io.debezium.metadata.CollectionId;
import io.debezium.sink.DebeziumSinkRecord;
import io.debezium.sink.field.FieldDescriptor;
import io.debezium.sink.spi.ChangeEventSink;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Stopwatch;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.sql.SQLException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.hibernate.StatelessSession;
import org.hibernate.Transaction;
import org.hibernate.dialect.DatabaseVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/jdbc/JdbcChangeEventSink.class */
public class JdbcChangeEventSink implements ChangeEventSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcChangeEventSink.class);
    public static final String DETECT_SCHEMA_CHANGE_RECORD_MSG = "Schema change records are not supported by JDBC connector. Adjust `topics` or `topics.regex` to exclude schema change topic.";
    private final JdbcSinkConnectorConfig config;
    private final DatabaseDialect dialect;
    private final StatelessSession session;
    private final RecordWriter recordWriter;
    private final int flushMaxRetries;
    private final Duration flushRetryDelay;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/debezium/connector/jdbc/JdbcChangeEventSink$BufferFlushRecords.class */
    public static final class BufferFlushRecords extends Record {
        private final Buffer buffer;
        private final List<JdbcSinkRecord> records;

        private BufferFlushRecords(Buffer buffer, List<JdbcSinkRecord> list) {
            this.buffer = buffer;
            this.records = list;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, BufferFlushRecords.class), BufferFlushRecords.class, "buffer;records", "FIELD:Lio/debezium/connector/jdbc/JdbcChangeEventSink$BufferFlushRecords;->buffer:Lio/debezium/connector/jdbc/Buffer;", "FIELD:Lio/debezium/connector/jdbc/JdbcChangeEventSink$BufferFlushRecords;->records:Ljava/util/List;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, BufferFlushRecords.class), BufferFlushRecords.class, "buffer;records", "FIELD:Lio/debezium/connector/jdbc/JdbcChangeEventSink$BufferFlushRecords;->buffer:Lio/debezium/connector/jdbc/Buffer;", "FIELD:Lio/debezium/connector/jdbc/JdbcChangeEventSink$BufferFlushRecords;->records:Ljava/util/List;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, BufferFlushRecords.class, Object.class), BufferFlushRecords.class, "buffer;records", "FIELD:Lio/debezium/connector/jdbc/JdbcChangeEventSink$BufferFlushRecords;->buffer:Lio/debezium/connector/jdbc/Buffer;", "FIELD:Lio/debezium/connector/jdbc/JdbcChangeEventSink$BufferFlushRecords;->records:Ljava/util/List;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public Buffer buffer() {
            return this.buffer;
        }

        public List<JdbcSinkRecord> records() {
            return this.records;
        }
    }

    public JdbcChangeEventSink(JdbcSinkConnectorConfig jdbcSinkConnectorConfig, StatelessSession statelessSession, DatabaseDialect databaseDialect, RecordWriter recordWriter) {
        this.config = jdbcSinkConnectorConfig;
        this.dialect = databaseDialect;
        this.session = statelessSession;
        this.recordWriter = recordWriter;
        this.flushMaxRetries = jdbcSinkConnectorConfig.getFlushMaxRetries();
        this.flushRetryDelay = Duration.of(jdbcSinkConnectorConfig.getFlushRetryDelayMs(), ChronoUnit.MILLIS);
        DatabaseVersion version = this.dialect.getVersion();
        LOGGER.info("Database version {}.{}.{}", new Object[]{Integer.valueOf(version.getMajor()), Integer.valueOf(version.getMinor()), Integer.valueOf(version.getMicro())});
    }

    public void execute(Collection<SinkRecord> collection) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        Iterator<SinkRecord> it = collection.iterator();
        while (it.hasNext()) {
            JdbcKafkaSinkRecord jdbcKafkaSinkRecord = new JdbcKafkaSinkRecord(it.next(), this.config.getPrimaryKeyMode(), this.config.getPrimaryKeyFields(), this.config.getFieldFilter(), this.config.cloudEventsSchemaNamePattern(), this.dialect);
            LOGGER.trace("Processing {}", jdbcKafkaSinkRecord);
            validate(jdbcKafkaSinkRecord);
            Optional<CollectionId> collectionIdFromRecord = getCollectionIdFromRecord(jdbcKafkaSinkRecord);
            if (collectionIdFromRecord.isEmpty()) {
                LOGGER.warn("Ignored to write record from topic '{}' partition '{}' offset '{}'. No resolvable table name", new Object[]{jdbcKafkaSinkRecord.topicName(), jdbcKafkaSinkRecord.partition(), Long.valueOf(jdbcKafkaSinkRecord.offset())});
            } else {
                CollectionId collectionId = collectionIdFromRecord.get();
                if (jdbcKafkaSinkRecord.isTruncate()) {
                    if (this.config.isTruncateEnabled()) {
                        flushBuffers(linkedHashMap);
                        flushBuffers(linkedHashMap2);
                        try {
                            writeTruncate(this.dialect.getTruncateStatement(checkAndApplyTableChangesIfNeeded(collectionId, jdbcKafkaSinkRecord)));
                        } catch (SQLException e) {
                            throw new ConnectException("Failed to process a sink record", e);
                        }
                    } else {
                        LOGGER.debug("Truncates are not enabled, skipping truncate for topic '{}'", jdbcKafkaSinkRecord.topicName());
                    }
                } else if (!jdbcKafkaSinkRecord.isDelete() && !jdbcKafkaSinkRecord.isTombstone()) {
                    Buffer buffer = linkedHashMap2.get(collectionId);
                    if (buffer != null && !buffer.isEmpty()) {
                        if (this.config.isUseReductionBuffer()) {
                            buffer.remove(jdbcKafkaSinkRecord);
                        } else {
                            flushBufferWithRetries(collectionId, buffer);
                        }
                    }
                    flushBufferRecordsWithRetries(collectionId, getRecordsToFlush(linkedHashMap, collectionId, jdbcKafkaSinkRecord));
                } else if (this.config.isDeleteEnabled()) {
                    Buffer buffer2 = linkedHashMap.get(collectionId);
                    if (buffer2 != null && !buffer2.isEmpty()) {
                        if (this.config.isUseReductionBuffer()) {
                            buffer2.remove(jdbcKafkaSinkRecord);
                        } else {
                            flushBufferWithRetries(collectionId, buffer2);
                        }
                    }
                    flushBufferRecordsWithRetries(collectionId, getRecordsToFlush(linkedHashMap2, collectionId, jdbcKafkaSinkRecord));
                } else {
                    LOGGER.debug("Deletes are not enabled, skipping delete for topic '{}'", jdbcKafkaSinkRecord.topicName());
                }
            }
        }
        flushBuffers(linkedHashMap);
        flushBuffers(linkedHashMap2);
    }

    private void validate(JdbcSinkRecord jdbcSinkRecord) {
        if (jdbcSinkRecord.isSchemaChange()) {
            LOGGER.error(DETECT_SCHEMA_CHANGE_RECORD_MSG);
            throw new DataException(DETECT_SCHEMA_CHANGE_RECORD_MSG);
        }
    }

    private BufferFlushRecords getRecordsToFlush(Map<CollectionId, Buffer> map, CollectionId collectionId, JdbcSinkRecord jdbcSinkRecord) {
        Stopwatch reusable = Stopwatch.reusable();
        reusable.start();
        Buffer orCreateBuffer = getOrCreateBuffer(map, collectionId, jdbcSinkRecord);
        if (isSchemaChanged(jdbcSinkRecord, orCreateBuffer.getTableDescriptor())) {
            flushBufferWithRetries(collectionId, orCreateBuffer);
            map.remove(collectionId);
            orCreateBuffer = getOrCreateBuffer(map, collectionId, jdbcSinkRecord);
        }
        List<JdbcSinkRecord> add = orCreateBuffer.add(jdbcSinkRecord);
        reusable.stop();
        LOGGER.trace("[PERF] Resolve and add record execution time for collection '{}': {}", collectionId.name(), reusable.durations());
        return new BufferFlushRecords(orCreateBuffer, add);
    }

    private Buffer getOrCreateBuffer(Map<CollectionId, Buffer> map, CollectionId collectionId, JdbcSinkRecord jdbcSinkRecord) {
        return map.computeIfAbsent(collectionId, collectionId2 -> {
            try {
                return createBuffer(this.config, checkAndApplyTableChangesIfNeeded(collectionId, jdbcSinkRecord), jdbcSinkRecord);
            } catch (SQLException e) {
                throw new ConnectException("Error while checking and applying table changes for collection '" + String.valueOf(collectionId) + "'", e);
            }
        });
    }

    private Buffer createBuffer(JdbcSinkConnectorConfig jdbcSinkConnectorConfig, TableDescriptor tableDescriptor, JdbcSinkRecord jdbcSinkRecord) {
        return (!jdbcSinkConnectorConfig.isUseReductionBuffer() || jdbcSinkRecord.keyFieldNames().isEmpty()) ? new RecordBuffer(jdbcSinkConnectorConfig, tableDescriptor) : new ReducedRecordBuffer(jdbcSinkConnectorConfig, tableDescriptor);
    }

    private boolean isSchemaChanged(JdbcSinkRecord jdbcSinkRecord, TableDescriptor tableDescriptor) {
        Set<String> resolveMissingFields = this.dialect.resolveMissingFields(jdbcSinkRecord, tableDescriptor);
        LOGGER.debug("Schema change detected for '{}', missing fields: {}", tableDescriptor.getId().toFullIdentiferString(), resolveMissingFields);
        return !resolveMissingFields.isEmpty();
    }

    private void flushBuffers(Map<CollectionId, Buffer> map) {
        map.forEach(this::flushBufferWithRetries);
    }

    private void flushBufferRecordsWithRetries(CollectionId collectionId, BufferFlushRecords bufferFlushRecords) {
        flushBufferWithRetries(collectionId, bufferFlushRecords.records(), bufferFlushRecords.buffer.getTableDescriptor());
    }

    private void flushBufferWithRetries(CollectionId collectionId, Buffer buffer) {
        flushBufferWithRetries(collectionId, buffer.flush(), buffer.getTableDescriptor());
    }

    private void flushBufferWithRetries(CollectionId collectionId, List<JdbcSinkRecord> list, TableDescriptor tableDescriptor) {
        Exception exc = null;
        LOGGER.debug("Flushing records in JDBC Writer for table: {}", collectionId.name());
        for (int i = 0; i <= this.flushMaxRetries; i++) {
            if (i > 0) {
                try {
                    LOGGER.warn("Retry to flush records for table '{}'. Retry {}/{} with delay {} ms", new Object[]{collectionId.name(), Integer.valueOf(i), Integer.valueOf(this.flushMaxRetries), Long.valueOf(this.flushRetryDelay.toMillis())});
                    try {
                        Metronome.parker(this.flushRetryDelay, Clock.SYSTEM).pause();
                    } catch (InterruptedException e) {
                        throw new ConnectException("Interrupted while waiting to retry flush records", e);
                    }
                } catch (Exception e2) {
                    exc = e2;
                    if (!isRetriable(e2)) {
                        throw new ConnectException("Failed to process a sink record", e2);
                    }
                }
            }
            flushBuffer(collectionId, list, tableDescriptor);
            return;
        }
        throw new ConnectException("Exceeded max retries " + this.flushMaxRetries + " times, failed to process sink records", exc);
    }

    private void flushBuffer(CollectionId collectionId, List<JdbcSinkRecord> list, TableDescriptor tableDescriptor) throws SQLException {
        Stopwatch reusable = Stopwatch.reusable();
        Stopwatch reusable2 = Stopwatch.reusable();
        if (list.isEmpty()) {
            return;
        }
        LOGGER.debug("Flushing records in JDBC Writer for table: {}", collectionId.name());
        reusable2.start();
        reusable2.stop();
        String sqlStatement = getSqlStatement(tableDescriptor, list.get(0));
        reusable.start();
        this.recordWriter.write(list, sqlStatement);
        reusable.stop();
        LOGGER.trace("[PERF] Flush buffer execution time {}", reusable.durations());
        LOGGER.trace("[PERF] Table changes execution time {}", reusable2.durations());
    }

    public void close() {
        if (this.session == null || !this.session.isOpen()) {
            LOGGER.info("Session already closed.");
        } else {
            LOGGER.info("Closing session.");
            this.session.close();
        }
    }

    private TableDescriptor checkAndApplyTableChangesIfNeeded(CollectionId collectionId, JdbcSinkRecord jdbcSinkRecord) throws SQLException {
        if (hasTable(collectionId)) {
            try {
                return alterTableIfNeeded(collectionId, jdbcSinkRecord);
            } catch (SQLException e) {
                LOGGER.error("Failed to alter the table '{}'.", collectionId.toFullIdentiferString(), e);
                throw e;
            }
        }
        try {
            return createTable(collectionId, jdbcSinkRecord);
        } catch (SQLException e2) {
            LOGGER.warn("Table creation failed for '{}', attempting to alter the table", collectionId.toFullIdentiferString(), e2);
            try {
                return alterTableIfNeeded(collectionId, jdbcSinkRecord);
            } catch (SQLException e3) {
                LOGGER.error("Failed to alter the table '{}'.", collectionId.toFullIdentiferString(), e3);
                throw e3;
            }
        }
    }

    private boolean hasTable(CollectionId collectionId) {
        return ((Boolean) this.session.doReturningWork(connection -> {
            return Boolean.valueOf(this.dialect.tableExists(connection, collectionId));
        })).booleanValue();
    }

    private TableDescriptor readTable(CollectionId collectionId) {
        return (TableDescriptor) this.session.doReturningWork(connection -> {
            return this.dialect.readTable(connection, collectionId);
        });
    }

    private TableDescriptor createTable(CollectionId collectionId, JdbcSinkRecord jdbcSinkRecord) throws SQLException {
        LOGGER.debug("Attempting to create table '{}'.", collectionId.toFullIdentiferString());
        if (JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.equals(this.config.getSchemaEvolutionMode())) {
            LOGGER.warn("Table '{}' cannot be created because schema evolution is disabled.", collectionId.toFullIdentiferString());
            throw new SQLException("Cannot create table " + collectionId.toFullIdentiferString() + " because schema evolution is disabled");
        }
        Transaction beginTransaction = this.session.beginTransaction();
        try {
            String createTableStatement = this.dialect.getCreateTableStatement(jdbcSinkRecord, collectionId);
            LOGGER.trace("SQL: {}", createTableStatement);
            this.session.createNativeQuery(createTableStatement, Object.class).executeUpdate();
            beginTransaction.commit();
            return readTable(collectionId);
        } catch (Exception e) {
            beginTransaction.rollback();
            throw e;
        }
    }

    private TableDescriptor alterTableIfNeeded(CollectionId collectionId, JdbcSinkRecord jdbcSinkRecord) throws SQLException {
        LOGGER.debug("Attempting to alter table '{}'.", collectionId.toFullIdentiferString());
        if (!hasTable(collectionId)) {
            LOGGER.error("Table '{}' does not exist and cannot be altered.", collectionId.toFullIdentiferString());
            throw new SQLException("Could not find table: " + collectionId.toFullIdentiferString());
        }
        TableDescriptor readTable = readTable(collectionId);
        Set<String> resolveMissingFields = this.dialect.resolveMissingFields(jdbcSinkRecord, readTable);
        if (resolveMissingFields.isEmpty()) {
            return readTable;
        }
        LOGGER.debug("The follow fields are missing in the table: {}", resolveMissingFields);
        Iterator<String> it = resolveMissingFields.iterator();
        while (it.hasNext()) {
            FieldDescriptor fieldDescriptor = (FieldDescriptor) jdbcSinkRecord.allFields().get(it.next());
            if (!fieldDescriptor.getSchema().isOptional() && fieldDescriptor.getSchema().defaultValue() == null) {
                throw new SQLException(String.format("Cannot ALTER table '%s' because field '%s' is not optional but has no default value", collectionId.toFullIdentiferString(), fieldDescriptor.getName()));
            }
        }
        if (JdbcSinkConnectorConfig.SchemaEvolutionMode.NONE.equals(this.config.getSchemaEvolutionMode())) {
            LOGGER.warn("Table '{}' cannot be altered because schema evolution is disabled.", collectionId.toFullIdentiferString());
            throw new SQLException("Cannot alter table " + collectionId.toFullIdentiferString() + " because schema evolution is disabled");
        }
        Transaction beginTransaction = this.session.beginTransaction();
        try {
            String alterTableStatement = this.dialect.getAlterTableStatement(readTable, jdbcSinkRecord, resolveMissingFields);
            LOGGER.trace("SQL: {}", alterTableStatement);
            this.session.createNativeQuery(alterTableStatement, Object.class).executeUpdate();
            beginTransaction.commit();
            return readTable(collectionId);
        } catch (Exception e) {
            beginTransaction.rollback();
            throw e;
        }
    }

    private String getSqlStatement(TableDescriptor tableDescriptor, JdbcSinkRecord jdbcSinkRecord) {
        if (jdbcSinkRecord.isDelete()) {
            return this.dialect.getDeleteStatement(tableDescriptor, jdbcSinkRecord);
        }
        switch (this.config.getInsertMode()) {
            case INSERT:
                return this.dialect.getInsertStatement(tableDescriptor, jdbcSinkRecord);
            case UPSERT:
                if (jdbcSinkRecord.keyFieldNames().isEmpty()) {
                    throw new ConnectException("Cannot write to table " + tableDescriptor.getId().name() + " with no key fields defined.");
                }
                return this.dialect.getUpsertStatement(tableDescriptor, jdbcSinkRecord);
            case UPDATE:
                return this.dialect.getUpdateStatement(tableDescriptor, jdbcSinkRecord);
            default:
                throw new DataException(String.format("Unable to get SQL statement for %s", jdbcSinkRecord));
        }
    }

    private void writeTruncate(String str) throws SQLException {
        Transaction beginTransaction = this.session.beginTransaction();
        try {
            LOGGER.trace("SQL: {}", str);
            this.session.createNativeQuery(str, Object.class).executeUpdate();
            beginTransaction.commit();
        } catch (Exception e) {
            beginTransaction.rollback();
            throw e;
        }
    }

    public Optional<CollectionId> getCollectionId(String str) {
        return Optional.of(this.dialect.getCollectionId(str));
    }

    private boolean isRetriable(Throwable th) {
        if (th == null) {
            return false;
        }
        Iterator<Class<? extends Exception>> it = this.dialect.getCommunicationExceptions().iterator();
        while (it.hasNext()) {
            if (it.next().isAssignableFrom(th.getClass())) {
                return true;
            }
        }
        return isRetriable(th.getCause());
    }

    public Optional<CollectionId> getCollectionIdFromRecord(DebeziumSinkRecord debeziumSinkRecord) {
        String resolveCollectionName = this.config.getCollectionNamingStrategy().resolveCollectionName(debeziumSinkRecord, this.config.getCollectionNameFormat());
        return resolveCollectionName == null ? Optional.empty() : getCollectionId(resolveCollectionName);
    }
}
