package io.debezium.transforms.openlineage;

import io.debezium.Module;
import io.debezium.config.Configuration;
import io.debezium.connector.common.BaseSourceTask;
import io.debezium.openlineage.ConnectorContext;
import io.debezium.openlineage.DebeziumOpenLineageEmitter;
import io.debezium.openlineage.dataset.DatasetMetadata;
import io.debezium.transforms.SmtManager;
import io.debezium.util.BoundedConcurrentHashMap;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.components.Versioned;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/transforms/openlineage/OpenLineage.class */
public class OpenLineage<R extends ConnectRecord<R>> implements Transformation<R>, Versioned {
    private static final Logger LOGGER = LoggerFactory.getLogger(OpenLineage.class);
    private static final int CACHE_SIZE = 64;
    private ZonedDateTime lastEmissionTime;
    private final BoundedConcurrentHashMap<String, Boolean> recentlySeenTopics = new BoundedConcurrentHashMap<>(64);
    private final BoundedConcurrentHashMap<Schema, Boolean> recentlySeenSchemas = new BoundedConcurrentHashMap<>(64);
    private SmtManager<R> smtManager;

    public ConfigDef config() {
        return new ConfigDef();
    }

    public void configure(Map<String, ?> map) {
        this.smtManager = new SmtManager<>(Configuration.from(map));
    }

    public R apply(R r) {
        if (isInvalidLineageRecord(r)) {
            return r;
        }
        if (this.recentlySeenTopics.put(r.topic(), true) != null && this.recentlySeenSchemas.put(r.valueSchema(), true) != null) {
            return r;
        }
        if (this.recentlySeenSchemas.put(r.valueSchema(), true) == null) {
            DebeziumOpenLineageEmitter.emit(ConnectorContext.from(r.headers()), BaseSourceTask.State.RUNNING, (List<DatasetMetadata>) List.of(new DatasetMetadata(r.topic(), DatasetMetadata.DatasetType.OUTPUT, r.valueSchema().fields().stream().map(this::buildFieldDefinition).toList())));
            this.lastEmissionTime = ZonedDateTime.now();
        }
        LOGGER.debug("Emitting running event for output dataset {}", r.topic());
        return r;
    }

    private boolean isInvalidLineageRecord(R r) {
        return r.value() == null || this.smtManager.isValidSchemaChange(r) || this.smtManager.isValidNotification(r) || this.smtManager.isValidHeartBeat(r);
    }

    private DatasetMetadata.FieldDefinition buildFieldDefinition(Field field) {
        Schema schema = field.schema();
        String name = field.name();
        String name2 = schema.type().name();
        String doc = schema.doc();
        return (schema.type() != Schema.Type.STRUCT || schema.fields() == null || schema.fields().isEmpty()) ? new DatasetMetadata.FieldDefinition(name, name2, doc) : new DatasetMetadata.FieldDefinition(name, name2, doc, schema.fields().stream().map(this::buildFieldDefinition).toList());
    }

    public void close() {
    }

    public String version() {
        return Module.version();
    }
}
