package io.debezium.server.qdrant;

import io.debezium.DebeziumException;
import io.debezium.data.Envelope;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.schema.SchemaFactory;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import io.grpc.Grpc;
import io.grpc.InsecureChannelCredentials;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.QdrantGrpcClient;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("qdrant")
@Dependent
/* loaded from: input_file:io/debezium/server/qdrant/QdrantChangeConsumer.class */
public class QdrantChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(QdrantChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.qdrant.";
    private static final String FIELD_INCLUDE_LIST_PROP_PREFIX = "debezium.sink.qdrant.field.include.list.";
    private QdrantClient qdrantClient;
    private QdrantMessageFactory messageFactory;

    @ConfigProperty(name = "debezium.sink.qdrant.host", defaultValue = "localhost")
    String host;

    @ConfigProperty(name = "debezium.sink.qdrant.port", defaultValue = "6333")
    int port;

    @ConfigProperty(name = "debezium.sink.qdrant.api.key")
    Optional<String> apiKey;

    @ConfigProperty(name = "debezium.sink.qdrant.vector.field.names")
    Optional<String> vectorFieldNames;

    @Inject
    @CustomConsumerBuilder
    Instance<QdrantClient> customClient;

    /* renamed from: io.debezium.server.qdrant.QdrantChangeConsumer$1, reason: invalid class name */
    /* loaded from: input_file:io/debezium/server/qdrant/QdrantChangeConsumer$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$debezium$data$Envelope$Operation = new int[Envelope.Operation.values().length];

        static {
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.READ.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.CREATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.UPDATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    @PostConstruct
    void connect() {
        if (this.customClient.isResolvable()) {
            this.qdrantClient = (QdrantClient) this.customClient.get();
            LOGGER.info("Obtained custom configured QdrantClient '{}'", this.qdrantClient);
        } else {
            QdrantGrpcClient.Builder newBuilder = QdrantGrpcClient.newBuilder(Grpc.newChannelBuilder("%s:%s".formatted(this.host, Integer.valueOf(this.port)), InsecureChannelCredentials.create()).build(), true);
            if (this.apiKey.isPresent()) {
                newBuilder = newBuilder.withApiKey(this.apiKey.get());
            }
            this.qdrantClient = new QdrantClient(newBuilder.build());
            LOGGER.info("Created standard QdrantClient '{}'", this.qdrantClient);
        }
        this.messageFactory = new QdrantMessageFactory(this.vectorFieldNames, getConfigSubset(ConfigProvider.getConfig(), FIELD_INCLUDE_LIST_PROP_PREFIX));
    }

    @PreDestroy
    void close() {
        try {
            this.qdrantClient.close();
        } catch (Exception e) {
            LOGGER.warn("Exception while closing client", e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        for (ChangeEvent<Object, Object> changeEvent : list) {
            LOGGER.trace("Received event '{}'", changeEvent);
            String map = this.streamNameMapper.map(changeEvent.destination());
            SourceRecord sourceRecord = toSourceRecord(changeEvent);
            if (isSchemaChange(sourceRecord)) {
                LOGGER.debug("Schema change event, ignoring it");
                recordCommitter.markProcessed(changeEvent);
            } else {
                if (changeEvent.key() == null) {
                    throw new DebeziumException("Qdrant does not support collections without primary key");
                }
                if (sourceRecord.value() == null) {
                    deleteRecord(map, changeEvent, recordCommitter);
                } else if (Envelope.isEnvelopeSchema(sourceRecord.valueSchema())) {
                    Struct struct = (Struct) sourceRecord.value();
                    Struct struct2 = ((Struct) sourceRecord.value()).getStruct("after");
                    switch (AnonymousClass1.$SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.forCode(struct.getString("op")).ordinal()]) {
                        case 1:
                        case 2:
                        case 3:
                            upsertRecord(map, changeEvent, struct2, recordCommitter);
                            break;
                        case 4:
                            deleteRecord(map, changeEvent, recordCommitter);
                            break;
                        default:
                            LOGGER.info("Unsupported operation, skipping record '{}'", changeEvent);
                            break;
                    }
                } else {
                    upsertRecord(map, changeEvent, (Struct) sourceRecord.value(), recordCommitter);
                }
            }
        }
        recordCommitter.markBatchFinished();
    }

    protected SourceRecord toSourceRecord(ChangeEvent<Object, Object> changeEvent) {
        return ((EmbeddedEngineChangeEvent) changeEvent).sourceRecord();
    }

    private boolean isSchemaChange(SourceRecord sourceRecord) {
        return (sourceRecord.valueSchema() == null || sourceRecord.valueSchema().name() == null || !SchemaFactory.get().isSchemaChangeSchema(sourceRecord.valueSchema())) ? false : true;
    }

    private void upsertRecord(String str, ChangeEvent<Object, Object> changeEvent, Struct struct, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        Struct struct2 = (Struct) toSourceRecord(changeEvent).key();
        try {
            recordCommitter.markProcessed(changeEvent);
        } catch (ExecutionException e) {
            throw new DebeziumException("Error while upserting data into Qdrant", e.getCause());
        }
    }

    private void deleteRecord(String str, ChangeEvent<Object, Object> changeEvent, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        try {
            recordCommitter.markProcessed(changeEvent);
        } catch (ExecutionException e) {
            throw new DebeziumException("Error while deleteing data from Qdrant", e.getCause());
        }
    }
}
