Vert.x gRPC Server
Vert.x gRPC Server is a gRPC server powered by Vert.x HTTP server superseding the integrated Netty based gRPC client.
This server provides a gRPC request/response oriented API as well as a generated stub approach with the Vert.x gRPC Generator.
Using Vert.x gRPC Server
To use Vert.x gRPC Server, add the following dependency to the dependencies section of your build descriptor:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-server</artifactId>
<version>5.0.1-SNAPSHOT</version>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.vertx:vertx-grpc-server:5.0.1-SNAPSHOT'
}
Creating a gRPC server
A GrpcServer
is a Handler<HttpServerRequest>
and can be used as an HTTP server request handler.
GrpcServer grpcServer = GrpcServer.server(vertx);
HttpServer server = vertx.createHttpServer(options);
server
.requestHandler(grpcServer)
.listen();
A GrpcServer
can be used within a Vert.x Web router:
route.consumes("application/grpc").handler(rc -> grpcServer.handle(rc.request()));
Idiomatic server stub generation
The Vert.x gRPC protoc plugin generates idiomatic service stubs:
-
examples/Greeter.java
-
examples/GreeterService.java
-
examples/GreeterGrpcService.java
The Greeter
interface defines the primary contract of the service:
public interface Greeter {
Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request);
}
The GreeterService
class implements the Greeter
contract:
public class GreeterService implements Greeter {
public Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request) {
// ...
}
protected void sayHello(examples.grpc.HelloRequest request, Completable<examples.grpc.HelloReply> response) {
// ...
}
}
Usually, you extend this class to provide a service implementation.
The GreeterGrpcService
class extends GreeterService
and let you turn your service implementation into a gRPC service
that can be bound to a Vert.x gRPC server:
GreeterGrpcService service = GreeterGrpcService.create(new GreeterService() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(HelloReply.newBuilder()
.setMessage("Hello " + request.getName())
.build());
}
});
grpcServer.add(service);
Each service method comes in two flavors, you can override the method you like depending on the style.
Unary methods
Unary methods return a Vert.x Future
GreeterService service = new GreeterService() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
or process a Vert.x Promise
GreeterService service = new GreeterService() {
@Override
public void sayHello(HelloRequest request, Completable<HelloReply> response) {
response.succeed(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
In both case you need to bind the stub to an existing GrpcServer
server.addService(service);
Streaming requests
Streaming requests are implemented with a ReadStream
:
StreamingGrpcService service = new StreamingGrpcService() {
@Override
public void sink(ReadStream<Item> stream, Completable<Empty> response) {
stream.handler(item -> {
System.out.println("Process item " + item.getValue());
});
// Send response
stream.endHandler(v -> response.succeed(Empty.getDefaultInstance()));
}
};
server.addService(service);
Streaming responses
Streaming responses are implemented with Vert.x streams and comes in two flavors.
You can return a Vert.x ReadStream
and let the service send it for you:
StreamingService service = new StreamingService() {
@Override
public Future<ReadStream<Item>> source(Empty request) {
return streamOfItems();
}
};
or you can process a WriteStream
:
StreamingService service = new StreamingService() {
@Override
public void source(Empty request, WriteStream<Item> response) {
response.write(Item.newBuilder().setValue("value-1").build());
response.end(Item.newBuilder().setValue("value-2").build());
}
};
Server request/response API
The gRPC request/response server API provides an alternative way to interact with a client without the need of extending a Java class.
Binding a gRPC service method
You will use a ServiceMethod
to bind a gRPC service method
ServiceName serviceName = ServiceName.create("examples.grpc", "Greeter");
ServiceMethod<HelloRequest, HelloReply> sayHello = ServiceMethod.server(
serviceName,
"SayHello",
GrpcMessageEncoder.encoder(),
GrpcMessageDecoder.decoder(HelloRequest.newBuilder()));
However, most of the time you will use ServiceMethod
constants generated by the Vert.x gRPC protoc plugin:
ServiceMethod<HelloRequest, HelloReply> sayHello = GreeterGrpcService.SayHello;
Request/response
Each service method is processed by a handler, the handler is bound using a ServiceMethod
.
server.callHandler(GreeterGrpcService.SayHello, request -> {
request.handler(hello -> {
GrpcServerResponse<HelloRequest, HelloReply> response = request.response();
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();
response.end(reply);
});
});
Streaming request
You can set handlers to process request events
server.callHandler(StreamingGrpcService.Sink, request -> {
request.handler(item -> {
// Process item
});
request.endHandler(v ->{
// No more items
// Send the response
request.response().end(Empty.getDefaultInstance());
});
request.exceptionHandler(err -> {
// Something wrong happened
});
});
Streaming response
A streaming response involves calling write
for each element of the stream
and using end
to end the stream
server.callHandler(StreamingGrpcService.Source, request -> {
GrpcServerResponse<Empty, Item> response = request.response();
request.handler(empty -> {
for (int i = 0;i < 10;i++) {
response.write(Item.newBuilder().setValue("1").build());
}
response.end();
});
});
Bidi request/response
A bidi request/response is simply the combination of a streaming request and a streaming response
server.callHandler(StreamingGrpcService.Pipe, request -> {
request.handler(item -> request.response().write(item));
request.endHandler(v -> request.response().end());
});
gRPC protocols
HTTP/2 protocol
The default protocol served by the Vert.x gRPC server is HTTP/2.
In addition, Vert.x gRPC server also supports gRPC-Web protocol and HTTP/JSON transcoding.
gRPC-Web protocol
The Vert.x gRPC Server supports the gRPC-Web protocol by default.
If your website server and the gRPC server are different, you have to configure the gRPC server for CORS. This can be done with a Vert.x Web router and the CORS handler:
CorsHandler corsHandler = CorsHandler.create()
.addRelativeOrigin("https://www.mycompany.com")
.allowedHeaders(Set.of("keep-alive","user-agent","cache-control","content-type","content-transfer-encoding","x-custom-key","x-user-agent","x-grpc-web","grpc-timeout"))
.exposedHeaders(Set.of("x-custom-key","grpc-status","grpc-message"));
router.route("/com.mycompany.MyService/*").handler(corsHandler);
gRPC Transcoding
The Vert.x gRPC server supports gRPC transcoding that enables mapping between HTTP/JSON requests and gRPC services.
Protocol configuration
By default, a gRPC server accepts all protocols.
To disable a specific protocol support, configure options with removeEnabledProtocol
and then create a server with GrpcServer#server(vertx, options)
.
GrpcServer server = GrpcServer.server(vertx, new GrpcServerOptions()
.removeEnabledProtocol(GrpcProtocol.WEB)
.removeEnabledProtocol(GrpcProtocol.WEB_TEXT)
);
Flow control
Request and response are back pressured Vert.x streams.
You can pause/resume/fetch a request
request.pause();
performAsyncOperation().onComplete(ar -> {
// And then resume
request.resume();
});
You can check the writability of a response and set a drain handler
if (response.writeQueueFull()) {
response.drainHandler(v -> {
// Writable again
});
} else {
response.write(item);
}
Timeout and deadlines
The gRPC server handles timeout and deadlines.
Whenever the service receives a request indicating a timeout, the timeout can be retrieved.
long timeout = request.timeout();
if (timeout > 0L) {
// A timeout has been received
}
By default, the server
-
does not schedule automatically a deadline for a given request
-
does not automatically propagate the deadline to a vertx client
The server can schedule deadlines: when a request carries a timeout, the server schedules locally a timer to cancel the request when the response has not been sent in time.
The server can propagate deadlines: when a request carries a timeout, the server calculate the deadline and associate the current server request with this deadline. Vert.x gRPC client can use this deadline to compute a timeout to be sent and cascade the timeout to another gRPC server.
GrpcServer server = GrpcServer.server(vertx, new GrpcServerOptions()
.setScheduleDeadlineAutomatically(true)
.setDeadlinePropagation(true)
);
JSON wire format
gRPC implicitly assumes the usage of the Protobuf wire format.
The Vert.x gRPC server supports the JSON wire format as well, that is gRPC requests carrying the application/grpc+json
content type.
The com.google.protobuf:protobuf-java-util
library performs the JSON encoding/decoding.
Anemic JSON is also supported with Vert.x JsonObject
ServiceMethod<JsonObject, JsonObject> sayHello = ServiceMethod.server(
ServiceName.create("helloworld", "Greeter"),
"SayHello",
GrpcMessageEncoder.JSON_OBJECT,
GrpcMessageDecoder.JSON_OBJECT
);
server.callHandler(sayHello, request -> {
request.last().onSuccess(helloRequest -> {
request.response().end(new JsonObject().put("message", "Hello " + helloRequest.getString("name")));
});
});
Compression
You can compress response messages by setting the response encoding prior before sending any message
response.encoding("gzip");
// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());
Compression is not supported over the gRPC-Web protocol. |
Decompression
Decompression is done transparently by the server when the client send encoded requests.
Decompression is not supported over the gRPC-Web protocol. |
Message level API
The server provides a message level API to interact directly with protobuf encoded gRPC messages.
the server message level API can be used with the client message level API to write a gRPC reverse proxy |
Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.handler(protoHello -> {
// Handle protobuf encoded hello
performAsyncOperation(protoHello)
.onSuccess(protoReply -> {
// Reply with protobuf encoded reply
request.response().end(protoReply);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
You can also set a messageHandler
to handle GrpcMessage
, such messages preserve the
client encoding, which is useful the service you are forwarding to can handle compressed messages directly, in this case
the message does not need to be decompressed and compressed again.
ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");
server.callHandler(request -> {
if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {
request.messageHandler(helloMessage -> {
// Can be identity or gzip
String helloEncoding = helloMessage.encoding();
// Handle hello message
handleGrpcMessage(helloMessage)
.onSuccess(replyMessage -> {
// Reply with reply message
// Can be identity or gzip
String replyEncoding = replyMessage.encoding();
// Send the reply
request.response().endMessage(replyMessage);
}).onFailure(err -> {
request.response()
.status(GrpcStatus.ABORTED)
.end();
});
});
} else {
request.response()
.status(GrpcStatus.NOT_FOUND)
.end();
}
});
The writeMessage
and endMessage
will
handle the message encoding:
-
when the message uses the response encoding, the message is sent as is
-
when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed
gRPC Reflection service
Support for the gRPC reflection service can be added to your Vert.x gRPC Server.
To use the Reflection service, add the following dependency:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-reflection</artifactId>
<version>5.0.1-SNAPSHOT</version>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.vertx:vertx-grpc-reflection:5.0.1-SNAPSHOT'
}
You can then deploy the reflection service in your server:
GrpcServer grpcServer = GrpcServer.server(vertx);
// Add reflection service
grpcServer.addService(ReflectionService.v1());
GreeterGrpcService greeterService = new GreeterGrpcService() {
@Override
public Future<HelloReply> sayHello(HelloRequest request) {
return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
}
};
grpcServer.addService(greeterService);
// Start the server
vertx.createHttpServer(options)
.requestHandler(grpcServer)
.listen();
gRPC Health service
The gRPC Health service implements the standard gRPC health checking protocol, which allows clients to check the health status of your services.
By default HealthService
returns all registered services with status SERVING
, if you want to override this, you need to register health check, for the specified service.
The Health service provides a simple way to expose health status of your services via gRPC. It implements two RPCs:
-
Check
: For checking the health status of a service -
List
: For listing services and their health status -
Watch
: For watching the health status of a service over time
To use the Health Service, add the following dependency:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-grpc-health</artifactId>
<version>5.0.1-SNAPSHOT</version>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.vertx:vertx-grpc-health:5.0.1-SNAPSHOT'
}
Here’s how to create and bind a Health service to your gRPC server:
GrpcServer grpcServer = GrpcServer.server(vertx);
// Create a health service instance
HealthService healthService = HealthService.create(vertx);
// Register health checks for your services
healthService.register("my.service.name", () -> Future.succeededFuture(true));
// Add the health service to the gRPC server
grpcServer.addService(healthService);
// Start the server
vertx.createHttpServer(options)
.requestHandler(grpcServer)
.listen();
Clients can then check the health of your services using the standard gRPC health checking protocol.
health check service is in tech preview in Vert.x 5.0 until the API becomes stable. |