Vert.x gRPC Client
Vert.x gRPC Client is a gRPC client powered by Vert.x HTTP client.
This client provides a gRPC request/response oriented API as well as a generated stub approach with a gRPC Channel
Using Vert.x gRPC Client
To use Vert.x gRPC Client, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-client</artifactId>
<version>4.5.16-SNAPSHOT</version>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.vertx:vertx-grpc-client:4.5.16-SNAPSHOT'
}
Creating a gRPC client
You can easily create the gRPC client
GrpcClient client = GrpcClient.client(vertx);
Client request/response API
The gRPC request/response client API provides an alternative way to interact with a server without the need of a generated stub.
Request/response
Interacting with a gRPC server involves creating a request to the remote gRPC service
SocketAddress server = SocketAddress.inetSocketAddress(443, "example.com");
ServiceMethod<HelloReply, HelloRequest> sayHelloMethod = VertxGreeterGrpcClient.SayHello;
Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, sayHelloMethod);
fut.onSuccess(request -> {
// The end method calls the service
request.end(HelloRequest.newBuilder().setName("Bob").build());
});
ServiceMethod
constants are generated by the Vert.x gRPC protoc plugin.
request.response().onSuccess(response -> {
Future<HelloReply> fut = response.last();
fut.onSuccess(reply -> {
System.out.println("Received " + reply.getMessage());
});
});
Future composition can combine all the previous steps together in a compact fashion
client
.request(server, VertxGreeterGrpcClient.SayHello).compose(request -> {
request.end(HelloRequest
.newBuilder()
.setName("Bob")
.build());
return request.response().compose(response -> response.last());
}).onSuccess(reply -> {
System.out.println("Received " + reply.getMessage());
});
Streaming request
Streaming requests involve calling write
for each element of the stream
and using end
to end the stream
client
.request(server, VertxStreamingGrpcClient.Sink)
.onSuccess(request -> {
for (int i = 0;i < 10;i++) {
request.write(Item.newBuilder().setValue("1").build());
}
request.end();
});
Streaming response
You can set handlers to process response events of a streaming response
client
.request(server, VertxStreamingGrpcClient.Source)
.compose(request -> {
request.end(Empty.getDefaultInstance());
return request.response();
})
.onSuccess(response -> {
response.handler(item -> {
// Process item
});
response.endHandler(v -> {
// Done
});
response.exceptionHandler(err -> {
// Something went bad
});
});
Bidi request/response
A bidi request/response is simply the combination of a streaming request and a streaming response.
Flow control
Request and response are back pressured Vert.x streams.
You can check the writability of a request and set a drain handler
if (request.writeQueueFull()) {
request.drainHandler(v -> {
// Writable again
});
} else {
request.write(item);
}
You can pause/resume/fetch a response
response.pause();
performAsyncOperation().onComplete(ar -> {
// And then resume
response.resume();
});
Cancellation
You can call cancel
to cancel a request
request.cancel();
Note
|
cancellation sends an HTTP/2 reset frame to the server |
Compression
You can compress request messages by setting the request encoding prior before sending any message
request.encoding("gzip");
// Write items after encoding has been defined
request.write(Item.newBuilder().setValue("item-1").build());
request.write(Item.newBuilder().setValue("item-2").build());
request.write(Item.newBuilder().setValue("item-3").build());
Decompression
Decompression is achieved transparently by the client when the server sends encoded responses.
Message level API
The client provides a message level API to interact directly with protobuf encoded gRPC messages.
Tip
|
the client message level API can be used with the server message level API to write a gRPC reverse proxy |
Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.
Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);
requestFut.onSuccess(request -> {
// Set the service name and the method to call
request.serviceName(ServiceName.create("helloworld", "Greeter"));
request.methodName("SayHello");
// Send the protobuf request
request.end(protoHello);
// Handle the response
Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
responseFut.onSuccess(response -> {
response.handler(protoReply -> {
// Handle the protobuf reply
});
});
});
You can also set a messageHandler
to handle GrpcMessage
, such messages preserve the server encoding.
Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);
requestFut.onSuccess(request -> {
// Set the service name and the method to call
request.serviceName(ServiceName.create("helloworld", "Greeter"));
request.methodName("SayHello");
// Send the protobuf request
request.endMessage(GrpcMessage.message("identity", protoHello));
// Handle the response
Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
responseFut.onSuccess(response -> {
response.messageHandler(replyMessage -> {
System.out.println("Got reply message encoded as " + replyMessage.encoding());
});
});
});
The writeMessage
and endMessage
will
handle the message encoding:
-
when the message uses the response encoding, the message is sent as is
-
when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed
Client stub API
In addition to the request/response API, the Vert.x gRPC protoc plugin idiomatic service clients.
A client wraps a GrpcClient
and provides Vert.x idiomatic API to interact with the service:
VertxGreeterGrpcClient client = new VertxGreeterGrpcClient(grpcClient, SocketAddress.inetSocketAddress(port, host));
Unary services
Unary services returns a Vert.x Future
Future<HelloReply> response = client.sayHello(HelloRequest.newBuilder().setName("John").build());
response.onSuccess(result -> System.out.println("Service responded: " + response.result().getMessage()));
response.onFailure(err -> System.out.println("Service failure: " + response.cause().getMessage()));
Streaming requests
Streaming requests use a lambda passed a Vert.x WriteStream
of messages sent to the service
Future<Empty> response = client.sink(stream -> {
stream.write(Item.newBuilder().setValue("Value 1").build());
stream.write(Item.newBuilder().setValue("Value 2").build());
stream.end(Item.newBuilder().setValue("Value 3").build());
});
Streaming responses
Streaming responses get a Vert.x ReadStream
of messages sent by the service
Future<ReadStream<Item>> response = client.source(Empty.getDefaultInstance());
response.onSuccess(stream -> stream
.handler(item -> System.out.println("Item " + item.getValue()))
.exceptionHandler(err -> System.out.println("Stream failed " + err.getMessage()))
.endHandler(v -> System.out.println("Stream ended")));
response.onFailure(err -> System.out.println("Service failure: " + err.getMessage()));