package io.debezium.pipeline.txmetadata;

import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.schema.SchemaFactory;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Loggings;
import java.time.Instant;
import java.util.Objects;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks transaction boundaries while change events are processed.
 *
 * <p>When the connector configuration reports that transaction metadata should be provided, this
 * class sends a record to the transaction topic through the configured sender whenever a
 * transaction begins or ends, and adds a {@code "transaction"} field to the value of each data
 * event that belongs to a transaction. When transaction metadata is disabled, the public event
 * methods do nothing.
 *
 * <p>The keys and values of the emitted records are built by the {@link TransactionStructMaker}
 * obtained from the connector configuration; subclasses may override the {@code prepareTx*}
 * methods to customise them.
 */
@NotThreadSafe
public class TransactionMonitor {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionMonitor.class);

    /** Schema returned by {@code SchemaFactory.get().transactionBlockSchema()}. */
    public static final Schema TRANSACTION_BLOCK_SCHEMA = SchemaFactory.get().transactionBlockSchema();

    private final EventMetadataProvider eventMetadataProvider;
    private final String topicName;
    private final BlockingConsumer<SourceRecord> sender;
    private final CommonConnectorConfig connectorConfig;
    private final TransactionStructMaker transactionStructMaker;

    /** Key schema reported by the {@link TransactionStructMaker}. */
    protected final Schema transactionKeySchema;

    // NOTE(review): not referenced inside this class; presumably the name of the id field in the
    // transaction key. Kept as an instance field so existing subclasses keep working.
    protected final String DEBEZIUM_TRANSACTION_ID_KEY = "id";

    /**
     * Creates a monitor.
     *
     * @param commonConnectorConfig connector configuration; supplies the transaction struct maker
     *        and decides whether transaction metadata is provided at all
     * @param eventMetadataProvider source of transaction id, transaction info and timestamp for events
     * @param schemaNameAdjuster not used by this constructor
     * @param blockingConsumer receives every transaction begin/end record
     * @param str name of the topic the transaction records are addressed to
     * @throws NullPointerException if {@code commonConnectorConfig} or {@code eventMetadataProvider} is null
     */
    public TransactionMonitor(CommonConnectorConfig commonConnectorConfig, EventMetadataProvider eventMetadataProvider, SchemaNameAdjuster schemaNameAdjuster, BlockingConsumer<SourceRecord> blockingConsumer, String str) {
        Objects.requireNonNull(eventMetadataProvider, "eventMetadataProvider must not be null");
        Objects.requireNonNull(commonConnectorConfig, "commonConnectorConfig must not be null");
        this.transactionStructMaker = commonConnectorConfig.getTransactionMetadataFactory().getTransactionStructMaker();
        this.transactionKeySchema = this.transactionStructMaker.getTransactionKeySchema();
        this.topicName = str;
        this.eventMetadataProvider = eventMetadataProvider;
        this.sender = blockingConsumer;
        this.connectorConfig = commonConnectorConfig;
    }

    /**
     * Processes a data event and updates transaction tracking accordingly.
     *
     * <ul>
     * <li>If the event has no transaction id and a transaction is in progress, the transaction is
     * implicitly committed: an end record is sent and the transaction context is reset.</li>
     * <li>If the event has a transaction id and no transaction is in progress, a transaction is
     * begun and a begin record is sent.</li>
     * <li>If the event's transaction id differs from the one in progress, the current transaction
     * is ended and a new one begun, sending both records.</li>
     * </ul>
     * Events that carry a transaction id are then counted against the transaction and, when they
     * have a value, get the {@code "transaction"} field set on it.
     *
     * @param partition source partition used for emitted transaction records
     * @param dataCollectionId data collection the event belongs to
     * @param offsetContext offset context holding the transaction context
     * @param obj event key
     * @param struct event value; may be null, in which case no transaction block is added
     * @throws InterruptedException if the sender is interrupted while accepting a record
     */
    public void dataEvent(Partition partition, DataCollectionId dataCollectionId, OffsetContext offsetContext, Object obj, Struct struct) throws InterruptedException {
        if (!this.connectorConfig.shouldProvideTransactionMetadata()) {
            return;
        }

        final TransactionContext txContext = offsetContext.getTransactionContext();
        final String eventTxId = this.eventMetadataProvider.getTransactionId(dataCollectionId, offsetContext, obj, struct);
        final TransactionInfo eventTxInfo = this.eventMetadataProvider.getTransactionInfo(dataCollectionId, offsetContext, obj, struct);

        if (eventTxId == null) {
            if (LOGGER.isTraceEnabled()) {
                LOGGER.trace("Event '{}' has no transaction id", this.eventMetadataProvider.toSummaryString(dataCollectionId, offsetContext, Loggings.maybeRedactSensitiveData(obj), struct));
            }
            if (txContext.isTransactionInProgress()) {
                LOGGER.trace("Transaction was in progress, executing implicit transaction commit");
                endTransaction(partition, offsetContext, eventTimestamp(dataCollectionId, offsetContext, obj, struct));
                // Reset the context like every other end path does; otherwise the transaction
                // stays "in progress" and each following event without a transaction id would
                // emit another end record for it.
                txContext.endTransaction();
            }
            return;
        }

        if (!txContext.isTransactionInProgress()) {
            txContext.beginTransaction(eventTxInfo);
            beginTransaction(partition, offsetContext, eventTimestamp(dataCollectionId, offsetContext, obj, struct));
        }
        else if (!txContext.getTransactionId().equals(eventTxId)) {
            // A different transaction id arrived without an explicit commit: close the current
            // transaction before opening the new one.
            endTransaction(partition, offsetContext, eventTimestamp(dataCollectionId, offsetContext, obj, struct));
            txContext.endTransaction();
            txContext.beginTransaction(eventTxInfo);
            beginTransaction(partition, offsetContext, eventTimestamp(dataCollectionId, offsetContext, obj, struct));
        }
        transactionEvent(offsetContext, dataCollectionId, obj, struct);
    }

    /**
     * Handles an explicit transaction commit: sends an end record if a transaction is in progress
     * and then resets the transaction context in either case.
     *
     * @param partition source partition used for the emitted record
     * @param offsetContext offset context holding the transaction context
     * @param instant timestamp passed to the end-of-transaction value
     * @throws InterruptedException if the sender is interrupted while accepting the record
     */
    public void transactionComittedEvent(Partition partition, OffsetContext offsetContext, Instant instant) throws InterruptedException {
        if (!this.connectorConfig.shouldProvideTransactionMetadata()) {
            return;
        }
        if (offsetContext.getTransactionContext().isTransactionInProgress()) {
            endTransaction(partition, offsetContext, instant);
        }
        offsetContext.getTransactionContext().endTransaction();
    }

    /**
     * Handles an explicit transaction start: begins the transaction on the context and sends a
     * begin record.
     *
     * @param partition source partition used for the emitted record
     * @param transactionInfo information about the transaction being started
     * @param offsetContext offset context holding the transaction context
     * @param instant timestamp passed to the begin-of-transaction value
     * @throws InterruptedException if the sender is interrupted while accepting the record
     */
    public void transactionStartedEvent(Partition partition, TransactionInfo transactionInfo, OffsetContext offsetContext, Instant instant) throws InterruptedException {
        if (!this.connectorConfig.shouldProvideTransactionMetadata()) {
            return;
        }
        offsetContext.getTransactionContext().beginTransaction(transactionInfo);
        beginTransaction(partition, offsetContext, instant);
    }

    /**
     * Builds the key of a transaction record by delegating to the {@link TransactionStructMaker}.
     *
     * @param offsetContext current offset context
     * @return the key struct
     */
    protected Struct prepareTxKey(OffsetContext offsetContext) {
        return this.transactionStructMaker.buildTransactionKey(offsetContext);
    }

    /**
     * Builds the value of a begin-transaction record by delegating to the {@link TransactionStructMaker}.
     *
     * @param offsetContext current offset context
     * @param instant timestamp of the transaction begin
     * @return the value struct
     */
    protected Struct prepareTxBeginValue(OffsetContext offsetContext, Instant instant) {
        return this.transactionStructMaker.buildBeginTransactionValue(offsetContext, instant);
    }

    /**
     * Builds the value of an end-transaction record by delegating to the {@link TransactionStructMaker}.
     *
     * @param offsetContext current offset context
     * @param instant timestamp of the transaction end
     * @return the value struct
     */
    protected Struct prepareTxEndValue(OffsetContext offsetContext, Instant instant) {
        return this.transactionStructMaker.buildEndTransactionValue(offsetContext, instant);
    }

    /**
     * Builds the struct stored in the {@code "transaction"} field of a data event value by
     * delegating to the {@link TransactionStructMaker}.
     *
     * @param offsetContext current offset context
     * @param j value returned by the transaction context when the event was registered
     * @param struct value of the data event
     * @return the transaction block struct
     */
    protected Struct prepareTxStruct(OffsetContext offsetContext, long j, Struct struct) {
        return this.transactionStructMaker.addTransactionBlock(offsetContext, j, struct);
    }

    /** Looks up the timestamp of an event through the metadata provider. */
    private Instant eventTimestamp(DataCollectionId dataCollectionId, OffsetContext offsetContext, Object key, Struct value) {
        return this.eventMetadataProvider.getEventTimestamp(dataCollectionId, offsetContext, key, value);
    }

    /**
     * Registers the event with the transaction context and, when the event has a value, sets the
     * {@code "transaction"} field on it.
     */
    private void transactionEvent(OffsetContext offsetContext, DataCollectionId dataCollectionId, Object key, Struct value) {
        // The event is registered even when there is no value to enrich.
        final long eventOrder = offsetContext.getTransactionContext().event(dataCollectionId);
        if (value == null) {
            LOGGER.debug("Event with key {} without value. Cannot enrich source block.", Loggings.maybeRedactSensitiveData(key));
            return;
        }
        value.put("transaction", prepareTxStruct(offsetContext, eventOrder, value));
    }

    /** Sends a begin-transaction record to the transaction topic. */
    private void beginTransaction(Partition partition, OffsetContext offsetContext, Instant timestamp) throws InterruptedException {
        final Struct key = prepareTxKey(offsetContext);
        final Struct value = prepareTxBeginValue(offsetContext, timestamp);
        this.sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), this.topicName, (Integer) null, key.schema(), key, value.schema(), value));
    }

    /** Sends an end-transaction record to the transaction topic. */
    private void endTransaction(Partition partition, OffsetContext offsetContext, Instant timestamp) throws InterruptedException {
        final Struct key = prepareTxKey(offsetContext);
        final Struct value = prepareTxEndValue(offsetContext, timestamp);
        this.sender.accept(new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), this.topicName, (Integer) null, key.schema(), key, value.schema(), value));
    }
}
