package io.debezium.server.milvus;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.debezium.DebeziumException;
import io.debezium.data.Envelope;
import io.debezium.embedded.EmbeddedEngineChangeEvent;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.schema.SchemaFactory;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.service.vector.request.DeleteReq;
import io.milvus.v2.service.vector.request.UpsertReq;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.List;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("milvus")
@Dependent
/* loaded from: input_file:io/debezium/server/milvus/MilvusChangeConsumer.class */
public class MilvusChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MilvusChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.milvus.";
    private MilvusClientV2 milvusClient;
    private MilvusSchema schema;
    private final Gson gson = new Gson();

    @ConfigProperty(name = "debezium.sink.milvus.uri", defaultValue = "http://localhost:19530")
    String uri;

    @ConfigProperty(name = "debezium.sink.milvus.database", defaultValue = "default")
    String databaseName;

    @ConfigProperty(name = "debezium.sink.milvus.unwind.json", defaultValue = "false")
    boolean unwindJson;

    @Inject
    @CustomConsumerBuilder
    Instance<MilvusClientV2> customClient;

    /* renamed from: io.debezium.server.milvus.MilvusChangeConsumer$1, reason: invalid class name */
    /* loaded from: input_file:io/debezium/server/milvus/MilvusChangeConsumer$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$io$debezium$data$Envelope$Operation = new int[Envelope.Operation.values().length];

        static {
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.READ.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.CREATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.UPDATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.DELETE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    @PostConstruct
    void connect() {
        if (this.customClient.isResolvable()) {
            this.milvusClient = (MilvusClientV2) this.customClient.get();
            LOGGER.info("Obtained custom configured MilvusClientV2 '{}'", this.milvusClient);
        } else {
            this.milvusClient = new MilvusClientV2(ConnectConfig.builder().uri(this.uri).build());
            this.schema = new MilvusSchema(this.milvusClient);
        }
        if (!this.milvusClient.listDatabases().getDatabaseNames().contains(this.databaseName)) {
            throw new DebeziumException(String.format("Database '%s' does not exist", this.databaseName));
        }
    }

    @PreDestroy
    void close() {
        try {
            this.milvusClient.close();
        } catch (Exception e) {
            LOGGER.warn("Exception while closing client", e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        for (ChangeEvent<Object, Object> changeEvent : list) {
            LOGGER.trace("Received event '{}'", changeEvent);
            SourceRecord sourceRecord = toSourceRecord(changeEvent);
            String replace = this.streamNameMapper.map(changeEvent.destination()).replace('.', '_');
            if (isSchemaChange(sourceRecord)) {
                LOGGER.debug("Schema change event, ignoring it");
                recordCommitter.markProcessed(changeEvent);
            } else {
                if (changeEvent.key() == null) {
                    throw new DebeziumException("Milvus does not support collections without primary key");
                }
                this.schema.validateKey(replace, sourceRecord.keySchema());
                if (sourceRecord.value() == null) {
                    deleteRecord(replace, changeEvent, recordCommitter);
                } else if (Envelope.isEnvelopeSchema(sourceRecord.valueSchema())) {
                    switch (AnonymousClass1.$SwitchMap$io$debezium$data$Envelope$Operation[Envelope.Operation.forCode(((Struct) sourceRecord.value()).getString("op")).ordinal()]) {
                        case 1:
                        case 2:
                        case 3:
                            upsertRecord(replace, changeEvent, recordCommitter);
                            break;
                        case 4:
                            deleteRecord(replace, changeEvent, recordCommitter);
                            break;
                        default:
                            LOGGER.info("Unsupported operation, skipping record '{}'", changeEvent);
                            break;
                    }
                } else {
                    upsertRecord(replace, changeEvent, recordCommitter);
                }
            }
        }
        recordCommitter.markBatchFinished();
    }

    protected SourceRecord toSourceRecord(ChangeEvent<Object, Object> changeEvent) {
        return ((EmbeddedEngineChangeEvent) changeEvent).sourceRecord();
    }

    private void upsertRecord(String str, ChangeEvent<Object, Object> changeEvent, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        this.milvusClient.upsert(UpsertReq.builder().collectionName(str).data(List.of(getValue(changeEvent, toSourceRecord(changeEvent)))).build());
        recordCommitter.markProcessed(changeEvent);
    }

    private void deleteRecord(String str, ChangeEvent<Object, Object> changeEvent, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        Struct struct = (Struct) toSourceRecord(changeEvent).key();
        this.milvusClient.delete(DeleteReq.builder().collectionName(str).ids(List.of(struct.get((Field) struct.schema().fields().get(0)))).build());
        recordCommitter.markProcessed(changeEvent);
    }

    private JsonObject getValue(ChangeEvent<Object, Object> changeEvent, SourceRecord sourceRecord) {
        String string = getString(changeEvent.value());
        Schema valueSchema = sourceRecord.valueSchema();
        JsonObject jsonObject = (JsonObject) this.gson.fromJson(string, JsonObject.class);
        if ((jsonObject.has("schema") || jsonObject.has("schemaId")) && jsonObject.has("payload")) {
            jsonObject = jsonObject.getAsJsonObject("payload");
        }
        if (Envelope.isEnvelopeSchema(sourceRecord.valueSchema())) {
            jsonObject = jsonObject.getAsJsonObject("after");
            valueSchema = valueSchema.field("after").schema();
        }
        if (this.unwindJson) {
            for (Field field : valueSchema.fields()) {
                if ("io.debezium.data.Json".equals(field.schema().name()) && jsonObject.has(field.name())) {
                    jsonObject.add(field.name(), (JsonObject) this.gson.fromJson(jsonObject.get(field.name()).getAsString(), JsonObject.class));
                }
            }
        }
        return jsonObject;
    }

    private boolean isSchemaChange(SourceRecord sourceRecord) {
        return (sourceRecord.valueSchema() == null || sourceRecord.valueSchema().name() == null || !SchemaFactory.get().isSchemaChangeSchema(sourceRecord.valueSchema())) ? false : true;
    }
}
