Vert.x gRPC Client

The Vert.x gRPC Client provides a gRPC request/response oriented API as well as a generated client approach.

Using Vert.x gRPC Client

To use Vert.x gRPC Client, add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-client</artifactId>
  <version>5.1.0-SNAPSHOT</version>
</dependency>
  • Gradle (in your build.gradle file):

dependencies {
  implementation 'io.vertx:vertx-grpc-client:5.1.0-SNAPSHOT'
}

Creating a gRPC client

You can create a gRPC client from a Vertx instance:

GrpcClient client = GrpcClient.client(vertx);

Idiomatic client API generation

The Vert.x gRPC protoc plugin generates idiomatic client code:

  • examples/Greeter.java

  • examples/GreeterClient.java

  • examples/GreeterGrpcClient.java

An idiomatic client wraps a GrpcClient and provides a Vert.x idiomatic API to interact with the service.

The Greeter interface defines the primary contract of the service:

public interface Greeter {

  Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request);

}

The GreeterClient interface extends the Greeter contract and annotates it with Vert.x codegen annotations, providing the option to perform further code generation such as generating an RxJava or a Mutiny client.

@io.vertx.codegen.annotations.VertxGen
public interface GreeterClient extends Greeter {

  @io.vertx.codegen.annotations.GenIgnore(io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE)
  Future<examples.grpc.HelloReply> sayHello(examples.grpc.HelloRequest request);
}

The GreeterGrpcClient interface extends GreeterClient and provides an implementation that uses the Vert.x gRPC Client to interact with a service:

GreeterGrpcClient greeterClient = GreeterGrpcClient.create(client, SocketAddress.inetSocketAddress(port, host));

Unary services

Unary services return a Vert.x Future:

Future<HelloReply> response = greeterClient.sayHello(HelloRequest.newBuilder().setName("John").build());

response.onSuccess(result -> System.out.println("Service responded: " + result.getMessage()));

response.onFailure(err -> System.out.println("Service failure: " + err.getMessage()));

Streaming requests

Streaming requests take a lambda that receives a Vert.x WriteStream, to which you write the messages sent to the service:

Future<Empty> response = streamingClient.sink((stream, err) -> {
  stream.write(Item.newBuilder().setValue("Value 1").build());
  stream.write(Item.newBuilder().setValue("Value 2").build());
  stream.end(Item.newBuilder().setValue("Value 3").build());
});

Alternatively, you can pass a stream of messages to be sent to the service:

Future<Empty> response = streamingClient.sink(stream);

Streaming responses

Streaming responses return a Vert.x ReadStream of the messages sent by the service:

Future<ReadStream<Item>> response = streamingClient.source(Empty.getDefaultInstance());

response.onSuccess(stream -> stream
  .handler(item -> System.out.println("Item " + item.getValue()))
  .exceptionHandler(err -> System.out.println("Stream failed " + err.getMessage()))
  .endHandler(v -> System.out.println("Stream ended")));

response.onFailure(err -> System.out.println("Service failure: " + err.getMessage()));

RxJava client wrapper generation

If you want to generate your client with Vert.x codegen annotations, you can use an option of vertx-grpc-protoc-plugin2 to generate a client service interface annotated with them. To do this, pass the --vertx-codegen option to vertx-grpc-protoc-plugin2:

<protocPlugin>
  <id>vertx-grpc-protoc-plugin2</id>
  <groupId>io.vertx</groupId>
  <artifactId>vertx-grpc-protoc-plugin2</artifactId>
  <version>${stack.version}</version>
  <mainClass>io.vertx.grpc.plugin.VertxGrpcGenerator</mainClass>
  <args>
    <arg>--grpc-client</arg>
    <arg>--vertx-codegen</arg>
  </args>
</protocPlugin>

The generated client service interface will be annotated with @io.vertx.codegen.annotations.VertxGen.

@io.vertx.codegen.annotations.VertxGen
public interface GreeterClient {
  ...
}

A Vert.x codegen processor can therefore process it; for example, the Vert.x RxJava generator will generate an RxJava client wrapper with an idiomatic RxJava API.

Here is a Maven configuration example:

<plugin>
  <artifactId>maven-compiler-plugin</artifactId>
  <executions>
    <execution>
      <id>default-compile</id>
      <configuration>
        <annotationProcessorPaths>
          <annotationProcessorPath>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-codegen</artifactId>
            <classifier>processor</classifier>
            <version>${vertx.version}</version>
          </annotationProcessorPath>
          <annotationProcessorPath>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-rx-java3-gen</artifactId>
            <version>${vertx.version}</version>
          </annotationProcessorPath>
        </annotationProcessorPaths>
      </configuration>
    </execution>
  </executions>
</plugin>

The generated client can then be used:

// Use the RxJava version
GreeterClient client = io.grpc.examples.rxjava3.helloworld.GreeterClient.create(grpcClient, SocketAddress.inetSocketAddress(8080, "localhost"));

// Get a Single instead of a Future
Single<HelloReply> reply = client.sayHello(HelloRequest.newBuilder().setName("World").build());

Flowable<HelloRequest> requestStream = Flowable.just("World", "Monde", "Mundo")
  .map(name -> HelloRequest.newBuilder().setName(name).build());

// Use Flowable instead of Vert.x streams
Flowable<String> responseStream = client
  .sayHelloStreaming(requestStream)
  .map(HelloReply::getMessage);

This requires the RxJava version of GrpcClient.

Client request/response API

The gRPC request/response client API provides an alternative way to interact with a server without the need of a generated stub.

Addressing a gRPC service method

You use a ServiceMethod to address a gRPC service method:

ServiceName serviceName = ServiceName.create("examples.grpc", "Greeter");
ServiceMethod<HelloReply, HelloRequest> sayHello = ServiceMethod.client(
  serviceName,
  "SayHello",
  GrpcMessageEncoder.encoder(),
  GrpcMessageDecoder.decoder(HelloReply.newBuilder()));

However, most of the time you can use the ServiceMethod constants generated by the Vert.x gRPC protoc plugin:

ServiceMethod<HelloReply, HelloRequest> sayHello = GreeterGrpcClient.SayHello;

Request/response

Interacting with a gRPC server involves the creation of a request to the remote gRPC service.

SocketAddress server = SocketAddress.inetSocketAddress(443, "example.com");
Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, GreeterGrpcClient.SayHello);
fut.onSuccess(request -> {
  // The end method calls the service
  request.end(HelloRequest.newBuilder().setName("Bob").build());
});

The request's response future completes with the response, and the response's last future completes with the result:

request.response().onSuccess(response -> {
  Future<HelloReply> fut = response.last();
  fut.onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });
});

Future composition can combine all the previous steps in a compact fashion:

client
  .request(server, GreeterGrpcClient.SayHello).compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

Streaming request

Streaming requests involve calling write for each element of the stream and calling end to close the stream:

client
  .request(server, StreamingGrpcClient.Sink)
  .onSuccess(request -> {
    for (int i = 0; i < 10; i++) {
      request.write(Item.newBuilder().setValue("1").build());
    }
    request.end();
  });

Streaming response

You can set handlers to process the events of a streaming response:

client
  .request(server, StreamingGrpcClient.Source)
  .compose(request -> {
    request.end(Empty.getDefaultInstance());
    return request.response();
  })
  .onSuccess(response -> {
    response.handler(item -> {
      // Process item
    });
    response.endHandler(v -> {
      // Done
    });
    response.exceptionHandler(err -> {
      // Something went bad
    });
  });

Bidi request/response

A bidi request/response is simply the combination of a streaming request and a streaming response.

Flow control

Whether you use the request/response API or the idiomatic client API, you interact with streams that are back-pressured Vert.x streams.

You can check the writability of a request and set a drain handler:

if (request.writeQueueFull()) {
  request.drainHandler(v -> {
    // Writable again
  });
} else {
  request.write(item);
}
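
To illustrate the contract behind writeQueueFull and drainHandler, here is a minimal standalone model written against the JDK only. BoundedWriteQueue and its flushOne method are hypothetical names introduced for this sketch; this is not how Vert.x implements its streams, and the thresholds at which a real stream becomes writable again may differ.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of a back-pressured write queue, not a Vert.x class
final class BoundedWriteQueue<T> {

  private final Queue<T> pending = new ArrayDeque<>();
  private final int maxSize;
  private Runnable drainHandler;

  BoundedWriteQueue(int maxSize) {
    this.maxSize = maxSize;
  }

  // True when the producer should stop writing
  boolean writeQueueFull() {
    return pending.size() >= maxSize;
  }

  // Writes are always accepted; checking writeQueueFull is up to the caller
  void write(T item) {
    pending.add(item);
  }

  void drainHandler(Runnable handler) {
    this.drainHandler = handler;
  }

  // Simulates the transport sending one message; notifies the drain
  // handler when the queue goes from full to writable again
  T flushOne() {
    boolean wasFull = writeQueueFull();
    T item = pending.poll();
    if (wasFull && !writeQueueFull() && drainHandler != null) {
      drainHandler.run();
    }
    return item;
  }

  public static void main(String[] args) {
    BoundedWriteQueue<String> queue = new BoundedWriteQueue<>(2);
    queue.write("item-1");
    queue.write("item-2");
    // The queue is full: defer the next write until it drains
    queue.drainHandler(() -> queue.write("item-3"));
    System.out.println("Sent " + queue.flushOne());
    System.out.println("Full again: " + queue.writeQueueFull());
  }
}
```

The producer never blocks: it registers a callback and resumes writing only once the queue has room, which is the same shape as the request.drainHandler usage above.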

You can pause/resume/fetch a response to precisely control the messages you read.

response.pause();

performAsyncOperation().onComplete(ar -> {
  // And then resume
  response.resume();
});

You can learn more about Vert.x streams in the Vert.x Core documentation.

Timeout and deadlines

The gRPC client handles timeouts and deadlines. Setting a timeout on a gRPC request instructs the client to send the timeout information to the server, making the server aware that the client expects a response within a defined time.

In addition, the client can be configured to schedule a deadline: when a timeout is set on a request, the client schedules a local timer that cancels the request when the response has not been received in time.

GrpcClient client = GrpcClient.client(vertx, new GrpcClientOptions()
  .setTimeout(10)
  .setTimeoutUnit(TimeUnit.SECONDS)
  .setScheduleDeadlineAutomatically(true));

The timeout can also be set on a per-request basis.

Future<GrpcClientRequest<HelloRequest, HelloReply>> fut = client.request(server, GreeterGrpcClient.SayHello);
fut.onSuccess(request -> {

  request
    // Given this request, set a 10-second timeout that will be sent to the gRPC service
    .timeout(10, TimeUnit.SECONDS);

  request.end(HelloRequest.newBuilder().setName("Bob").build());
});
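
To make the timeout information more concrete: the gRPC over HTTP/2 protocol carries it in a grpc-timeout header whose value is an integer of at most 8 digits followed by a unit letter (H, M, S, m, u or n). The sketch below illustrates that encoding with the JDK only. GrpcTimeoutHeader and its encode method are hypothetical names, not part of the Vert.x API, and the client may choose units differently when it writes the header.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Vert.x: formats a timeout as a
// grpc-timeout header value
final class GrpcTimeoutHeader {

  // The protocol limits the numeric part to 8 digits
  private static final long MAX_VALUE = 99_999_999L;

  static String encode(long duration, TimeUnit unit) {
    if (duration < 0) {
      throw new IllegalArgumentException("Timeout must not be negative");
    }
    long value = duration;
    char suffix;
    switch (unit) {
      case NANOSECONDS: suffix = 'n'; break;
      case MICROSECONDS: suffix = 'u'; break;
      case MILLISECONDS: suffix = 'm'; break;
      case SECONDS: suffix = 'S'; break;
      case MINUTES: suffix = 'M'; break;
      case HOURS: suffix = 'H'; break;
      default:
        // DAYS has no unit letter in the protocol, so express it in hours
        value = unit.toHours(duration);
        suffix = 'H';
    }
    if (value > MAX_VALUE) {
      throw new IllegalArgumentException("Timeout does not fit in 8 digits: " + value);
    }
    return Long.toString(value) + suffix;
  }

  public static void main(String[] args) {
    System.out.println(encode(10, TimeUnit.SECONDS));       // 10S
    System.out.println(encode(500, TimeUnit.MILLISECONDS)); // 500m
  }
}
```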

Cancellation

You can call cancel to cancel a request:

request.cancel();

Cancellation sends an HTTP/2 reset frame to the server.

Client side load balancing

The gRPC Client can be configured to perform client side load balancing.

DNS based load balancing

DNS based load balancing works with DNS queries resolving a single host to multiple IP addresses (usually A records).

You can set a load balancer to enable DNS-based load balancing:

GrpcClient client = GrpcClient
  .builder(vertx)
  .withLoadBalancer(LoadBalancer.ROUND_ROBIN)
  .build();

client
  .request(SocketAddress.inetSocketAddress(port, server), GreeterGrpcClient.SayHello)
  .compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

The usual load balancing strategies are available; refer to the Vert.x HTTP client side load balancing documentation to configure them.
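
As an illustration of what a round-robin policy does once a host has resolved to several addresses, here is a minimal standalone sketch using only the JDK. RoundRobinSelector is a hypothetical name and this is not how Vert.x implements LoadBalancer.ROUND_ROBIN; it only shows the order in which endpoints would be picked.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin selection, not a Vert.x class
final class RoundRobinSelector<T> {

  private final List<T> endpoints;
  private final AtomicInteger next = new AtomicInteger();

  RoundRobinSelector(List<T> endpoints) {
    if (endpoints.isEmpty()) {
      throw new IllegalArgumentException("At least one endpoint is required");
    }
    this.endpoints = List.copyOf(endpoints);
  }

  // Returns the endpoints in turn; floorMod keeps the index valid even
  // after the counter overflows
  T select() {
    int index = Math.floorMod(next.getAndIncrement(), endpoints.size());
    return endpoints.get(index);
  }

  public static void main(String[] args) {
    RoundRobinSelector<String> selector =
      new RoundRobinSelector<>(List.of("10.0.0.1", "10.0.0.2", "10.0.0.3"));
    // Prints the three addresses in order, then the first one again
    for (int i = 0; i < 4; i++) {
      System.out.println(selector.select());
    }
  }
}
```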

Address based load balancing

Address based load balancing relies on the Vert.x address resolver to resolve a single address to multiple host/port socket addresses.

You can set an address resolver to enable load balancing. The Vert.x Service Resolver implements a few address resolvers, such as a Kubernetes resolver.

GrpcClient client = GrpcClient
  .builder(vertx)
  .withAddressResolver(KubeResolver.create())
  .withLoadBalancer(LoadBalancer.ROUND_ROBIN)
  .build();

Unlike DNS based load balancing, address based load balancing uses an abstract Address instead of a SocketAddress. The address resolver implementation resolves an address to a list of socket addresses.

The Vert.x Service Resolver defines a ServiceAddress:

ServiceAddress address = ServiceAddress.of("GreeterService");

client
  .request(address, GreeterGrpcClient.SayHello)
  .compose(request -> {
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

You can refer to the Vert.x Service Resolver project documentation for more details.

JSON wire format

gRPC implicitly assumes the usage of the Protobuf wire format.

The Vert.x gRPC client supports the JSON wire format as well.

You can call a JSON service method with the application/grpc+json content-type.

client
  .request(server, GreeterGrpcClient.SayHello).compose(request -> {
    request.format(WireFormat.JSON);
    request.end(HelloRequest
      .newBuilder()
      .setName("Bob")
      .build());
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getMessage());
  });

The client will send application/grpc+json requests.

JSON encoding/decoding is performed by the com.google.protobuf:protobuf-java-util library.

Anemic JSON is also supported with the Vert.x JsonObject:

ServiceMethod<JsonObject, JsonObject> sayHello = ServiceMethod.client(
  ServiceName.create("helloworld", "Greeter"),
  "SayHello",
  GrpcMessageEncoder.JSON_OBJECT,
  GrpcMessageDecoder.JSON_OBJECT
);
client
  .request(server, sayHello).compose(request -> {
    request.end(new JsonObject().put("name", "Bob"));
    return request.response().compose(response -> response.last());
  }).onSuccess(reply -> {
    System.out.println("Received " + reply.getString("message"));
  });

You can also specify the JSON wire format when creating an idiomatic client:

GreeterGrpcClient greeterClient = GreeterGrpcClient.create(client, SocketAddress.inetSocketAddress(port, host), WireFormat.JSON);

Compression

You can compress request messages by setting the request encoding before sending any message:

request.encoding("gzip");

// Write items after encoding has been defined
request.write(Item.newBuilder().setValue("item-1").build());
request.write(Item.newBuilder().setValue("item-2").build());
request.write(Item.newBuilder().setValue("item-3").build());

Decompression

Decompression is achieved transparently by the client when the server sends encoded responses.
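
For intuition about what the gzip encoding does to a message payload, the following standalone sketch compresses and decompresses a byte array with the JDK's java.util.zip classes. GzipRoundTrip is a hypothetical name used for illustration; the Vert.x client performs the equivalent work internally, and its actual implementation may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration of gzip encoding, not a Vert.x class
final class GzipRoundTrip {

  static byte[] compress(byte[] payload) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    // Closing the gzip stream writes the trailer, so close before reading
    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      gzip.write(payload);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return out.toByteArray();
  }

  static byte[] decompress(byte[] compressed) {
    try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      return gzip.readAllBytes();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] payload = "item-1".repeat(100).getBytes(StandardCharsets.UTF_8);
    byte[] compressed = compress(payload);
    System.out.println(payload.length + " bytes compressed to " + compressed.length + " bytes");
    System.out.println("Round trip ok: " + Arrays.equals(payload, decompress(compressed)));
  }
}
```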

Message level API

The client provides a message level API to interact directly with protobuf encoded gRPC messages.

The client message level API can be combined with the server message level API to write a gRPC intermediary for which messages are opaque, such as a reverse proxy.

Such API is useful when you are not interested in the content of the messages, and instead you want to forward them to another service, e.g. you are writing a proxy.

Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);

requestFut.onSuccess(request -> {

  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");

  // Send the protobuf request
  request.end(protoHello);

  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.handler(protoReply -> {
      // Handle the protobuf reply
    });
  });
});

You can also set a messageHandler to handle GrpcMessage instances; such messages preserve the server encoding.

Future<GrpcClientRequest<Buffer, Buffer>> requestFut = client.request(server);

requestFut.onSuccess(request -> {

  // Set the service name and the method to call
  request.serviceName(ServiceName.create("helloworld", "Greeter"));
  request.methodName("SayHello");

  // Send the protobuf request
  request.endMessage(GrpcMessage.message("identity", protoHello));

  // Handle the response
  Future<GrpcClientResponse<Buffer, Buffer>> responseFut = request.response();
  responseFut.onSuccess(response -> {
    response.messageHandler(replyMessage -> {
      System.out.println("Got reply message encoded as " + replyMessage.encoding());
    });
  });
});

The writeMessage and endMessage methods handle the message encoding:

  • when the message uses the request encoding, the message is sent as is

  • when the message uses a different encoding, it will be encoded, e.g. compressed or uncompressed
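
Since the message level API deals with opaque encoded messages, it can help to recall how a gRPC message is framed on the wire: a 1-byte compressed flag, a 4-byte big-endian length, then the message bytes. The sketch below builds and parses such a frame with the JDK only. GrpcFrame is a hypothetical name and not part of the Vert.x API, which handles framing for you.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical illustration of gRPC length-prefixed framing, not a Vert.x class
final class GrpcFrame {

  private static final int PREFIX_LENGTH = 5;

  static byte[] frame(boolean compressed, byte[] message) {
    // ByteBuffer is big-endian by default, as the protocol requires
    ByteBuffer buffer = ByteBuffer.allocate(PREFIX_LENGTH + message.length);
    buffer.put((byte) (compressed ? 1 : 0));
    buffer.putInt(message.length);
    buffer.put(message);
    return buffer.array();
  }

  static boolean isCompressed(byte[] frame) {
    return frame[0] == 1;
  }

  static byte[] message(byte[] frame) {
    // The length occupies bytes 1 to 4 of the prefix
    int length = ByteBuffer.wrap(frame, 1, 4).getInt();
    return Arrays.copyOfRange(frame, PREFIX_LENGTH, PREFIX_LENGTH + length);
  }

  public static void main(String[] args) {
    byte[] framed = frame(false, new byte[] { 10, 3, 66, 111, 98 });
    System.out.println("Frame length: " + framed.length);         // 10
    System.out.println("Compressed: " + isCompressed(framed));    // false
    System.out.println("Message length: " + message(framed).length); // 5
  }
}
```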