Vert.x gRPC Server
Vert.x gRPC Server is a gRPC server powered by Vert.x HTTP server superseding the integrated Netty based gRPC client.
This server provides a gRPC request/response oriented API as well as a generated stub approach with the Vert.x gRPC Generator.
Using Vert.x gRPC Server
To use Vert.x gRPC Server, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-server</artifactId>
<version>4.5.16-SNAPSHOT</version>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.vertx:vertx-grpc-server:4.5.16-SNAPSHOT'
}
Creating a gRPC server
A GrpcServer
is a Handler<HttpServerRequest>
and can be used as an HTTP server request handler.
GrpcServer grpcServer = GrpcServer.server(vertx);
HttpServer server = vertx.createHttpServer(options);
server
.requestHandler(grpcServer)
.listen();
Tip
|
A
|
Server request/response API
The gRPC request/response server API provides an alternative way to interact with a client without the need of extending a Java class.
Request/response
Each service method is processed by a handler, the handler is bound using a ServiceMethod
.
ServiceMethod<HelloRequest, HelloReply> serviceMethod = VertxGreeterGrpcServer.SayHello;
server.callHandler(serviceMethod, request -> {
request.handler(hello -> {
GrpcServerResponse<HelloRequest, HelloReply> response = request.response();
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();
response.end(reply);
});
});
ServiceMethod
constants are generated by the Vert.x gRPC protoc plugin.
Streaming request
You can set handlers to process request events
server.callHandler(VertxStreamingGrpcServer.Sink, request -> {
request.handler(item -> {
// Process item
});
request.endHandler(v ->{
// No more items
// Send the response
request.response().end(Empty.getDefaultInstance());
});
request.exceptionHandler(err -> {
// Something wrong happened
});
});
Streaming response
A streaming response involves calling write
for each element of the stream
and using end
to end the stream
server.callHandler(VertxStreamingGrpcServer.Source, request -> {
GrpcServerResponse<Empty, Item> response = request.response();
request.handler(empty -> {
for (int i = 0;i < 10;i++) {
response.write(Item.newBuilder().setValue("1").build());
}
response.end();
});
});
Bidi request/response
A bidi request/response is simply the combination of a streaming request and a streaming response
server.callHandler(VertxStreamingGrpcServer.Pipe, request -> {
request.handler(item -> request.response().write(item));
request.endHandler(v -> request.response().end());
});
Note
|
The gRPC-Web protocol does not support bidirectional streaming. |
Flow control
Request and response are back pressured Vert.x streams.
You can pause/resume/fetch a request
request.pause();
performAsyncOperation().onComplete(ar -> {
// And then resume
request.resume();
});
You can check the writability of a response and set a drain handler
if (response.writeQueueFull()) {
response.drainHandler(v -> {
// Writable again
});
} else {
response.write(item);
}
Compression
You can compress response messages by setting the response encoding prior before sending any message
response.encoding("gzip");
// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());
Note
|
Compression is not supported over the gRPC-Web protocol. |
Decompression
Decompression is done transparently by the server when the client send encoded requests.
Note
|
Decompression is not supported over the gRPC-Web protocol. |
Message level API
The server provides a message level API to interact directly with protobuf encoded gRPC messages.
Tip
|
the server message level API can be used with the client message level API to write a gRPC reverse proxy |
Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.handler(protoHello -> {
// Handle protobuf encoded hello
performAsyncOperation(protoHello)
.onSuccess(protoReply -> {
// Reply with protobuf encoded reply
request.response().end(protoReply);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
You can also set a messageHandler
to handle GrpcMessage
, such messages preserve the
client encoding, which is useful the service you are forwarding to can handle compressed messages directly, in this case
the message does not need to be decompressed and compressed again.
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.messageHandler(helloMessage -> {
// Can be identity or gzip
String helloEncoding = helloMessage.encoding();
// Handle hello message
handleGrpcMessage(helloMessage)
.onSuccess(replyMessage -> {
// Reply with reply message
// Can be identity or gzip
String replyEncoding = replyMessage.encoding();
// Send the reply
request.response().endMessage(replyMessage);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
The writeMessage
and endMessage
will
handle the message encoding:
-
when the message uses the response encoding, the message is sent as is
-
when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed
Server stub API
In addition to the request/response API, the Vert.x gRPC protoc plugin idiomatic service stubs.
Each service comes in two flavors, you can override the method you like depending on the style.
Unary services
Unary services can return a Vert.x Future
:
VertxGreeterGrpcServer.GreeterApi stub = new VertxGreeterGrpcServer.GreeterApi() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
or process a Vert.x Promise
VertxGreeterGrpcServer.GreeterApi stub = new VertxGreeterGrpcServer.GreeterApi() {
@Override
public void sayHello(HelloRequest request, Promise<HelloReply> response) {
response.complete(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
In both case you need to bind the stub to an existing GrpcServer
:
stub.bindAll(server);
Streaming requests
Streaming requests are implemented with a ReadStream
:
VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
@Override
public void sink(ReadStream<Item> stream, Promise<Empty> response) {
stream.handler(item -> {
System.out.println("Process item " + item.getValue());
});
// Send response
stream.endHandler(v -> response.complete(Empty.getDefaultInstance()));
}
};
Streaming responses
Streaming responses are implemented with Vert.x streams and comes in two flavors.
You can return a Vert.x ReadStream
and let the service send it for you:
VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
@Override
public ReadStream<Item> source(Empty request) {
return streamOfItems();
}
};
or you can process a WriteStream
:
VertxStreamingGrpcServer.StreamingApi stub = new VertxStreamingGrpcServer.StreamingApi() {
@Override
public void source(Empty request, WriteStream<Item> response) {
response.write(Item.newBuilder().setValue("value-1").build());
response.end(Item.newBuilder().setValue("value-2").build());
}
};