package io.debezium.server.milvus;

import com.google.gson.JsonObject;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.util.Testing;
import io.milvus.v2.client.ConnectConfig;
import io.milvus.v2.client.MilvusClientV2;
import io.milvus.v2.service.vector.request.QueryReq;
import io.milvus.v2.service.vector.response.QueryResp;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.enterprise.event.Observes;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Integration test for the Milvus sink: checks that rows captured from the
 * PostgreSQL table {@code inventory.t_vector} show up in the Milvus collection
 * {@link #COLLECTION_NAME}, and that a subsequent UPDATE and DELETE on the source
 * table are reflected in the collection as well.
 */
@QuarkusTest
@QuarkusTestResource.List({
        @QuarkusTestResource(VectorPostgresTestResourceLifecycleManager.class),
        @QuarkusTestResource(MilvusTestResourceLifecycleManager.class)
})
public class MilvusIT {
    /** Number of records expected in the collection before any change is applied. */
    private static final int MESSAGE_COUNT = 2;
    public static final String COLLECTION_NAME = "testc_inventory_t_vector";

    /** Logical connection name passed to every {@link PostgresConnection} opened by this test. */
    private static final String CONNECTION_NAME = "Debezium Milvus Test";

    @ConfigProperty(name = "debezium.source.database.hostname")
    String dbHostname;

    @ConfigProperty(name = "debezium.source.database.port")
    String dbPort;

    @ConfigProperty(name = "debezium.source.database.user")
    String dbUser;

    @ConfigProperty(name = "debezium.source.database.password")
    String dbPassword;

    @ConfigProperty(name = "debezium.source.database.dbname")
    String dbName;

    @ConfigProperty(name = "debezium.sink.milvus.uri")
    String milvusUri;

    // NOTE(review): this client is created before each test and never closed;
    // consider releasing it in an @AfterEach method once the SDK's close API is confirmed.
    private MilvusClientV2 client;

    /** Resets the offset store file when the test instance is created. */
    public MilvusIT() {
        Testing.Files.delete(TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(MilvusTestConfigSource.OFFSET_STORE_PATH);
    }

    /** Enables test output and opens a Milvus client against the configured URI. */
    @BeforeEach
    void setupDependencies() throws Exception {
        Testing.Print.enable();
        this.client = new MilvusClientV2(ConnectConfig.builder().uri(this.milvusUri).build());
    }

    /**
     * Observes connector completion and rethrows the connector's error when it
     * did not finish successfully.
     *
     * @param connectorCompletedEvent the completion event fired for the connector
     * @throws RuntimeException wrapping the connector error on unsuccessful completion
     */
    void connectorCompleted(@Observes ConnectorCompletedEvent connectorCompletedEvent) throws Exception {
        if (!connectorCompletedEvent.isSuccess()) {
            throw new RuntimeException(connectorCompletedEvent.getError().get());
        }
    }

    /**
     * Verifies the initial records, then an UPDATE, then a DELETE of the row with {@code pk = 2}.
     */
    @Test
    public void testMilvus() throws Exception {
        // Initial state: both rows are present.
        final List<QueryResp.QueryResult> initial = awaitQueryResults("value like \"%\"", MESSAGE_COUNT);
        Assertions.assertThat(initial).hasSize(MESSAGE_COUNT);

        // The assertions below expect the pk = 1 row first — presumably the query
        // returns rows in primary-key order; verify if this ever becomes flaky.
        final Map<String, Object> first = initial.get(0).getEntity();
        final Map<String, Object> second = initial.get(1).getEntity();

        Assertions.assertThat(first.get("pk")).isEqualTo(1L);
        Assertions.assertThat(first.get("value")).isEqualTo("one");
        Assertions.assertThat(first.get("f_vector")).isEqualTo(List.of(1.1f, 1.2f, 1.3f));
        Assertions.assertThat(first.get("f_json")).isInstanceOf(JsonObject.class);

        Assertions.assertThat(second.get("pk")).isEqualTo(2L);
        Assertions.assertThat(second.get("value")).isEqualTo("two");
        Assertions.assertThat(second.get("f_vector")).isEqualTo(List.of(2.1f, 2.2f, 2.3f));
        Assertions.assertThat(second.get("f_json")).isInstanceOf(JsonObject.class);

        final JdbcConfiguration config = JdbcConfiguration.create()
                .with("hostname", this.dbHostname)
                .with("port", this.dbPort)
                .with("user", this.dbUser)
                .with("password", this.dbPassword)
                .with("dbname", this.dbName)
                .build();

        // Update: the changed value must become visible in the collection.
        executeStatement(config, "UPDATE inventory.t_vector SET value = 'two-up' WHERE pk = 2");
        final List<QueryResp.QueryResult> updated = awaitQueryResults("value like \"two-up\"", 1);
        Assertions.assertThat(updated).hasSize(1);

        final Map<String, Object> updatedEntity = updated.get(0).getEntity();
        Assertions.assertThat(updatedEntity.get("pk")).isEqualTo(2L);
        Assertions.assertThat(updatedEntity.get("value")).isEqualTo("two-up");
        Assertions.assertThat(updatedEntity.get("f_vector")).isEqualTo(List.of(2.1f, 2.2f, 2.3f));

        // Delete: the updated record must disappear from the collection.
        executeStatement(config, "DELETE FROM inventory.t_vector WHERE pk = 2");
        final List<QueryResp.QueryResult> remaining = awaitQueryResults("value like \"two-up\"", 0);
        Assertions.assertThat(remaining).hasSize(0);
    }

    /**
     * Polls the collection with the given filter until the query returns exactly
     * {@code expectedCount} results or the configured wait time elapses.
     *
     * @param filter        Milvus filter expression passed to the query
     * @param expectedCount number of results to wait for
     * @return the results of the last query that was executed
     */
    private List<QueryResp.QueryResult> awaitQueryResults(String filter, int expectedCount) {
        final AtomicReference<List<QueryResp.QueryResult>> latest = new AtomicReference<>();
        Awaitility.await()
                .atMost(Duration.ofSeconds(MilvusTestConfigSource.waitForSeconds()))
                .until(() -> {
                    final QueryResp response = this.client.query(QueryReq.builder()
                            .collectionName(COLLECTION_NAME)
                            .filter(filter)
                            .build());
                    latest.set(response.getQueryResults());
                    return response.getQueryResults().size() == expectedCount;
                });
        return latest.get();
    }

    /**
     * Runs a single SQL statement on a fresh connection to the source database.
     * The connection is closed even when the statement fails.
     *
     * @param config    JDBC settings of the source database
     * @param statement SQL statement to execute
     * @throws Exception if executing the statement or closing the connection fails
     */
    private static void executeStatement(JdbcConfiguration config, String statement) throws Exception {
        try (PostgresConnection connection = new PostgresConnection(config, CONNECTION_NAME)) {
            connection.execute(statement);
        }
    }
}
