package io.debezium.connector.oracle.logminer.unbuffered;

import io.debezium.DebeziumException;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource;
import io.debezium.connector.oracle.logminer.LogMinerChangeRecordEmitter;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.TransactionCommitConsumer;
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.events.RedoSqlDmlEvent;
import io.debezium.connector.oracle.logminer.events.TruncateEvent;
import io.debezium.connector.oracle.logminer.parser.LogMinerDmlEntry;
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Loggings;
import io.debezium.util.Stopwatch;
import io.debezium.util.Strings;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * LogMiner streaming change event source that reports itself as operating on committed data only
 * (see {@link #isUsingCommittedDataOnly()}).
 *
 * <p>Rows returned by the mining query are handed to a {@link TransactionCommitConsumer}, which calls
 * back into {@link #dispatchEvent(LogMinerEvent, long)} to emit change records. Schema change rows are
 * collected in an internal queue and dispatched either right before the next data change event or when
 * the commit row is handled.
 *
 * <p>Offset state is read and written through the inherited {@code m41getOffsetContext()} accessor.
 */
@Incubating
public class UnbufferedLogMinerStreamingChangeEventSource extends AbstractLogMinerStreamingChangeEventSource {

    private static final Logger LOGGER = LoggerFactory.getLogger(UnbufferedLogMinerStreamingChangeEventSource.class);

    /** SQL text of the mining query, built once from the connector configuration. */
    private final String miningQuery;
    /** Whether the redo SQL should be copied onto the offset context for emitted events. */
    private final boolean includeSql;
    /** Receives events from {@link #enqueueEvent} and calls back into {@link #dispatchEvent}. */
    private final TransactionCommitConsumer accumulator;
    /** Schema change rows waiting to be dispatched. */
    private final List<LogMinerEventRow> ddlQueue = new ArrayList<>();
    private final ResumePositionProvider resumePositionProvider;

    /** When {@code true}, {@link #isEventSkipped} skips every row until the next start or commit row. */
    private boolean skipCurrentTransaction = false;
    /** Database offset obtained from the metrics at the start of every mining iteration. */
    private ZoneOffset databaseOffset;
    /** Commit SCN of the most recently processed commit row of the current batch. */
    private Scn lastCommitScn = Scn.NULL;

    /**
     * Creates the event source; all arguments are forwarded unchanged to the superclass constructor.
     */
    public UnbufferedLogMinerStreamingChangeEventSource(OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection, EventDispatcher<OraclePartition, TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema oracleDatabaseSchema, Configuration configuration, LogMinerStreamingChangeEventSourceMetrics logMinerStreamingChangeEventSourceMetrics) {
        super(oracleConnectorConfig, oracleConnection, eventDispatcher, errorHandler, clock, oracleDatabaseSchema, configuration, logMinerStreamingChangeEventSourceMetrics);
        this.miningQuery = new UnbufferedLogMinerQueryBuilder(oracleConnectorConfig).getQuery();
        this.includeSql = oracleConnectorConfig.isLogMiningIncludeRedoSql();
        this.accumulator = new TransactionCommitConsumer(this::dispatchEvent, oracleConnectorConfig, oracleDatabaseSchema);
        this.resumePositionProvider = new ResumePositionProvider(oracleConnectorConfig, getJdbcConfiguration());
    }

    /**
     * Runs the mining loop until the context stops running or, in archive-log-only mode, the resume
     * SCN is reported as not available.
     *
     * <p>Each iteration refreshes the database offset, advances the resume SCN, computes the upper
     * bound, (re)starts the mining session when required and processes one batch of query results.
     *
     * @throws Exception if any of the mining steps fails
     */
    @Override
    protected void executeLogMiningStreaming() throws Exception {
        Scn upperBoundsScn = Scn.NULL;
        Scn resumeScn = m41getOffsetContext().getScn();

        // Fall back to the offset SCN when no minimum committed SCN has been recorded yet.
        Scn minCommitScn = m41getOffsetContext().getCommitScn().getMinCommittedScn();
        if (minCommitScn.isNull()) {
            minCommitScn = resumeScn;
        }

        Stopwatch sessionWatch = Stopwatch.accumulating().start();
        int miningStartAttempts = 1;

        prepareLogsForMining(false, resumeScn);

        while (getContext().isRunning() && !isArchiveLogOnlyModeAndScnIsNotAvailable(resumeScn)) {
            final Instant batchStart = Instant.now();

            updateDatabaseTimeDifference();
            this.databaseOffset = getMetrics().getDatabaseOffset();

            resumeScn = computeResumeScnAndUpdateOffsets(resumeScn, minCommitScn);
            getMetrics().setOffsetScn(resumeScn);

            final Scn currentScn = getCurrentScn();
            getMetrics().setCurrentScn(currentScn);

            upperBoundsScn = calculateUpperBounds(resumeScn, upperBoundsScn, currentScn);
            if (upperBoundsScn.isNull()) {
                LOGGER.debug("Delaying mining transaction logs by one iteration");
                pauseBetweenMiningSessions();
                continue;
            }
            if (getConfig().isArchiveLogOnlyMode() && resumeScn.equals(upperBoundsScn)) {
                // Nothing new to mine in archive-log-only mode; wait for the next iteration.
                pauseBetweenMiningSessions();
                continue;
            }

            if (isMiningSessionRestartRequired(sessionWatch) || checkLogSwitchOccurredAndUpdate()) {
                endMiningSession();
                if (getConfig().isLogMiningRestartConnection()) {
                    prepareJdbcConnection(true);
                }
                prepareLogsForMining(true, resumeScn);
                sessionWatch = Stopwatch.accumulating().start();
            }

            if (startMiningSession(resumeScn, Scn.NULL, miningStartAttempts)) {
                miningStartAttempts = 1;
                minCommitScn = process(minCommitScn);
                getMetrics().setLastBatchProcessingDuration(Duration.between(batchStart, Instant.now()));
            }
            else {
                miningStartAttempts++;
            }

            captureJdbcSessionMemoryStatistics();
            pauseBetweenMiningSessions();

            if (getContext().isPaused()) {
                // The offset SCN is temporarily replaced by the minimum commit SCN for the duration
                // of the blocking snapshot and restored afterwards.
                final Scn savedOffsetScn = m41getOffsetContext().getScn();
                m41getOffsetContext().setScn(minCommitScn);
                executeBlockingSnapshot();
                m41getOffsetContext().setScn(savedOffsetScn);
            }
        }
    }

    /**
     * @return always {@code true}
     */
    @Override
    protected boolean isUsingCommittedDataOnly() {
        return true;
    }

    /**
     * Closes the resume position provider; a failure is logged as a warning and not propagated.
     */
    public void close() {
        try {
            this.resumePositionProvider.close();
        }
        catch (Exception e) {
            LOGGER.warn("Failed to gracefully shutdown the resume position provider", e);
        }
    }

    /**
     * Updates the source lag metric from the row's change time and passes the event to the accumulator.
     */
    @Override
    protected void enqueueEvent(LogMinerEventRow logMinerEventRow, LogMinerEvent logMinerEvent) throws InterruptedException {
        getMetrics().calculateLagFromSource(logMinerEventRow.getChangeTime());
        this.accumulator.accept(logMinerEvent);
    }

    /**
     * Dispatches queued schema changes, if any, before a data change event is dispatched.
     *
     * <p>When the row's commit SCN differs from the event commit SCN on the offset context, the
     * offset context is updated first.
     */
    @Override
    protected void executeDataChangeEventPreDispatchSteps(LogMinerEventRow logMinerEventRow) throws InterruptedException {
        if (this.ddlQueue.isEmpty()) {
            return;
        }
        final Scn rowCommitScn = logMinerEventRow.getCommitScn();
        if (!rowCommitScn.equals(m41getOffsetContext().getEventCommitScn())) {
            m41getOffsetContext().setEventCommitScn(rowCommitScn);
        }
        dispatchSchemaChanges();
    }

    /**
     * Asks the resume position provider for the resume SCN and, when it differs from
     * {@code currentResumeScn}, stores it as the offset SCN.
     *
     * @return the SCN computed by the resume position provider
     */
    private Scn computeResumeScnAndUpdateOffsets(Scn currentResumeScn, Scn minCommitScn) throws SQLException {
        final Scn computed = this.resumePositionProvider.computeResumePositionFromLogs(currentResumeScn, minCommitScn, getCurrentLogFiles());
        if (!computed.equals(currentResumeScn)) {
            LOGGER.debug("Advancing offset low-watermark scn to {}", computed);
            m41getOffsetContext().setScn(computed);
        }
        return computed;
    }

    /**
     * Prepares the mining query as a forward-only, read-only statement whose cursors are held over
     * commit, and applies the configured query timeout.
     *
     * <p>The statement is closed again if applying the timeout fails, so it cannot leak.
     */
    private PreparedStatement createQueryStatement() throws SQLException {
        final PreparedStatement statement = getConnection().connection().prepareStatement(
                this.miningQuery,
                ResultSet.TYPE_FORWARD_ONLY,
                ResultSet.CONCUR_READ_ONLY,
                ResultSet.HOLD_CURSORS_OVER_COMMIT);
        try {
            statement.setQueryTimeout((int) getConnection().config().getQueryTimeout().toSeconds());
            return statement;
        }
        catch (SQLException | RuntimeException e) {
            try {
                statement.close();
            }
            catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Executes the mining query bound to {@code minCommitScn} and processes its results.
     *
     * @param minCommitScn the SCN bound to the first query parameter
     * @return the commit SCN of the last commit row seen, or {@code minCommitScn} when none was seen
     */
    private Scn process(Scn minCommitScn) throws SQLException, InterruptedException {
        getBatchMetrics().reset();
        try (PreparedStatement statement = createQueryStatement()) {
            LOGGER.debug("Fetching results with COMMIT_SCN >= {}", minCommitScn);
            statement.setFetchSize(getConfig().getQueryFetchSize());
            statement.setFetchDirection(ResultSet.FETCH_FORWARD);
            statement.setString(1, minCommitScn.toString());

            // processEvent(...) overwrites lastCommitScn whenever a commit row is observed.
            this.lastCommitScn = minCommitScn;
            executeAndProcessQuery(statement);

            if (!minCommitScn.equals(this.lastCommitScn)) {
                LOGGER.debug("Adjusting Min Commit SCN from {} to {}.", minCommitScn, this.lastCommitScn);
            }
            return this.lastCommitScn;
        }
    }

    /**
     * Delegates to the superclass and, for commit rows, records the commit SCN and re-enables
     * event processing for the next transaction.
     */
    @Override
    public void processEvent(LogMinerEventRow logMinerEventRow) throws SQLException, InterruptedException {
        super.processEvent(logMinerEventRow);
        if (EventType.COMMIT.equals(logMinerEventRow.getEventType())) {
            this.lastCommitScn = logMinerEventRow.getCommitScn();
            this.skipCurrentTransaction = false;
        }
    }

    /**
     * @return {@code true} when the current transaction is being skipped, the event is already part
     *         of the snapshot, or the non-schema-change skip rules apply
     */
    @Override
    protected boolean isEventSkipped(LogMinerEventRow logMinerEventRow) {
        if (this.skipCurrentTransaction) {
            return true;
        }
        return isEventIncludedInSnapshot(logMinerEventRow) || isNonSchemaChangeEventSkipped(logMinerEventRow);
    }

    /**
     * Checks whether the row was already handled according to the offset's commit SCN state, or
     * belongs to the offset's current transaction with a sequence that is not newer than the one
     * stored on the offset.
     */
    @Override
    protected boolean hasEventBeenProcessed(LogMinerEventRow logMinerEventRow) {
        // Rows without a commit SCN are compared by their own SCN instead.
        final Scn effectiveScn = logMinerEventRow.getCommitScn().isNull()
                ? logMinerEventRow.getScn()
                : logMinerEventRow.getCommitScn();
        if (m41getOffsetContext().getCommitScn().hasBeenHandled(logMinerEventRow.getThread(), effectiveScn, logMinerEventRow.getTransactionId())) {
            return true;
        }
        if (!Objects.equals(m41getOffsetContext().getTransactionId(), logMinerEventRow.getTransactionId())) {
            return false;
        }
        return m41getOffsetContext().getTransactionSequence() != null
                && m41getOffsetContext().getTransactionSequence().longValue() >= logMinerEventRow.getTransactionSequence().longValue();
    }

    /**
     * Handles the start of a transaction: decides whether the whole transaction is skipped and
     * otherwise seeds the offset context with the transaction details.
     */
    @Override
    protected void handleStartEvent(LogMinerEventRow logMinerEventRow) {
        this.skipCurrentTransaction = false;

        final Scn commitScn = logMinerEventRow.getCommitScn();
        if (!commitScn.isNull()
                && m41getOffsetContext().getCommitScn().hasBeenHandled(logMinerEventRow.getThread(), commitScn, logMinerEventRow.getTransactionId())) {
            LOGGER.info("Skipping transaction {} with SCN {}, already committed.", logMinerEventRow.getTransactionId(), logMinerEventRow.getScn());
            this.skipCurrentTransaction = true;
            return;
        }

        if (isUserNameSkipped(logMinerEventRow.getUserName()) || isClientIdSkipped(logMinerEventRow.getClientId())) {
            this.skipCurrentTransaction = true;
            return;
        }

        Loggings.logDebugAndTraceRecord(LOGGER, logMinerEventRow, "Starting transaction {} with SCN {}", new Object[]{logMinerEventRow.getTransactionId(), logMinerEventRow.getScn()});

        final Instant startTime = adjustForDatabaseOffset(logMinerEventRow.getStartTime());
        final Instant commitTime = adjustForDatabaseOffset(logMinerEventRow.getCommitTime());

        m41getOffsetContext().setStartScn(logMinerEventRow.getScn());
        m41getOffsetContext().setRedoThread(Integer.valueOf(logMinerEventRow.getThread()));
        m41getOffsetContext().setStartTime(startTime);
        m41getOffsetContext().setCommitTime(commitTime);
        m41getOffsetContext().setEventCommitScn(logMinerEventRow.getCommitScn());
        m41getOffsetContext().setUserName(logMinerEventRow.getUserName());
        m41getOffsetContext().setTransactionId(logMinerEventRow.getTransactionId());
        m41getOffsetContext().setTransactionSequence(null);
        getMetrics().setActiveTransactionCount(1L);
    }

    /**
     * Subtracts the current database offset from {@code instant}.
     *
     * @return the adjusted instant, or {@code null} when {@code instant} is {@code null}
     */
    private Instant adjustForDatabaseOffset(Instant instant) {
        return instant == null ? null : instant.minusSeconds(this.databaseOffset.getTotalSeconds());
    }

    /**
     * Handles a commit row: records the commit on the offset context, dispatches queued schema
     * changes, closes the accumulator, emits the transaction-committed event and updates metrics.
     */
    @Override
    protected void handleCommitEvent(LogMinerEventRow logMinerEventRow) throws InterruptedException {
        Loggings.logDebugAndTraceRecord(LOGGER, logMinerEventRow, "Commit transaction {} at SCN {}.", new Object[]{logMinerEventRow.getTransactionId(), logMinerEventRow.getScn()});

        final Instant commitHandlingStart = Instant.now();

        m41getOffsetContext().getCommitScn().recordCommit(logMinerEventRow);
        m41getOffsetContext().setEventScn(logMinerEventRow.getScn());
        m41getOffsetContext().setEventCommitScn(logMinerEventRow.getScn());

        if (!this.ddlQueue.isEmpty()) {
            dispatchSchemaChanges();
        }

        this.accumulator.close();
        getEventDispatcher().dispatchTransactionCommittedEvent(getPartition(), m41getOffsetContext(), logMinerEventRow.getChangeTime());

        getBatchMetrics().commitObserved();
        getMetrics().setActiveTransactionCount(0L);
        updateCommitMetrics(logMinerEventRow, Duration.between(commitHandlingStart, Instant.now()));
    }

    /**
     * Rollback rows are not expected by this implementation.
     *
     * @throws DebeziumException always
     */
    @Override
    protected void handleRollbackEvent(LogMinerEventRow logMinerEventRow) {
        throw new DebeziumException("Rollback event with SCN " + logMinerEventRow.getScn() + " found, but should not be in this mode");
    }

    /**
     * Queues a schema change row for later dispatch unless it is skipped or has no table name.
     */
    @Override
    protected void handleSchemaChangeEvent(LogMinerEventRow logMinerEventRow) {
        if (isSchemaChangeEventSkipped(logMinerEventRow) || Strings.isNullOrBlank(logMinerEventRow.getTableName())) {
            return;
        }
        Loggings.logDebugAndTraceRecord(LOGGER, logMinerEventRow, "Processing DDL event with SCN {}: {}", new Object[]{logMinerEventRow.getScn(), logMinerEventRow.getRedoSql()});
        this.ddlQueue.add(logMinerEventRow);
    }

    /**
     * Replication marker rows are only logged at trace level.
     */
    @Override
    protected void handleReplicationMarkerEvent(LogMinerEventRow logMinerEventRow) {
        LOGGER.trace("Skipped GoldenGate replication marker event: {}", Loggings.maybeRedactSensitiveData(logMinerEventRow));
    }

    /**
     * Dispatches a TRUNCATE change record for the row's table, when that table can be resolved.
     *
     * <p>A {@link SQLException} is logged and counted as a warning instead of being propagated. When
     * redo SQL is included, it is reset to an empty string on the offset context on every exit path.
     */
    @Override
    protected void handleTruncateEvent(LogMinerEventRow logMinerEventRow) throws InterruptedException {
        try {
            final Table table = getTableForDataEvent(logMinerEventRow);
            if (table != null) {
                LOGGER.debug("Dispatching TRUNCATE event for table '{}' with SCN {}", table.id(), logMinerEventRow.getScn());
                final LogMinerDmlEntry truncateEntry = parseTruncateEvent(logMinerEventRow);

                m41getOffsetContext().getCommitScn().recordCommit(logMinerEventRow);
                m41getOffsetContext().setEventScn(logMinerEventRow.getScn());
                m41getOffsetContext().setRedoThread(Integer.valueOf(logMinerEventRow.getThread()));
                m41getOffsetContext().setRsId(logMinerEventRow.getRsId());
                m41getOffsetContext().setRowId("");
                m41getOffsetContext().setTransactionId(logMinerEventRow.getTransactionId());
                m41getOffsetContext().setTransactionSequence(logMinerEventRow.getTransactionSequence());
                if (this.includeSql) {
                    m41getOffsetContext().setRedoSql(logMinerEventRow.getRedoSql());
                }

                getEventDispatcher().dispatchDataChangeEvent(getPartition(), table.id(), new LogMinerChangeRecordEmitter(getConfig(), (Partition) getPartition(), (OffsetContext) m41getOffsetContext(), Envelope.Operation.TRUNCATE, truncateEntry.getOldValues(), truncateEntry.getNewValues(), table, getSchema(), getClock()));
            }
        }
        catch (SQLException e) {
            LOGGER.warn("Failed to process truncate event", e);
            getMetrics().incrementWarningCount();
        }
        finally {
            if (this.includeSql) {
                m41getOffsetContext().setRedoSql("");
            }
        }
    }

    /**
     * Callback given to the {@link TransactionCommitConsumer}: copies the event details onto the
     * offset context and dispatches a data change record for truncate and DML events. Other event
     * types are ignored.
     *
     * <p>The redo SQL on the offset context is cleared afterwards, also when dispatching fails.
     *
     * @param logMinerEvent the event to dispatch
     * @param j unused by this implementation
     */
    protected void dispatchEvent(LogMinerEvent logMinerEvent, long j) throws InterruptedException {
        try {
            if (logMinerEvent instanceof TruncateEvent) {
                final TruncateEvent truncateEvent = (TruncateEvent) logMinerEvent;
                final int offsetSeconds = this.databaseOffset.getTotalSeconds();

                getMetrics().calculateLagFromSource(truncateEvent.getChangeTime());
                m41getOffsetContext().setEventScn(truncateEvent.getScn());
                m41getOffsetContext().setTransactionId(truncateEvent.getTransactionId());
                m41getOffsetContext().setTransactionSequence(truncateEvent.getTransactionSequence());
                m41getOffsetContext().setSourceTime(truncateEvent.getChangeTime().minusSeconds(offsetSeconds));
                m41getOffsetContext().setTableId(truncateEvent.getTableId());
                m41getOffsetContext().setRsId(truncateEvent.getRsId());
                m41getOffsetContext().setRowId(truncateEvent.getRowId());

                getEventDispatcher().dispatchDataChangeEvent(getPartition(), truncateEvent.getTableId(), new LogMinerChangeRecordEmitter(getConfig(), (Partition) getPartition(), (OffsetContext) m41getOffsetContext(), Envelope.Operation.TRUNCATE, truncateEvent.getDmlEntry().getOldValues(), truncateEvent.getDmlEntry().getNewValues(), getSchema().tableFor(truncateEvent.getTableId()), getSchema(), Clock.system()));
            }
            else if (logMinerEvent instanceof DmlEvent) {
                final DmlEvent dmlEvent = (DmlEvent) logMinerEvent;
                final int offsetSeconds = this.databaseOffset.getTotalSeconds();

                getMetrics().calculateLagFromSource(dmlEvent.getChangeTime());
                m41getOffsetContext().setEventScn(dmlEvent.getScn());
                m41getOffsetContext().setTransactionId(dmlEvent.getTransactionId());
                m41getOffsetContext().setTransactionSequence(dmlEvent.getTransactionSequence());
                m41getOffsetContext().setSourceTime(dmlEvent.getChangeTime().minusSeconds(offsetSeconds));
                m41getOffsetContext().setTableId(dmlEvent.getTableId());
                m41getOffsetContext().setRsId(dmlEvent.getRsId());
                m41getOffsetContext().setRowId(dmlEvent.getRowId());
                if (logMinerEvent instanceof RedoSqlDmlEvent) {
                    m41getOffsetContext().setRedoSql(((RedoSqlDmlEvent) logMinerEvent).getRedoSql());
                }

                getEventDispatcher().dispatchDataChangeEvent(getPartition(), dmlEvent.getTableId(), new LogMinerChangeRecordEmitter(getConfig(), (Partition) getPartition(), (OffsetContext) m41getOffsetContext(), dmlEvent.getDmlEntry().getEventType(), dmlEvent.getDmlEntry().getOldValues(), dmlEvent.getDmlEntry().getNewValues(), getSchema().tableFor(dmlEvent.getTableId()), getSchema(), Clock.system()));
            }
        }
        finally {
            // Never let the redo SQL of this event carry over to the next one.
            m41getOffsetContext().setRedoSql(null);
        }
    }

    /**
     * Dispatches every queued schema change row in insertion order and then empties the queue.
     * The queue is left untouched when dispatching one of the rows fails.
     */
    private void dispatchSchemaChanges() throws InterruptedException {
        for (LogMinerEventRow ddlRow : this.ddlQueue) {
            dispatchSchemaChangeEventInternal(ddlRow);
        }
        this.ddlQueue.clear();
    }
}
