package io.debezium.connector.oracle.logminer.buffered;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OraclePartition;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource;
import io.debezium.connector.oracle.logminer.LogMinerChangeRecordEmitter;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSourceMetrics;
import io.debezium.connector.oracle.logminer.SqlUtils;
import io.debezium.connector.oracle.logminer.TransactionCommitConsumer;
import io.debezium.connector.oracle.logminer.buffered.LogMinerTransactionCache;
import io.debezium.connector.oracle.logminer.buffered.ehcache.EhcacheCacheProvider;
import io.debezium.connector.oracle.logminer.buffered.ehcache.EhcacheTransactionFactory;
import io.debezium.connector.oracle.logminer.buffered.infinispan.EmbeddedInfinispanCacheProvider;
import io.debezium.connector.oracle.logminer.buffered.infinispan.InfinispanTransactionFactory;
import io.debezium.connector.oracle.logminer.buffered.infinispan.RemoteInfinispanCacheProvider;
import io.debezium.connector.oracle.logminer.buffered.memory.MemoryCacheProvider;
import io.debezium.connector.oracle.logminer.buffered.memory.MemoryTransactionFactory;
import io.debezium.connector.oracle.logminer.events.DmlEvent;
import io.debezium.connector.oracle.logminer.events.EventType;
import io.debezium.connector.oracle.logminer.events.LogMinerEvent;
import io.debezium.connector.oracle.logminer.events.LogMinerEventRow;
import io.debezium.connector.oracle.logminer.events.RedoSqlDmlEvent;
import io.debezium.connector.oracle.logminer.events.TruncateEvent;
import io.debezium.connector.oracle.logminer.logwriter.CommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.LogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.RacCommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.logminer.logwriter.ReadOnlyLogWriterFlushStrategy;
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.Loggings;
import io.debezium.util.Stopwatch;
import io.debezium.util.Strings;
import java.math.BigInteger;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/oracle/logminer/buffered/BufferedLogMinerStreamingChangeEventSource.class */
public class BufferedLogMinerStreamingChangeEventSource extends AbstractLogMinerStreamingChangeEventSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(BufferedLogMinerStreamingChangeEventSource.class);
    private static final Logger ABANDONED_DETAILS_LOGGER = LoggerFactory.getLogger(BufferedLogMinerStreamingChangeEventSource.class.getName() + ".AbandonedDetails");
    private static final String NO_SEQUENCE_TRX_ID_SUFFIX = "ffffffff";
    private final String queryString;
    private final CacheProvider<Transaction> cacheProvider;
    private final TransactionFactory<Transaction> transactionFactory;
    private Instant lastProcessedScnChangeTime;
    private Scn lastProcessedScn;

    public BufferedLogMinerStreamingChangeEventSource(OracleConnectorConfig oracleConnectorConfig, OracleConnection oracleConnection, EventDispatcher<OraclePartition, TableId> eventDispatcher, ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema oracleDatabaseSchema, Configuration configuration, LogMinerStreamingChangeEventSourceMetrics logMinerStreamingChangeEventSourceMetrics) {
        super(oracleConnectorConfig, oracleConnection, eventDispatcher, errorHandler, clock, oracleDatabaseSchema, configuration, logMinerStreamingChangeEventSourceMetrics);
        this.lastProcessedScnChangeTime = null;
        this.lastProcessedScn = Scn.NULL;
        this.queryString = new BufferedLogMinerQueryBuilder(oracleConnectorConfig).getQuery();
        this.cacheProvider = createCacheProvider(oracleConnectorConfig);
        this.transactionFactory = createTransactionFactory(oracleConnectorConfig);
    }

    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    protected void executeLogMiningStreaming() throws Exception {
        LogWriterFlushStrategy resolveFlushStrategy = resolveFlushStrategy();
        try {
            Scn scn = m41getOffsetContext().getScn();
            Scn scn2 = Scn.NULL;
            Stopwatch start = Stopwatch.accumulating().start();
            int i = 1;
            prepareLogsForMining(false, scn);
            while (getContext().isRunning() && !isArchiveLogOnlyModeAndScnIsNotAvailable(scn)) {
                Instant now = Instant.now();
                updateDatabaseTimeDifference();
                Scn currentScn = getCurrentScn();
                getMetrics().setCurrentScn(currentScn);
                scn2 = calculateUpperBounds(scn, scn2, currentScn);
                if (scn2.isNull()) {
                    LOGGER.debug("Requested delay of mining by one iteration");
                    pauseBetweenMiningSessions();
                } else if (getConfig().isArchiveLogOnlyMode() && scn.equals(scn2)) {
                    pauseBetweenMiningSessions();
                } else {
                    resolveFlushStrategy.flush(getCurrentScn());
                    if (isMiningSessionRestartRequired(start) || checkLogSwitchOccurredAndUpdate()) {
                        endMiningSession();
                        if (getConfig().isLogMiningRestartConnection()) {
                            prepareJdbcConnection(true);
                        }
                        prepareLogsForMining(true, scn);
                        start = Stopwatch.accumulating().start();
                    }
                    if (startMiningSession(scn, scn2, i)) {
                        i = 1;
                        scn = process(scn, scn2);
                        getMetrics().setLastBatchProcessingDuration(Duration.between(now, Instant.now()));
                    } else {
                        i++;
                    }
                    captureJdbcSessionMemoryStatistics();
                    pauseBetweenMiningSessions();
                    if (getContext().isPaused()) {
                        executeBlockingSnapshot();
                    }
                }
            }
            if (resolveFlushStrategy != null) {
                resolveFlushStrategy.close();
            }
        } catch (Throwable th) {
            if (resolveFlushStrategy != null) {
                try {
                    resolveFlushStrategy.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public void close() {
        try {
            this.cacheProvider.close();
        } catch (Exception e) {
            LOGGER.warn("Failed to gracefully shutdown the cache provider", e);
        }
    }

    private LogWriterFlushStrategy resolveFlushStrategy() {
        return getConfig().isLogMiningReadOnly() ? new ReadOnlyLogWriterFlushStrategy() : getConfig().isRacSystem().booleanValue() ? new RacCommitLogWriterFlushStrategy(getConfig(), getJdbcConfiguration(), getMetrics()) : new CommitLogWriterFlushStrategy(getConfig(), getConnection());
    }

    private <T extends Transaction> CacheProvider<T> createCacheProvider(OracleConnectorConfig oracleConnectorConfig) {
        switch (oracleConnectorConfig.getLogMiningBufferType()) {
            case MEMORY:
                return new MemoryCacheProvider(oracleConnectorConfig);
            case INFINISPAN_EMBEDDED:
                return new EmbeddedInfinispanCacheProvider(oracleConnectorConfig);
            case INFINISPAN_REMOTE:
                return new RemoteInfinispanCacheProvider(oracleConnectorConfig);
            case EHCACHE:
                return new EhcacheCacheProvider(oracleConnectorConfig);
            default:
                throw new IncompatibleClassChangeError();
        }
    }

    private <T extends Transaction> TransactionFactory<T> createTransactionFactory(OracleConnectorConfig oracleConnectorConfig) {
        switch (oracleConnectorConfig.getLogMiningBufferType()) {
            case MEMORY:
                return new MemoryTransactionFactory();
            case INFINISPAN_EMBEDDED:
            case INFINISPAN_REMOTE:
                return new InfinispanTransactionFactory();
            case EHCACHE:
                return new EhcacheTransactionFactory();
            default:
                throw new IncompatibleClassChangeError();
        }
    }

    protected Scn process(Scn scn, Scn scn2) throws SQLException, InterruptedException {
        getBatchMetrics().reset();
        PreparedStatement createQueryStatement = createQueryStatement();
        try {
            LOGGER.debug("Fetching results for SCN [{}, {}]", scn, scn2);
            createQueryStatement.setFetchSize(getConfig().getQueryFetchSize());
            createQueryStatement.setFetchDirection(1000);
            createQueryStatement.setString(1, scn.toString());
            createQueryStatement.setString(2, scn2.toString());
            executeAndProcessQuery(createQueryStatement);
            logActiveTransactions();
            Scn calculateNewStartScn = calculateNewStartScn(scn, scn2, m41getOffsetContext().getCommitScn().getMaxCommittedScn());
            if (createQueryStatement != null) {
                createQueryStatement.close();
            }
            return calculateNewStartScn;
        } catch (Throwable th) {
            if (createQueryStatement != null) {
                try {
                    createQueryStatement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected LogMinerTransactionCache<Transaction> getTransactionCache() {
        return this.cacheProvider.getTransactionCache();
    }

    protected LogMinerCache<String, String> getProcessedTransactionsCache() {
        return this.cacheProvider.getProcessedTransactionsCache();
    }

    protected LogMinerCache<String, String> getSchemaChangesCache() {
        return this.cacheProvider.getSchemaChangesCache();
    }

    private boolean isRecentlyProcessed(String str) {
        return getProcessedTransactionsCache().containsKey(str);
    }

    private boolean hasSchemaChangeBeenSeen(LogMinerEventRow logMinerEventRow) {
        return getSchemaChangesCache().containsKey(logMinerEventRow.getScn().toString());
    }

    private int getTransactionEventCount(Transaction transaction) {
        return getTransactionCache().getTransactionEventCount(transaction);
    }

    protected PreparedStatement createQueryStatement() throws SQLException {
        PreparedStatement prepareStatement = getConnection().connection().prepareStatement(this.queryString, 1003, 1007, 1);
        prepareStatement.setQueryTimeout((int) getConnection().config().getQueryTimeout().toSeconds());
        return prepareStatement;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    public void preProcessEvent(LogMinerEventRow logMinerEventRow) {
        super.preProcessEvent(logMinerEventRow);
        if (EventType.MISSING_SCN.equals(logMinerEventRow.getEventType())) {
            return;
        }
        this.lastProcessedScn = logMinerEventRow.getScn();
        this.lastProcessedScnChangeTime = logMinerEventRow.getChangeTime();
    }

    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    protected boolean isEventSkipped(LogMinerEventRow logMinerEventRow) {
        if (logMinerEventRow.getTableId() != null) {
            if (LogWriterFlushStrategy.isFlushTable(logMinerEventRow.getTableId(), getConfig().getJdbcConfig().getUser(), getConfig().getLogMiningFlushTableName())) {
                LOGGER.trace("Skipped change associated with flush table '{}'", logMinerEventRow.getTableId());
                return true;
            }
            if (isNonSchemaChangeEventSkipped(logMinerEventRow)) {
                return true;
            }
        }
        Transaction transaction = getTransactionCache().getTransaction(logMinerEventRow.getTransactionId());
        if (transaction == null || !isTransactionOverEventThreshold(transaction)) {
            return false;
        }
        abandonTransactionOverEventThreshold(transaction);
        return true;
    }

    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    protected void handleStartEvent(LogMinerEventRow logMinerEventRow) {
        String transactionId = logMinerEventRow.getTransactionId();
        if (isRecentlyProcessed(transactionId)) {
            return;
        }
        Transaction transaction = getTransactionCache().getTransaction(transactionId);
        if (transaction == null) {
            getTransactionCache().addTransaction(this.transactionFactory.createTransaction(logMinerEventRow));
            getMetrics().setActiveTransactionCount(getTransactionCache().getTransactionCount());
        } else {
            LOGGER.trace("Transaction {} is not yet committed and START event detected.", transactionId);
            getTransactionCache().resetTransactionToStart(transaction);
        }
    }

    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    protected void handleCommitEvent(LogMinerEventRow logMinerEventRow) throws InterruptedException {
        String transactionId = logMinerEventRow.getTransactionId();
        if (isRecentlyProcessed(transactionId)) {
            LOGGER.debug("\tTransaction is already committed, skipped.");
            return;
        }
        Transaction andRemoveTransaction = getTransactionCache().getAndRemoveTransaction(transactionId);
        if (andRemoveTransaction == null) {
            if (!m41getOffsetContext().getCommitScn().hasEventScnBeenHandled(logMinerEventRow)) {
                LOGGER.debug("Transaction {} not found in cache with SCN {}, no events to commit.", transactionId, logMinerEventRow.getScn());
            }
            getTransactionCache().removeAbandonedTransaction(logMinerEventRow.getTransactionId());
        }
        Scn calculateSmallestScn = calculateSmallestScn();
        Scn scn = logMinerEventRow.getScn();
        if (m41getOffsetContext().getCommitScn().hasEventScnBeenHandled(logMinerEventRow)) {
            if (andRemoveTransaction != null) {
                if (andRemoveTransaction.getNumberOfEvents() > 0) {
                    LOGGER.debug("Transaction {} has already been processed. Offset Commit SCN {}, Transaction Commit SCN {}, Last Seen Commit SCN {}.", new Object[]{transactionId, m41getOffsetContext().getCommitScn(), scn, m41getOffsetContext().getCommitScn().getCommitScnForRedoThread(logMinerEventRow.getThread())});
                }
                cleanupAfterTransactionRemovedFromCache(andRemoveTransaction, false);
                getMetrics().setActiveTransactionCount(getTransactionCache().getTransactionCount());
                return;
            }
            return;
        }
        int transactionEventCount = andRemoveTransaction == null ? 0 : getTransactionEventCount(andRemoveTransaction);
        boolean z = logMinerEventRow.getThread() == 0 && transactionEventCount == 0;
        Logger logger = LOGGER;
        Object[] objArr = new Object[7];
        objArr[0] = z ? "Skipping commit for" : "Committing";
        objArr[1] = transactionId;
        objArr[2] = Integer.valueOf(transactionEventCount);
        objArr[3] = logMinerEventRow.getScn();
        objArr[4] = Integer.valueOf(logMinerEventRow.getThread());
        objArr[5] = calculateSmallestScn;
        objArr[6] = logMinerEventRow;
        logger.debug("{} transaction {} with {} events (scn: {}, thread: {}, oldest buffer scn: {}): {}", objArr);
        if (z) {
            if (andRemoveTransaction != null) {
                cleanupAfterTransactionRemovedFromCache(andRemoveTransaction, false);
                return;
            }
            return;
        }
        getBatchMetrics().commitObserved();
        m41getOffsetContext().getCommitScn().recordCommit(logMinerEventRow);
        Instant now = Instant.now();
        boolean z2 = false;
        if (transactionEventCount > 0) {
            boolean isTransactionSkippedAtCommit = isTransactionSkippedAtCommit(andRemoveTransaction);
            z2 = !isTransactionSkippedAtCommit;
            ZoneOffset databaseOffset = getMetrics().getDatabaseOffset();
            TransactionCommitConsumer transactionCommitConsumer = new TransactionCommitConsumer((logMinerEvent, j) -> {
                if (calculateSmallestScn.isNull() || scn.compareTo(calculateSmallestScn) < 0) {
                    m41getOffsetContext().setScn(logMinerEvent.getScn());
                    getMetrics().setOldestScnDetails(logMinerEvent.getScn(), logMinerEvent.getChangeTime());
                }
                m41getOffsetContext().setEventScn(logMinerEvent.getScn());
                m41getOffsetContext().setEventCommitScn(logMinerEventRow.getScn());
                m41getOffsetContext().setTransactionId(transactionId);
                m41getOffsetContext().setUserName(andRemoveTransaction.getUserName());
                m41getOffsetContext().setSourceTime(logMinerEvent.getChangeTime().minusSeconds(databaseOffset.getTotalSeconds()));
                m41getOffsetContext().setTableId(logMinerEvent.getTableId());
                m41getOffsetContext().setRedoThread(Integer.valueOf(logMinerEventRow.getThread()));
                m41getOffsetContext().setRsId(logMinerEvent.getRsId());
                m41getOffsetContext().setRowId(logMinerEvent.getRowId());
                m41getOffsetContext().setCommitTime(logMinerEventRow.getChangeTime().minusSeconds(databaseOffset.getTotalSeconds()));
                if (j == 1) {
                    m41getOffsetContext().setStartScn(logMinerEvent.getScn());
                    m41getOffsetContext().setStartTime(logMinerEvent.getChangeTime().minusSeconds(databaseOffset.getTotalSeconds()));
                }
                if (logMinerEvent instanceof RedoSqlDmlEvent) {
                    m41getOffsetContext().setRedoSql(((RedoSqlDmlEvent) logMinerEvent).getRedoSql());
                }
                DmlEvent dmlEvent = (DmlEvent) logMinerEvent;
                if (!isTransactionSkippedAtCommit) {
                    getEventDispatcher().dispatchDataChangeEvent(getPartition(), logMinerEvent.getTableId(), dmlEvent instanceof TruncateEvent ? new LogMinerChangeRecordEmitter(getConfig(), (Partition) getPartition(), (OffsetContext) m41getOffsetContext(), Envelope.Operation.TRUNCATE, dmlEvent.getDmlEntry().getOldValues(), dmlEvent.getDmlEntry().getNewValues(), getSchema().tableFor(logMinerEvent.getTableId()), getSchema(), Clock.system()) : new LogMinerChangeRecordEmitter(getConfig(), (Partition) getPartition(), (OffsetContext) m41getOffsetContext(), dmlEvent.getEventType(), dmlEvent.getDmlEntry().getOldValues(), dmlEvent.getDmlEntry().getNewValues(), getSchema().tableFor(logMinerEvent.getTableId()), getSchema(), Clock.system()));
                }
                m41getOffsetContext().setRedoSql(null);
            }, getConfig(), getSchema());
            try {
                getTransactionCache().forEachEvent(andRemoveTransaction, logMinerEvent2 -> {
                    if (!getContext().isRunning()) {
                        return false;
                    }
                    LOGGER.trace("Dispatching event {}", logMinerEvent2.getEventType());
                    transactionCommitConsumer.accept(logMinerEvent2);
                    return true;
                });
                transactionCommitConsumer.close();
            } catch (Throwable th) {
                try {
                    transactionCommitConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        m41getOffsetContext().setEventScn(scn);
        m41getOffsetContext().setRsId(logMinerEventRow.getRsId());
        m41getOffsetContext().setRowId("");
        m41getOffsetContext().setStartScn(Scn.NULL);
        m41getOffsetContext().setCommitTime(null);
        m41getOffsetContext().setStartTime(null);
        if (z2) {
            getEventDispatcher().dispatchTransactionCommittedEvent(getPartition(), m41getOffsetContext(), andRemoveTransaction.getChangeTime());
        } else {
            getEventDispatcher().dispatchHeartbeatEvent(getPartition(), m41getOffsetContext());
        }
        if (andRemoveTransaction != null) {
            finalizeTransaction(transactionId, scn, false);
            cleanupAfterTransactionRemovedFromCache(andRemoveTransaction, false);
            getMetrics().calculateLagFromSource(logMinerEventRow.getChangeTime());
            getMetrics().setActiveTransactionCount(getTransactionCache().getTransactionCount());
        }
        updateCommitMetrics(logMinerEventRow, Duration.between(now, Instant.now()));
    }

    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    protected void handleRollbackEvent(LogMinerEventRow logMinerEventRow) {
        String transactionId = logMinerEventRow.getTransactionId();
        if (getTransactionCache().containsTransaction(transactionId)) {
            LOGGER.debug("Transaction {} was rolled back.", transactionId);
            finalizeTransaction(transactionId, logMinerEventRow.getScn(), true);
            getMetrics().setActiveTransactionCount(getTransactionCache().getTransactionCount());
        } else {
            LOGGER.debug("Transaction {} not found in cache, no events to rollback.", transactionId);
            getTransactionCache().removeAbandonedTransaction(transactionId);
        }
        getMetrics().incrementRolledBackTransactionCount();
        getMetrics().addRolledBackTransactionId(transactionId);
        getBatchMetrics().rollbackObserved();
    }

    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    protected void handleSchemaChangeEvent(LogMinerEventRow logMinerEventRow) throws InterruptedException {
        if (isSchemaChangeEventSkipped(logMinerEventRow)) {
            return;
        }
        if (hasSchemaChangeBeenSeen(logMinerEventRow)) {
            LOGGER.trace("DDL: Scn {}, SQL '{}' has already been processed, skipped.", logMinerEventRow.getScn(), logMinerEventRow.getRedoSql());
            return;
        }
        if (Strings.isNullOrEmpty(logMinerEventRow.getTableName())) {
            return;
        }
        Loggings.logDebugAndTraceRecord(LOGGER, logMinerEventRow, "Processing DDL event with SCN {}: {}", new Object[]{logMinerEventRow.getScn(), logMinerEventRow.getRedoSql()});
        if (canAdvanceLowerScnBoundaryOnSchemaChange(logMinerEventRow)) {
            LOGGER.debug("Schema change advanced offset SCN to {}", logMinerEventRow.getScn());
            m41getOffsetContext().setScn(logMinerEventRow.getScn());
        }
        LOGGER.debug("Schema change advanced offset commit SCN to {} for thread {}", logMinerEventRow.getScn(), Integer.valueOf(logMinerEventRow.getThread()));
        m41getOffsetContext().getCommitScn().recordCommit(logMinerEventRow);
        if (getConfig().isLobEnabled()) {
            getSchemaChangesCache().put(logMinerEventRow.getScn().toString(), logMinerEventRow.getTableId().identifier());
        }
        dispatchSchemaChangeEventInternal(logMinerEventRow);
    }

    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    protected void handleTruncateEvent(LogMinerEventRow logMinerEventRow) throws InterruptedException {
        try {
            Table tableForDataEvent = getTableForDataEvent(logMinerEventRow);
            if (tableForDataEvent != null) {
                LOGGER.debug("Dispatching TRUNCATE event for table '{}' with SCN {}", tableForDataEvent.id(), logMinerEventRow.getScn());
                enqueueEvent(logMinerEventRow, new TruncateEvent(logMinerEventRow, parseTruncateEvent(logMinerEventRow)));
            }
        } catch (SQLException e) {
            LOGGER.warn("Failed to process truncate event", e);
            getMetrics().incrementWarningCount();
        }
    }

    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    protected boolean isDispatchAllowedForDataChangeEvent(LogMinerEventRow logMinerEventRow) {
        if (!logMinerEventRow.isRollbackFlag()) {
            return true;
        }
        removeEventWithRowId(logMinerEventRow);
        return false;
    }

    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    protected void handleReplicationMarkerEvent(LogMinerEventRow logMinerEventRow) {
        String transactionId = logMinerEventRow.getTransactionId();
        Transaction transaction = getTransactionCache().getTransaction(transactionId);
        if (transaction != null) {
            LOGGER.debug("Skipping GoldenGate replication marker for transaction {} with SCN {}", transactionId, logMinerEventRow.getScn());
            getTransactionCache().removeTransactionEvents(transaction);
            getTransactionCache().removeTransaction(transaction);
        }
        getTransactionCache().removeAbandonedTransaction(transactionId);
    }

    private Scn calculateNewStartScn(Scn scn, Scn scn2, Scn scn3) throws InterruptedException {
        Scn scn4;
        Instant instant;
        if (!getBatchMetrics().hasJdbcRows()) {
            return scn;
        }
        Optional<LogMinerTransactionCache.ScnDetails> eldestTransactionScnDetailsInCache = getTransactionCache().getEldestTransactionScnDetailsInCache();
        if (eldestTransactionScnDetailsInCache.isPresent()) {
            scn4 = eldestTransactionScnDetailsInCache.get().scn();
            instant = eldestTransactionScnDetailsInCache.get().changeTime();
        } else {
            scn4 = Scn.NULL;
            instant = null;
        }
        if (scn4.isNull()) {
            getSchemaChangesCache().removeIf(entry -> {
                return true;
            });
        } else {
            abandonTransactions(getConfig().getLogMiningTransactionRetention());
            Scn scn5 = scn4;
            getProcessedTransactionsCache().removeIf(entry2 -> {
                return Scn.valueOf((String) entry2.getValue()).compareTo(scn5) < 0;
            });
            Scn scn6 = scn4;
            getSchemaChangesCache().removeIf(entry3 -> {
                return Scn.valueOf((String) entry3.getKey()).compareTo(scn6) < 0;
            });
        }
        if (!getConfig().isLobEnabled()) {
            if (!this.lastProcessedScn.isNull() && this.lastProcessedScn.compareTo(scn2) < 0) {
                scn2 = this.lastProcessedScn;
            }
            m41getOffsetContext().setScn(scn4.isNull() ? scn2 : scn4.subtract(Scn.valueOf(1)));
            getMetrics().setOldestScnDetails(scn4, instant);
            getMetrics().setOffsetScn(m41getOffsetContext().getScn());
            getEventDispatcher().dispatchHeartbeatEvent(getPartition(), m41getOffsetContext());
            return scn2;
        }
        if (getTransactionCache().isEmpty() && !scn3.isNull()) {
            m41getOffsetContext().setScn(scn3);
            getEventDispatcher().dispatchHeartbeatEvent(getPartition(), m41getOffsetContext());
        } else if (!scn4.isNull()) {
            Scn scn7 = scn4;
            getProcessedTransactionsCache().removeIf(entry4 -> {
                return Scn.valueOf((String) entry4.getValue()).compareTo(scn7) < 0;
            });
            m41getOffsetContext().setScn(scn4.subtract(Scn.valueOf(1)));
            getEventDispatcher().dispatchHeartbeatEvent(getPartition(), m41getOffsetContext());
        }
        return m41getOffsetContext().getScn();
    }

    private Scn calculateSmallestScn() {
        return (Scn) getTransactionCache().getEldestTransactionScnDetailsInCache().map(scnDetails -> {
            getMetrics().setOldestScnDetails(scnDetails.scn(), scnDetails.changeTime());
            return scnDetails.scn();
        }).orElseGet(() -> {
            getMetrics().setOldestScnDetails(Scn.valueOf(-1), null);
            return Scn.NULL;
        });
    }

    private void removeEventWithRowId(LogMinerEventRow logMinerEventRow) {
        Transaction transaction = getTransactionCache().getTransaction(logMinerEventRow.getTransactionId());
        if (transaction != null) {
            if (removeTransactionEventWithRowId(transaction, logMinerEventRow)) {
                return;
            }
            Loggings.logWarningAndTraceRecord(LOGGER, logMinerEventRow, "Cannot apply undo change in transaction '{}' with SCN '{}' on table '{}' since event with row-id {} was not found.", new Object[]{logMinerEventRow.getTransactionId(), logMinerEventRow.getScn(), logMinerEventRow.getTableId(), logMinerEventRow.getRowId()});
        } else {
            if (!logMinerEventRow.getTransactionId().endsWith(NO_SEQUENCE_TRX_ID_SUFFIX)) {
                if (getConfig().isLobEnabled()) {
                    Loggings.logWarningAndTraceRecord(LOGGER, logMinerEventRow, "Failed to apply undo change with SCN '{}' on table '{}' in transaction '{}' with row-id '{}'", new Object[]{logMinerEventRow.getScn(), logMinerEventRow.getTableId(), logMinerEventRow.getTransactionId(), logMinerEventRow.getRowId()});
                    return;
                } else {
                    Loggings.logWarningAndTraceRecord(LOGGER, logMinerEventRow, "Cannot apply undo change with SCN '{}' on table '{}' since transaction '{}' was not found.", new Object[]{logMinerEventRow.getScn(), logMinerEventRow.getTableId(), logMinerEventRow.getTransactionId()});
                    return;
                }
            }
            String substring = logMinerEventRow.getTransactionId().substring(0, 8);
            LOGGER.debug("Undo change refers to a transaction that has no explicit sequence, '{}'", logMinerEventRow.getTransactionId());
            LOGGER.debug("Checking all transactions with prefix '{}'", substring);
            if (((Boolean) getTransactionCache().streamTransactionsAndReturn(stream -> {
                return Boolean.valueOf(stream.filter(transaction2 -> {
                    return transaction2.getTransactionId().startsWith(substring);
                }).anyMatch(transaction3 -> {
                    return removeTransactionEventWithRowId(transaction3, logMinerEventRow);
                }));
            })).booleanValue()) {
                return;
            }
            Loggings.logWarningAndTraceRecord(LOGGER, logMinerEventRow, "Cannot apply undo change in transaction '{}' with SCN '{}' on table '{}' since event with row-id {} was not found.", new Object[]{logMinerEventRow.getTransactionId(), logMinerEventRow.getScn(), logMinerEventRow.getTableId(), logMinerEventRow.getRowId()});
        }
    }

    private boolean removeTransactionEventWithRowId(Transaction transaction, LogMinerEventRow logMinerEventRow) {
        if (!getTransactionCache().removeTransactionEventWithRowId(transaction, logMinerEventRow.getRowId())) {
            return false;
        }
        getMetrics().increasePartialRollbackCount();
        getBatchMetrics().partialRollbackObserved();
        Loggings.logDebugAndTraceRecord(LOGGER, logMinerEventRow, "Undo change on table '{}' applied to transaction event with row-id '{}'", new Object[]{logMinerEventRow.getTableId(), logMinerEventRow.getRowId()});
        return true;
    }

    private void cleanupAfterTransactionRemovedFromCache(Transaction transaction, boolean z) {
        if (z) {
            getTransactionCache().abandon(transaction);
        } else {
            getTransactionCache().removeAbandonedTransaction(transaction.getTransactionId());
        }
        getTransactionCache().removeTransactionEvents(transaction);
    }

    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    protected boolean hasEventBeenProcessed(LogMinerEventRow logMinerEventRow) {
        String transactionId = logMinerEventRow.getTransactionId();
        if (isRecentlyProcessed(transactionId)) {
            LOGGER.debug("Transaction {} has been seen by connector, skipped.", transactionId);
            return true;
        }
        if (!getTransactionCache().isAbandoned(transactionId)) {
            return isEventIncludedInSnapshot(logMinerEventRow);
        }
        LOGGER.debug("Event for abandoned transaction {}, skipped.", transactionId);
        return true;
    }

    private boolean isTransactionSkippedAtCommit(Transaction transaction) {
        return transaction != null && (isUserNameSkipped(transaction.getUserName()) || isClientIdSkipped(transaction.getClientId()));
    }

    private void finalizeTransaction(String str, Scn scn, boolean z) {
        Transaction transaction;
        if (z && (transaction = getTransactionCache().getTransaction(str)) != null) {
            getTransactionCache().removeTransactionEvents(transaction);
            getTransactionCache().removeTransaction(transaction);
        }
        getTransactionCache().removeAbandonedTransaction(str);
        if (getConfig().isLobEnabled()) {
            getProcessedTransactionsCache().put(str, scn.toString());
        }
    }

    private boolean canAdvanceLowerScnBoundaryOnSchemaChange(LogMinerEventRow logMinerEventRow) {
        int transactionCount = getTransactionCache().getTransactionCount();
        if (transactionCount == 0) {
            return true;
        }
        if (transactionCount == 1) {
            return ((Boolean) getTransactionCache().streamTransactionsAndReturn(stream -> {
                return Boolean.valueOf(stream.map((v0) -> {
                    return v0.getTransactionId();
                }).allMatch(str -> {
                    return str.equals(logMinerEventRow.getTransactionId());
                }));
            })).booleanValue();
        }
        return false;
    }

    @Override // io.debezium.connector.oracle.logminer.AbstractLogMinerStreamingChangeEventSource
    protected void enqueueEvent(LogMinerEventRow logMinerEventRow, LogMinerEvent logMinerEvent) throws InterruptedException {
        String transactionId = logMinerEventRow.getTransactionId();
        Transaction transaction = getTransactionCache().getTransaction(transactionId);
        if (transaction == null) {
            LOGGER.trace("Transaction {} is not in cache, creating.", transactionId);
            transaction = this.transactionFactory.createTransaction(logMinerEventRow);
            getTransactionCache().addTransaction(transaction);
        }
        int nextEventId = transaction.getNextEventId();
        if (!getTransactionCache().containsTransactionEvent(transaction, nextEventId)) {
            LOGGER.trace("Transaction {}, adding event reference at key {}", transactionId, transaction.getEventId(nextEventId));
            getTransactionCache().addTransactionEvent(transaction, nextEventId, logMinerEvent);
            getMetrics().calculateLagFromSource(logMinerEventRow.getChangeTime());
        }
        getTransactionCache().syncTransaction(transaction);
        getMetrics().setActiveTransactionCount(getTransactionCache().getTransactionCount());
    }

    private boolean isTransactionOverEventThreshold(Transaction transaction) {
        return getConfig().getLogMiningBufferTransactionEventsThreshold() > 0 && ((long) getTransactionEventCount(transaction)) >= getConfig().getLogMiningBufferTransactionEventsThreshold();
    }

    private void abandonTransactionOverEventThreshold(Transaction transaction) {
        LOGGER.warn("Transaction {} exceeds maximum allowed number of events, transaction will be abandoned.", transaction.getTransactionId());
        getMetrics().incrementWarningCount();
        getTransactionCache().getAndRemoveTransaction(transaction.getTransactionId());
        cleanupAfterTransactionRemovedFromCache(transaction, true);
        getMetrics().incrementOversizedTransactionCount();
    }

    protected void abandonTransactions(Duration duration) throws InterruptedException {
        if (Duration.ZERO.equals(duration)) {
            return;
        }
        Optional<Scn> lastScnToAbandon = getLastScnToAbandon(getConnection(), duration);
        if (lastScnToAbandon.isPresent()) {
            Scn scn = lastScnToAbandon.get();
            Scn scn2 = (Scn) getTransactionCache().getEldestTransactionScnDetailsInCache().map((v0) -> {
                return v0.scn();
            }).orElse(Scn.NULL);
            if (!scn2.isNull() && scn.compareTo(scn2) >= 0) {
                Map map = (Map) getTransactionCache().streamTransactionsAndReturn(stream -> {
                    return (Map) stream.filter(transaction -> {
                        return transaction.getStartScn().compareTo(scn) <= 0;
                    }).collect(Collectors.toMap((v0) -> {
                        return v0.getTransactionId();
                    }, transaction2 -> {
                        return transaction2;
                    }));
                });
                boolean z = true;
                for (Map.Entry entry : map.entrySet()) {
                    if (z) {
                        LOGGER.warn("All transactions with SCN <= {} will be abandoned.", scn);
                        z = false;
                    }
                    String str = (String) entry.getKey();
                    Transaction transaction = (Transaction) entry.getValue();
                    LOGGER.warn("Transaction {} (start SCN {}, change time {}, redo thread {}, {} events{}) is being abandoned.", new Object[]{str, transaction.getStartScn(), transaction.getChangeTime(), Integer.valueOf(transaction.getRedoThreadId()), Integer.valueOf(transaction.getNumberOfEvents()), getLoggedAbandonedTransactionTableNames(transaction)});
                    cleanupAfterTransactionRemovedFromCache(transaction, true);
                    getTransactionCache().removeTransaction(transaction);
                    getMetrics().addAbandonedTransactionId(str);
                }
                getMetrics().setActiveTransactionCount(getTransactionCache().getTransactionCount());
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("List of transactions in the cache before transactions being abandoned: [{}]", String.join(",", map.keySet()));
                    getTransactionCache().transactions(stream2 -> {
                        LOGGER.debug("List of transactions in the cache after transactions begin abandoned: [{}]", stream2.map((v0) -> {
                            return v0.getTransactionId();
                        }).collect(Collectors.joining(",")));
                    });
                }
                getTransactionCache().getEldestTransactionScnDetailsInCache().ifPresentOrElse(scnDetails -> {
                    getMetrics().setOldestScnDetails(scnDetails.scn(), scnDetails.changeTime());
                }, () -> {
                    getMetrics().setOldestScnDetails(Scn.NULL, null);
                });
                m41getOffsetContext().setScn(scn);
            }
            getEventDispatcher().dispatchHeartbeatEvent(getPartition(), m41getOffsetContext());
        }
    }

    private String getLoggedAbandonedTransactionTableNames(Transaction transaction) throws InterruptedException {
        if (!ABANDONED_DETAILS_LOGGER.isDebugEnabled()) {
            return "";
        }
        HashSet hashSet = new HashSet();
        getTransactionCache().forEachEvent(transaction, logMinerEvent -> {
            hashSet.add(logMinerEvent.getTableId().identifier());
            return true;
        });
        return String.format(", %d tables [%s]", Integer.valueOf(hashSet.size()), String.join(",", hashSet));
    }

    private Optional<Scn> getLastScnToAbandon(OracleConnection oracleConnection, Duration duration) {
        try {
            return this.lastProcessedScn.isNull() ? Optional.empty() : Optional.of(new Scn((BigInteger) oracleConnection.singleOptionalValue(SqlUtils.getScnByTimeDeltaQuery(this.lastProcessedScn, duration), resultSet -> {
                return resultSet.getBigDecimal(1).toBigInteger();
            })));
        } catch (SQLException e) {
            if (this.lastProcessedScnChangeTime != null) {
                Scn lastScnToAbandonFallbackByTransactionChangeTime = getLastScnToAbandonFallbackByTransactionChangeTime(duration);
                if (!lastScnToAbandonFallbackByTransactionChangeTime.isNull()) {
                    return Optional.of(lastScnToAbandonFallbackByTransactionChangeTime);
                }
            }
            LOGGER.error("Cannot fetch SCN {} by given duration to calculate SCN to abandon", this.lastProcessedScn, e);
            getMetrics().incrementErrorCount();
            return Optional.empty();
        }
    }

    private Scn getLastScnToAbandonFallbackByTransactionChangeTime(Duration duration) {
        LOGGER.debug("Getting abandon SCN breakpoint based on change time {} (retention {} minutes).", this.lastProcessedScnChangeTime, Long.valueOf(duration.toMinutes()));
        return (Scn) getTransactionCache().streamTransactionsAndReturn(stream -> {
            return (Scn) stream.filter(transaction -> {
                Instant changeTime = transaction.getChangeTime();
                long minutes = Duration.between(this.lastProcessedScnChangeTime, changeTime).abs().toMinutes();
                LOGGER.debug("Transaction {} with SCN {} started at {}, age is {} minutes.", new Object[]{transaction.getTransactionId(), transaction.getStartScn(), changeTime, Long.valueOf(minutes)});
                return minutes > 0 && minutes > duration.toMinutes();
            }).max(Comparator.comparing((v0) -> {
                return v0.getStartScn();
            })).map((v0) -> {
                return v0.getStartScn();
            }).orElse(Scn.NULL);
        });
    }

    private void logActiveTransactions() {
        if (!LOGGER.isDebugEnabled() || getTransactionCache().isEmpty()) {
            return;
        }
        this.cacheProvider.getTransactionCache().transactions(stream -> {
            LOGGER.debug("All active transactions: {}", stream.map(transaction -> {
                return transaction.getTransactionId() + " (" + String.valueOf(transaction.getStartScn()) + ")";
            }).collect(Collectors.joining(",")));
        });
    }
}
