package io.debezium.connector.oracle.logminer.unbuffered;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.LogFile;
import io.debezium.connector.oracle.logminer.LogMinerSessionContext;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.util.Clock;
import io.debezium.util.HexConverter;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.sql.SQLException;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/unbuffered/ResumePositionProvider.class */
/**
 * Periodically computes the SCN from which mining can resume by scanning
 * {@code V$LOGMNR_CONTENTS} for transaction boundary events.
 *
 * <p>The provider lazily opens its own {@link OracleConnection} and
 * {@link LogMinerSessionContext} on first use and keeps them until {@link #close()}
 * is called. A timer built from the connector's resume-position update interval
 * throttles how often the query is actually executed.
 */
public class ResumePositionProvider implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ResumePositionProvider.class);

    /**
     * Selects the transaction boundary rows up to the supplied SCN.
     * NOTE(review): operation codes 6/7/36 are presumably START/COMMIT/ROLLBACK;
     * confirm against {@link EventType}.
     */
    private static final String TRANSACTION_BOUNDARY_QUERY =
            "SELECT * FROM V$LOGMNR_CONTENTS WHERE OPERATION_CODE IN (6,7,36) AND SCN <= ?";

    private final OracleConnectorConfig connectorConfig;
    private final JdbcConfiguration jdbcConfig;
    private final Duration updateInterval;
    private OracleConnection connection;
    private LogMinerSessionContext sessionContext;
    private volatile Threads.Timer queryTimer;

    /**
     * Book-keeping entry for a transaction seen while scanning the mined rows.
     */
    private static final class Transaction {
        private final Scn startScn;
        private Scn endScn;

        Transaction(Scn scn) {
            this.startScn = scn;
        }

        /** Records the SCN at which the transaction ended. */
        public void markEnded(Scn scn) {
            this.endScn = scn;
        }

        /** @return {@code true} while no end SCN has been recorded */
        public boolean isInProgress() {
            return this.endScn == null;
        }

        /** @return the SCN at which the transaction started */
        public Scn getStartScn() {
            return this.startScn;
        }
    }

    /**
     * Creates a provider; no database connection is opened until the first computation.
     *
     * @param oracleConnectorConfig connector configuration, source of the update interval and PDB name
     * @param jdbcConfiguration JDBC settings used to open the dedicated connection
     */
    public ResumePositionProvider(OracleConnectorConfig oracleConnectorConfig, JdbcConfiguration jdbcConfiguration) {
        this.connectorConfig = oracleConnectorConfig;
        this.jdbcConfig = jdbcConfiguration;
        this.updateInterval = oracleConnectorConfig.getResumePositionUpdateInterval();
        // The timer must be created only after updateInterval is assigned; a field
        // initializer would run before this constructor body and see a null interval.
        this.queryTimer = resetTimer();
    }

    /**
     * Closes the LogMiner session context and the dedicated connection, if they were opened.
     *
     * @throws Exception if closing the session context or the connection fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.connection == null) {
            return;
        }
        LOGGER.info("Stopping the unbuffered resume position provider");
        try {
            if (this.sessionContext != null) {
                this.sessionContext.close();
            }
        } finally {
            // Always release the connection, even when closing the session context fails.
            this.connection.close();
        }
    }

    /**
     * Computes a new resume SCN from the given logs, at most once per update interval.
     *
     * <p>When the timer has not yet expired, {@code scn} is returned unchanged. Otherwise a
     * mining session is started at {@code scn}, transaction boundary rows with an SCN up to
     * {@code scn2} are scanned, and the start SCN of the first transaction that has no
     * matching end event is returned; if every observed transaction ended, {@code scn2}
     * is returned. The timer is reset after every attempt, successful or not.
     *
     * @param scn the current resume SCN, used as the mining session start
     * @param scn2 the upper SCN bound of the scan, also the fallback result
     * @param list the log files to register with the mining session
     * @return the computed resume SCN, or {@code scn} if the timer has not expired
     * @throws SQLException if a database operation fails
     */
    public Scn computeResumePositionFromLogs(Scn scn, Scn scn2, List<LogFile> list) throws SQLException {
        if (!this.queryTimer.expired()) {
            return scn;
        }
        Scn resumeScn;
        try {
            ensureSession();
            this.sessionContext.removeAllLogFilesFromSession();
            this.sessionContext.addLogFiles(list);
            this.sessionContext.startSession(scn, Scn.NULL, false);
            resumeScn = this.connection.prepareQueryAndMap(TRANSACTION_BOUNDARY_QUERY,
                    statement -> statement.setString(1, scn2.toString()),
                    resultSet -> {
                        // Insertion order is preserved so the first in-progress entry is the
                        // first such transaction encountered in the result set.
                        LinkedHashMap<String, Transaction> transactions = new LinkedHashMap<>();
                        while (resultSet.next()) {
                            byte[] xid = resultSet.getBytes("XID");
                            if (xid == null) {
                                continue;
                            }
                            String transactionId = HexConverter.convertToHexString(xid);
                            if (transactionId == null) {
                                continue;
                            }
                            EventType eventType = EventType.from(resultSet.getInt("OPERATION_CODE"));
                            Scn eventScn = Scn.valueOf(resultSet.getString("SCN"));
                            if (EventType.START == eventType) {
                                transactions.put(transactionId, new Transaction(eventScn));
                                continue;
                            }
                            // Any other matched operation code is treated as the transaction's end.
                            Transaction transaction = transactions.get(transactionId);
                            if (transaction == null) {
                                LOGGER.trace("Ignoring transaction {} event {} at SCN {} as it must have started before current resume SCN {}.",
                                        transactionId, eventType, eventScn, scn);
                            } else {
                                transaction.markEnded(eventScn);
                            }
                        }
                        return transactions.values().stream()
                                .filter(Transaction::isInProgress)
                                .findFirst()
                                .map(Transaction::getStartScn)
                                .orElse(scn2);
                    });
            LOGGER.debug("Resume/Commit SCN {}/{} - new resume SCN is {}", scn, scn2, resumeScn);
        } catch (Throwable failure) {
            // Best-effort cleanup that must never hide the original failure.
            endMiningSessionSuppressing(failure);
            this.queryTimer = resetTimer();
            throw failure;
        }
        try {
            this.sessionContext.endMiningSession();
        } finally {
            this.queryTimer = resetTimer();
        }
        return resumeScn;
    }

    /**
     * Opens the dedicated connection and session context on first use. The fields are only
     * published once every setup step succeeded, so a failed attempt is retried from scratch
     * on the next call instead of leaving a half-initialized provider behind.
     */
    private void ensureSession() throws SQLException {
        if (this.connection != null) {
            return;
        }
        LOGGER.info("Starting the unbuffered resume position provider");
        OracleConnection newConnection = new OracleConnection(this.jdbcConfig, false);
        try {
            newConnection.setAutoCommit(false);
            if (!Strings.isNullOrEmpty(this.connectorConfig.getPdbName())) {
                newConnection.resetSessionToCdb();
            }
            LogMinerSessionContext newContext = new LogMinerSessionContext(newConnection, false,
                    OracleConnectorConfig.LogMiningStrategy.ONLINE_CATALOG);
            this.sessionContext = newContext;
            this.connection = newConnection;
        } catch (Throwable failure) {
            try {
                newConnection.close();
            } catch (Exception closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Ends the mining session after a failure, recording any cleanup error as suppressed
     * on {@code failure} rather than letting it replace the original exception.
     */
    private void endMiningSessionSuppressing(Throwable failure) {
        if (this.sessionContext == null) {
            return;
        }
        try {
            this.sessionContext.endMiningSession();
        } catch (Exception cleanupFailure) {
            failure.addSuppressed(cleanupFailure);
        }
    }

    /** Builds a fresh timer spanning the configured update interval. */
    private Threads.Timer resetTimer() {
        return Threads.timer(Clock.SYSTEM, this.updateInterval);
    }
}
