Vert.x gRPC Server

Vert.x gRPC Server is a gRPC server powered by Vert.x HTTP server superseding the integrated Netty based gRPC client.

This server provides a gRPC request/response oriented API as well as a generated stub approach with the Vert.x gRPC Generator.

Using Vert.x gRPC Server

To use Vert.x gRPC Server, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-server</artifactId>
  <version>5.0.1-SNAPSHOT</version>
</dependency>
  • Gradle (in your build.gradle file):

dependencies {
  compile 'io.vertx:vertx-grpc-server:5.0.1-SNAPSHOT'
}

Creating a gRPC server

A GrpcServer is a Handler<HttpServerRequest> and can be used as an HTTP server request handler.

GrpcServer grpcServer = GrpcServer.server(vertx);

HttpServer server = vertx.createHttpServer(options);

server
  .requestHandler(grpcServer)
  .listen();

A GrpcServer can be used within a Vert.x Web router:

route.consumes("application/grpc").handler(rc -> grpcServer.handle(rc.request()));

Idiomatic server stub generation

The Vert.x gRPC protoc plugin generates idiomatic service stubs:

  • examples/Greeter.java

  • examples/GreeterService.java

  • examples/GreeterGrpcService.java

The Greeter interface defines the primary contract of the service:

public interface Greeter {

  Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request);

}

The GreeterService class implements the Greeter contract:

public class GreeterService implements Greeter {

  public Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request) {
    // ...
  }

  protected void sayHello(examples.grpc.HelloRequest request, Completable<examples.grpc.HelloReply> response) {
    // ...
  }
}

Usually, you extend this class to provide a service implementation.

The GreeterGrpcService class extends GreeterService and let you turn your service implementation into a gRPC service that can be bound to a Vert.x gRPC server:

GreeterGrpcService service = GreeterGrpcService.create(new GreeterService() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(HelloReply.newBuilder()
      .setMessage("Hello " + request.getName())
      .build());
  }
});

grpcServer.add(service);

Each service method comes in two flavors, you can override the method you like depending on the style.

Unary methods

Unary methods return a Vert.x Future

GreeterService service = new GreeterService() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

or process a Vert.x Promise

GreeterService service = new GreeterService() {
  @Override
  public void sayHello(HelloRequest request, Completable<HelloReply> response) {
    response.succeed(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

In both case you need to bind the stub to an existing GrpcServer

server.addService(service);

Streaming requests

Streaming requests are implemented with a ReadStream:

StreamingGrpcService service = new StreamingGrpcService() {
  @Override
  public void sink(ReadStream<Item> stream, Completable<Empty> response) {
    stream.handler(item -> {
      System.out.println("Process item " + item.getValue());
    });
    // Send response
    stream.endHandler(v -> response.succeed(Empty.getDefaultInstance()));
  }
};
server.addService(service);

Streaming responses

Streaming responses are implemented with Vert.x streams and comes in two flavors.

You can return a Vert.x ReadStream and let the service send it for you:

StreamingService service = new StreamingService() {
  @Override
  public Future<ReadStream<Item>> source(Empty request) {
    return streamOfItems();
  }
};

or you can process a WriteStream:

StreamingService service = new StreamingService() {
  @Override
  public void source(Empty request, WriteStream<Item> response) {
    response.write(Item.newBuilder().setValue("value-1").build());
    response.end(Item.newBuilder().setValue("value-2").build());
  }
};

Server request/response API

The gRPC request/response server API provides an alternative way to interact with a client without the need of extending a Java class.

Binding a gRPC service method

You will use a ServiceMethod to bind a gRPC service method

ServiceName serviceName = ServiceName.create("examples.grpc", "Greeter");
ServiceMethod<HelloRequest, HelloReply> sayHello = ServiceMethod.server(
  serviceName,
  "SayHello",
  GrpcMessageEncoder.encoder(),
  GrpcMessageDecoder.decoder(HelloRequest.newBuilder()));

However, most of the time you will use ServiceMethod constants generated by the Vert.x gRPC protoc plugin:

ServiceMethod<HelloRequest, HelloReply> sayHello = GreeterGrpcService.SayHello;

Request/response

Each service method is processed by a handler, the handler is bound using a ServiceMethod.

server.callHandler(GreeterGrpcService.SayHello, request -> {

  request.handler(hello -> {

    GrpcServerResponse<HelloRequest, HelloReply> response = request.response();

    HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + hello.getName()).build();

    response.end(reply);
  });
});

Streaming request

You can set handlers to process request events

server.callHandler(StreamingGrpcService.Sink, request -> {
  request.handler(item -> {
    // Process item
  });
  request.endHandler(v ->{
    // No more items
    // Send the response
    request.response().end(Empty.getDefaultInstance());
  });
  request.exceptionHandler(err -> {
    // Something wrong happened
  });
});

Streaming response

A streaming response involves calling write for each element of the stream and using end to end the stream

server.callHandler(StreamingGrpcService.Source, request -> {
  GrpcServerResponse<Empty, Item> response = request.response();
  request.handler(empty -> {
    for (int i = 0;i < 10;i++) {
      response.write(Item.newBuilder().setValue("1").build());
    }
    response.end();
  });
});

Bidi request/response

A bidi request/response is simply the combination of a streaming request and a streaming response

server.callHandler(StreamingGrpcService.Pipe, request -> {

  request.handler(item -> request.response().write(item));
  request.endHandler(v -> request.response().end());
});

gRPC protocols

HTTP/2 protocol

The default protocol served by the Vert.x gRPC server is HTTP/2.

In addition, Vert.x gRPC server also supports gRPC-Web protocol and HTTP/JSON transcoding.

gRPC-Web protocol

The Vert.x gRPC Server supports the gRPC-Web protocol by default.

If your website server and the gRPC server are different, you have to configure the gRPC server for CORS. This can be done with a Vert.x Web router and the CORS handler:

CorsHandler corsHandler = CorsHandler.create()
  .addRelativeOrigin("https://www.mycompany.com")
  .allowedHeaders(Set.of("keep-alive","user-agent","cache-control","content-type","content-transfer-encoding","x-custom-key","x-user-agent","x-grpc-web","grpc-timeout"))
  .exposedHeaders(Set.of("x-custom-key","grpc-status","grpc-message"));
router.route("/com.mycompany.MyService/*").handler(corsHandler);

gRPC Transcoding

The Vert.x gRPC server supports gRPC transcoding that enables mapping between HTTP/JSON requests and gRPC services.

Protocol configuration

By default, a gRPC server accepts all protocols.

To disable a specific protocol support, configure options with removeEnabledProtocol and then create a server with GrpcServer#server(vertx, options).

Removing gRPC-Web support
GrpcServer server = GrpcServer.server(vertx, new GrpcServerOptions()
  .removeEnabledProtocol(GrpcProtocol.WEB)
  .removeEnabledProtocol(GrpcProtocol.WEB_TEXT)
);

Flow control

Request and response are back pressured Vert.x streams.

You can pause/resume/fetch a request

request.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  request.resume();
});

You can check the writability of a response and set a drain handler

if (response.writeQueueFull()) {
  response.drainHandler(v -> {
    // Writable again
  });
} else {
  response.write(item);
}

Timeout and deadlines

The gRPC server handles timeout and deadlines.

Whenever the service receives a request indicating a timeout, the timeout can be retrieved.

long timeout = request.timeout();

if (timeout > 0L) {
  // A timeout has been received
}

By default, the server

  • does not schedule automatically a deadline for a given request

  • does not automatically propagate the deadline to a vertx client

The server can schedule deadlines: when a request carries a timeout, the server schedules locally a timer to cancel the request when the response has not been sent in time.

The server can propagate deadlines: when a request carries a timeout, the server calculate the deadline and associate the current server request with this deadline. Vert.x gRPC client can use this deadline to compute a timeout to be sent and cascade the timeout to another gRPC server.

GrpcServer server = GrpcServer.server(vertx, new GrpcServerOptions()
  .setScheduleDeadlineAutomatically(true)
  .setDeadlinePropagation(true)
);

JSON wire format

gRPC implicitly assumes the usage of the Protobuf wire format.

The Vert.x gRPC server supports the JSON wire format as well, that is gRPC requests carrying the application/grpc+json content type.

The com.google.protobuf:protobuf-java-util library performs the JSON encoding/decoding.

Anemic JSON is also supported with Vert.x JsonObject

ServiceMethod<JsonObject, JsonObject> sayHello = ServiceMethod.server(
  ServiceName.create("helloworld", "Greeter"),
  "SayHello",
  GrpcMessageEncoder.JSON_OBJECT,
  GrpcMessageDecoder.JSON_OBJECT
);

server.callHandler(sayHello, request -> {
  request.last().onSuccess(helloRequest -> {
    request.response().end(new JsonObject().put("message", "Hello " + helloRequest.getString("name")));
  });
});

Compression

You can compress response messages by setting the response encoding prior before sending any message

response.encoding("gzip");

// Write items after encoding has been defined
response.write(Item.newBuilder().setValue("item-1").build());
response.write(Item.newBuilder().setValue("item-2").build());
response.write(Item.newBuilder().setValue("item-3").build());
Compression is not supported over the gRPC-Web protocol.

Decompression

Decompression is done transparently by the server when the client send encoded requests.

Decompression is not supported over the gRPC-Web protocol.

Message level API

The server provides a message level API to interact directly with protobuf encoded gRPC messages.

the server message level API can be used with the client message level API to write a gRPC reverse proxy

Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.handler(protoHello -> {
      // Handle protobuf encoded hello
      performAsyncOperation(protoHello)
        .onSuccess(protoReply -> {
          // Reply with protobuf encoded reply
          request.response().end(protoReply);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

You can also set a messageHandler to handle GrpcMessage, such messages preserve the client encoding, which is useful the service you are forwarding to can handle compressed messages directly, in this case the message does not need to be decompressed and compressed again.

ServiceName greeterServiceName = ServiceName.create("helloworld", "Greeter");

server.callHandler(request -> {

  if (request.serviceName().equals(greeterServiceName) && request.methodName().equals("SayHello")) {

    request.messageHandler(helloMessage -> {

      // Can be identity or gzip
      String helloEncoding = helloMessage.encoding();

      // Handle hello message
      handleGrpcMessage(helloMessage)
        .onSuccess(replyMessage -> {
          // Reply with reply message

          // Can be identity or gzip
          String replyEncoding = replyMessage.encoding();

          // Send the reply
          request.response().endMessage(replyMessage);
        }).onFailure(err -> {
          request.response()
            .status(GrpcStatus.ABORTED)
            .end();
        });
    });
  } else {
    request.response()
      .status(GrpcStatus.NOT_FOUND)
      .end();
  }
});

The writeMessage and endMessage will handle the message encoding:

  • when the message uses the response encoding, the message is sent as is

  • when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed

gRPC Reflection service

Support for the gRPC reflection service can be added to your Vert.x gRPC Server.

To use the Reflection service, add the following dependency:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-reflection</artifactId>
  <version>5.0.1-SNAPSHOT</version>
</dependency>
  • Gradle (in your build.gradle file):

dependencies {
  compile 'io.vertx:vertx-grpc-reflection:5.0.1-SNAPSHOT'
}

You can then deploy the reflection service in your server:

GrpcServer grpcServer = GrpcServer.server(vertx);

// Add reflection service
grpcServer.addService(ReflectionService.v1());

GreeterGrpcService greeterService = new GreeterGrpcService() {
  @Override
  public Future<HelloReply> sayHello(HelloRequest request) {
    return Future.succeededFuture(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build());
  }
};

grpcServer.addService(greeterService);

// Start the server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

gRPC Health service

The gRPC Health service implements the standard gRPC health checking protocol, which allows clients to check the health status of your services.

By default HealthService returns all registered services with status SERVING, if you want to override this, you need to register health check, for the specified service.

The Health service provides a simple way to expose health status of your services via gRPC. It implements two RPCs:

  • Check: For checking the health status of a service

  • List: For listing services and their health status

  • Watch: For watching the health status of a service over time

To use the Health Service, add the following dependency:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-health</artifactId>
  <version>5.0.1-SNAPSHOT</version>
</dependency>
  • Gradle (in your build.gradle file):

dependencies {
  compile 'io.vertx:vertx-grpc-health:5.0.1-SNAPSHOT'
}

Here’s how to create and bind a Health service to your gRPC server:

GrpcServer grpcServer = GrpcServer.server(vertx);

// Create a health service instance
HealthService healthService = HealthService.create(vertx);

// Register health checks for your services
healthService.register("my.service.name", () -> Future.succeededFuture(true));

// Add the health service to the gRPC server
grpcServer.addService(healthService);

// Start the server
vertx.createHttpServer(options)
  .requestHandler(grpcServer)
  .listen();

Clients can then check the health of your services using the standard gRPC health checking protocol.

health check service is in tech preview in Vert.x 5.0 until the API becomes stable.