CLOVER🍀

That was when it all began.

はじめてのgRPC(Java)

これは、なにをしたくて書いたもの?

  • gRPC-Webも出てきたということで、そろそろ1度gRPCに触れておこうと
  • とりあえず、JavaでgRPCを使ってみよう

gRPC-Webが正式リリース。WebブラウザからgRPCを直接呼び出し可能に - Publickey

gRPC-WEB がもたらす我々の生活への変化 - tokuhirom's blog

gRPC?

そもそも、gRPCとは?ということで、このあたりを参考に。

grpc / grpc.io

grpc / Guides

grpc / gRPC Concepts

gRPCって何? - Qiita

gRPCのオフィシャルサイトのトップページを見ると、次のようなことがポイントとして書かれていますね。

  • シンプルなサービス定義
  • 多くの言語、プラットフォームで動作する
  • 素早く始めて、スケールさせることができる
  • 双方向のストリーミングおよび認証ができる

gRPCは、サービス定義にはProtocol Buffers 3を使い、HTTP/2をベースに通信を行います。

Protocol Buffers 3を使って定義したIDLからコードの雛形を生成して、RPCサービスを開発していく感じみたいですね。
通信にはHTTP/2を使い、Protocol Buffersでデータをシリアライズして送受信する、と。

JavaでgRPCを扱う

では、JavaからgRPCを扱う方法を眺めてみましょう。

オフィシャルのドキュメントでは、こちら。

gRPC Basics - Java

使い方はサンプルは、GitHub上にもあります。

https://github.com/grpc/grpc-java/blob/v1.16.1/README.md

https://github.com/grpc/grpc-java/tree/v1.16.1/examples

日本語情報。

JavaでgRPC導入のためのポイント - Qiita

このあたりを参考に、JavaでgRPCを使ってみます。

お題

今回は、ツイート的なものをお題にしてみましょう。

  • クライアントから名前とツイートする内容、ツイートした時刻を含めてリクエストすると、サーバー側がその内容にidを追加して返す
  • 上記の内容を双方向のStreamingにして、ツイートするとサーバーからこれまでにツイートされた内容が非同期に返ってくる

環境

今回の環境は、こちら。

$ java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-1ubuntu0.18.04.1-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)


$ mvn -version
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T03:41:47+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-38-generic", arch: "amd64", family: "unix"

準備

Maven依存関係は、こちら。

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <version>1.16.1</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>1.16.1</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>1.16.1</version>
        </dependency>

どう見ても、Nettyを使いそうな感じですね。

Download

あと、Maven Pluginも追加します。

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.16.1:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

雰囲気、自動生成で使いそうな感じの…。

Generated Code

このあたりは、GitHubのREADMEを参照してください。

https://github.com/grpc/grpc-java/blob/v1.16.1/README.md

では、お題に沿って順番に書いていきましょう。

簡単なRPCを書く

最初は、ツイートをするとidを付けてレスポンスが戻ってくる、簡単なRPCを書きます。

まずは、IDLを書いてみましょう。
src/main/proto/tweet.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.littlewings.grpc.simpletweet";
option java_outer_classname = "SimpleTweetService";
option objc_class_prefix = "SPT";

package simpletweet;

service SimpleTweeter {
    rpc Tweet (TweetRequest) returns (TweetResponse) {}
}

message TweetRequest {
    string name = 1;
    string text = 2;
    int64 time = 3;
}

message TweetResponse {
    int64 id = 1;
    string name = 2;
    string text = 3;
    int64 time = 4;
}

IDLの定義は、こちらを参考に。

gRPC Basics - Java

https://github.com/grpc/grpc-java/tree/v1.16.1/examples/src/main/proto

また、IDL自体はProtocol Buffers 3を使うので、構文はProtocol Buffers 3のドキュメントを参照します。

Language Guide (proto3)  |  Protocol Buffers  |  Google Developers

順を追って書いていくと、まずsyntaxでProtocol Buffers 3であることを表します。これがないと、Protocol Buffers 2として
解釈されるそうな。

syntax = "proto3";

java_multiple_files」は各ファイルに分割して出力するかどうか、「java_package」は自動生成されるコードの出力パッケージ、
java_outer_classname」は生成したい最も外側(?)のクラス名を指定します(指定すると、定義が指定したクラスに
ちょっと集約されます)、「objc_class_prefix」はObjective-C向けの設定です(ならJavaのサンプルにはなくても…)。

option java_multiple_files = true;
option java_package = "org.littlewings.grpc.simpletweet";
option java_outer_classname = "SimpleTweetService";
option objc_class_prefix = "SPT";

RPCの定義はこちらなのですが、

service SimpleTweeter {
    rpc Tweet (TweetRequest) returns (TweetResponse) {}
}

Protocol Buffers 3のドキュメントにも、しっかりと「Defining Services」として書かれていますね。

Defining Services

コンパイルすると、ソースコードが生成されます。

$ mvn compile
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Detecting the operating system and CPU architecture
[INFO] ------------------------------------------------------------------------
[INFO] os.detected.name: linux
[INFO] os.detected.arch: x86_64
[INFO] os.detected.version: 4.15
[INFO] os.detected.version.major: 4
[INFO] os.detected.version.minor: 15
[INFO] os.detected.release: ubuntu
[INFO] os.detected.release.version: 18.04
[INFO] os.detected.release.like.ubuntu: true
[INFO] os.detected.release.like.debian: true
[INFO] os.detected.classifier: linux-x86_64
[INFO] 
[INFO] ------------------< org.littlewings:hello-grpc-java >-------------------
[INFO] Building hello-grpc-java 0.0.1-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- protobuf-maven-plugin:0.5.1:compile (default) @ hello-grpc-java ---
[INFO] Compiling 2 proto file(s) to /path/to/hello-grpc-java/target/generated-sources/protobuf/java
[INFO] 
[INFO] --- protobuf-maven-plugin:0.5.1:compile-custom (default) @ hello-grpc-java ---
[INFO] Compiling 2 proto file(s) to /path/to/hello-grpc-java/target/generated-sources/protobuf/grpc-java

ソースコードは、「target/generated-sources/protobuf/grpc-java」と「target/generated-sources/protobuf/java」にそれぞれ
生成されます。

$ find target/generated-sources/protobuf -type f
target/generated-sources/protobuf/grpc-java/org/littlewings/grpc/simpletweet/SimpleTweeterGrpc.java
target/generated-sources/protobuf/java/org/littlewings/grpc/simpletweet/TweetRequest.java
target/generated-sources/protobuf/java/org/littlewings/grpc/simpletweet/TweetResponseOrBuilder.java
target/generated-sources/protobuf/java/org/littlewings/grpc/simpletweet/TweetRequestOrBuilder.java
target/generated-sources/protobuf/java/org/littlewings/grpc/simpletweet/TweetResponse.java
target/generated-sources/protobuf/java/org/littlewings/grpc/simpletweet/Tweet.java

サーバー側を書いてみます。
src/main/java/org/littlewings/grpc/simple/SimpleServer.java

package org.littlewings.grpc.simple;

import java.io.IOException;
import java.util.logging.Logger;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.littlewings.grpc.simpletweet.SimpleTweeterGrpc;
import org.littlewings.grpc.simpletweet.TweetRequest;
import org.littlewings.grpc.simpletweet.TweetResponse;

public class SimpleServer {
    Logger logger = Logger.getLogger(getClass().getName());

    Server server;

    public static void main(String... args) throws IOException, InterruptedException {
        SimpleServer simpleServer = new SimpleServer();
        simpleServer.start();

        //simpleServer.blockUntilShutdown();

        System.console().readLine("> Enter stop.");

        simpleServer.stop();
    }

    public void start() throws IOException {
        server =
                ServerBuilder
                        .forPort(8080)
                        .addService(new SimpleTweetServiceImpl())
                        .build()
                        .start();

        logger.info("start gRPC server.");
    }

    public void stop() {
        if (server != null) {
            server.shutdown();
            logger.info("shutdown gRPC server.");
        }
    }

    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    public static class SimpleTweetServiceImpl extends SimpleTweeterGrpc.SimpleTweeterImplBase {
        Logger logger = Logger.getLogger(getClass().getName());

        @Override
        public void tweet(TweetRequest request,
                          StreamObserver<TweetResponse> responseObserver) {
            logger.info(String.format("request: name = %s, text = %s, time = %d", request.getName(), request.getText(), request.getTime()));

            TweetResponse tweetResponse =
                    TweetResponse
                            .newBuilder()
                            .setId(System.currentTimeMillis())
                            .setName(request.getName())
                            .setText(request.getText())
                            .setTime(request.getTime())
                            .build();

            responseObserver.onNext(tweetResponse);
            responseObserver.onCompleted();
        }
    }
}

自動生成されたベースクラスを継承したクラスを作成し、ここにリクエストを受け取り、レスポンスを返す処理を書きます。

    public static class SimpleTweetServiceImpl extends SimpleTweeterGrpc.SimpleTweeterImplBase {
        Logger logger = Logger.getLogger(getClass().getName());

        @Override
        public void tweet(TweetRequest request,
                          StreamObserver<TweetResponse> responseObserver) {
            logger.info(String.format("request: name = %s, text = %s, time = %d", request.getName(), request.getText(), request.getTime()));

            TweetResponse tweetResponse =
                    TweetResponse
                            .newBuilder()
                            .setId(System.currentTimeMillis())
                            .setName(request.getName())
                            .setText(request.getText())
                            .setTime(request.getTime())
                            .build();

            responseObserver.onNext(tweetResponse);
            responseObserver.onCompleted();
        }
    }

id以外はリクエストの内容をそのまま設定し、idは現在時刻(ミリ秒)としました。

このベースクラスは抽象クラスで、かつこのオーバーライド対象のメソッドはベースクラス側で例外を投げるように実装されて
いるので、オーバーライド必須です。

というかこのメソッドは、IDLで定義したやつですね。こちら。

service SimpleTweeter {
    rpc Tweet (TweetRequest) returns (TweetResponse) {}
}

レスポンスを返す処理はStreamObserver#onNextで書き、完了はStreamObserver#onCompletedで示します。

            responseObserver.onNext(tweetResponse);
            responseObserver.onCompleted();

この実装クラスをServerに設定します。

    public void start() throws IOException {
        server =
                ServerBuilder
                        .forPort(8080)
                        .addService(new SimpleTweetServiceImpl())
                        .build()
                        .start();

        logger.info("start gRPC server.");
    }

なお、このサーバーはEnterを打つと停止します。

    public static void main(String... args) throws IOException, InterruptedException {
        SimpleServer simpleServer = new SimpleServer();
        simpleServer.start();

        //simpleServer.blockUntilShutdown();

        System.console().readLine("> Enter stop.");

        simpleServer.stop();
    }

停止まわりについては、シャットダウンを行うServer#shutdownと、シャットダウンが行われるまで待機する
Server#awaitTerminationがあります。

    public void stop() {
        if (server != null) {
            server.shutdown();
            logger.info("shutdown gRPC server.");
        }
    }

    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

awaitTerminationの場合は、どこか別のタイミングでシャットダウンの指示が行われることになりますね。

続いて、クライアント側。 src/main/java/org/littlewings/grpc/simple/SimpleClient.java

package org.littlewings.grpc.simple;

import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import org.littlewings.grpc.simpletweet.SimpleTweeterGrpc;
import org.littlewings.grpc.simpletweet.TweetRequest;
import org.littlewings.grpc.simpletweet.TweetResponse;

public class SimpleClient {
    Logger logger = Logger.getLogger(getClass().getName());

    ManagedChannel channel;
    SimpleTweeterGrpc.SimpleTweeterBlockingStub blockingStub;

    public static void main(String... args) throws InterruptedException {
        SimpleClient simpleClient = SimpleClient.create("localhost", 8080);
        simpleClient.tweet("磯野 カツオ", "こんにちは!!gRPC!!");
        simpleClient.tweet("フグ田 タラオ", "Hello World!!");
        simpleClient.shutdown();
    }

    public static SimpleClient create(String host, int port) {
        SimpleClient simpleClient = new SimpleClient();
        simpleClient.channel =
                ManagedChannelBuilder
                        .forAddress(host, port)
                        .usePlaintext()
                        .build();
        simpleClient.blockingStub =
                SimpleTweeterGrpc.newBlockingStub(simpleClient.channel);

        return simpleClient;
    }

    public void tweet(String name, String text) {
        TweetRequest tweetRequest =
                TweetRequest
                        .newBuilder()
                        .setName(name)
                        .setText(text)
                        .setTime(System.currentTimeMillis())
                        .build();

        try {
            TweetResponse tweetResponse = blockingStub.tweet(tweetRequest);

            logger.info(String.format("response: id = %d, name = %s, text = %s, time = %d", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime()));
        } catch (StatusRuntimeException e) {
            Status status = Status.fromThrowable(e);
            logger.info("error: statuscode = " + status.getCode() + ", description = " + status.getDescription());
            e.printStackTrace();
        }
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS);
    }
}

クライアントでは、ManagedChannelとStubが必要なようです。

    ManagedChannel channel;
    SimpleTweeterGrpc.SimpleTweeterBlockingStub blockingStub;

ManagedChannelはBuilderより作成し、StubにはブロッキングとFuture、StreamObserverを使うものがありますが、ここでは
ブロッキングなものを使います。

    public static SimpleClient create(String host, int port) {
        SimpleClient simpleClient = new SimpleClient();
        simpleClient.channel =
                ManagedChannelBuilder
                        .forAddress(host, port)
                        .usePlaintext()
                        .build();
        simpleClient.blockingStub =
                SimpleTweeterGrpc.newBlockingStub(simpleClient.channel);

        return simpleClient;
    }

ところで、usePlaintextとあるのですが、その他としてはALTSを使うような?

Application Layer Transport Security  |  ドキュメント  |  Google Cloud

リクエストは、ブロッキングなStubを通して行います。

    public void tweet(String name, String text) {
        TweetRequest tweetRequest =
                TweetRequest
                        .newBuilder()
                        .setName(name)
                        .setText(text)
                        .setTime(System.currentTimeMillis())
                        .build();

        try {
            TweetResponse tweetResponse = blockingStub.tweet(tweetRequest);

            logger.info(String.format("response: id = %d, name = %s, text = %s, time = %d", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime()));
        } catch (StatusRuntimeException e) {
            Status status = Status.fromThrowable(e);
            logger.info("error: statuscode = " + status.getCode() + ", description = " + status.getDescription());
            e.printStackTrace();
        }
    }

予想に難くないですが、Future版のStubを使うとFutureが返ってきます。

では、実行してみましょう。

サーバーを起動。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.grpc.simple.SimpleServer

クライアントを実行。

$ mvn exec:java -Dexec.mainClass=org.littlewings.grpc.simple.SimpleClient

...

[INFO] --- exec-maven-plugin:1.6.0:java (default-cli) @ hello-grpc-java ---
11 04, 2018 10:24:16 午後 org.littlewings.grpc.simple.SimpleClient tweet
情報: response: id = 1541337856481, name = 磯野 カツオ, text = こんにちは!!gRPC!!, time = 1541337855966
11 04, 2018 10:24:16 午後 org.littlewings.grpc.simple.SimpleClient tweet
情報: response: id = 1541337856516, name = フグ田 タラオ, text = Hello World!!, time = 1541337856512

サーバー側では、こんなログが出力されます。

11 04, 2018 10:24:16 午後 org.littlewings.grpc.simple.SimpleServer$SimpleTweetServiceImpl tweet
情報: request: name = 磯野 カツオ, text = こんにちは!!gRPC!!, time = 1541337855966
11 04, 2018 10:24:16 午後 org.littlewings.grpc.simple.SimpleServer$SimpleTweetServiceImpl tweet
情報: request: name = フグ田 タラオ, text = Hello World!!, time = 1541337856512

OKそうですね。

なお、エラーの扱いですが、サーバー側の場合はStreamObserver#onErrorを使って例外を設定します。

responseObserver.onError(new RuntimeException("Oops"));  // エラーにする場合は、onErrorを使う

クライアント側は、try-cacheで。

        try {
            TweetResponse tweetResponse = blockingStub.tweet(tweetRequest);

            logger.info(String.format("response: id = %d, name = %s, text = %s, time = %d", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime()));
        } catch (StatusRuntimeException e) {
            Status status = Status.fromThrowable(e);
            logger.info("error: statuscode = " + status.getCode() + ", description = " + status.getDescription());
            e.printStackTrace();
        }

ところで、レスポンスを送る時にStreamObserver#onNextと書きますが、今回のIDL定義でonNextを複数回呼び出すと
エラーになります。

            responseObserver.onNext(tweetResponse);
            responseObserver.onNext(tweetResponse);
            responseObserver.onCompleted();
            // Cancelling the stream with status Status{code=INTERNAL, description=Too many responses, cause=null}
            // Client側には、 Received Rst Stream, io.grpc.StatusRuntimeException: CANCELLED: HTTP/2 error code: CANCEL

こういうのに対応するには、Streamingを使います。

Streaming(双方向)なRPCを書く

続いては、先ほどのRPCをStreamingにしたものを書いてみます。

先ほどのIDLをベースに、パッケージ名と一部定義を書き換えたファイルを用意。
src/main/proto/tweet-streaming.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.littlewings.grpc.streamingtweet";
option java_outer_classname = "StreamingTweetService";
option objc_class_prefix = "SMT";

package streamingtweet;

service StreamingTweeter {
    rpc TweetStreaming (stream TweetRequest) returns (stream TweetResponse) {}
}

message TweetRequest {
    string name = 1;
    string text = 2;
    int64 time = 3;
}

message TweetResponse {
    int64 id = 1;
    string name = 2;
    string text = 3;
    int64 time = 4;
}

ポイントは、リクエストとレスポンスの型の宣言のところに「stream」が付いていることです。

service StreamingTweeter {
    rpc TweetStreaming (stream TweetRequest) returns (stream TweetResponse) {}
}

今回は双方向ということで、両方に「stream」を付けました。

ドキュメントを見るとわかりますが、片方だけにつけることも可能です。

gRPC Basics - Java

レスポンスにだけstreamが付いているタイプをserver-side streaming RPC、

rpc ListFeatures(Rectangle) returns (stream Feature) {}

リクエストにだけstreamが付いているタイプをclient-side streaming RPCというみたいです。

rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

では、IDLをもとにコードを自動生成して

$ mvn compile

サーバー側から作成していきます。
src/main/java/org/littlewings/grpc/streaming/StreamingServer.java

package org.littlewings.grpc.streaming;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.littlewings.grpc.streamingtweet.StreamingTweeterGrpc;
import org.littlewings.grpc.streamingtweet.TweetRequest;
import org.littlewings.grpc.streamingtweet.TweetResponse;

public class StreamingServer {
    Logger logger = Logger.getLogger(getClass().getName());

    Server server;

    public static void main(String... args) throws IOException, InterruptedException {
        StreamingServer streamingServer = new StreamingServer();
        streamingServer.start();

        // streamingServer.blockUntilShutdown();

        System.console().readLine("> Enter stop.");

        streamingServer.stop();
    }

    public void start() throws IOException {
        server =
                ServerBuilder
                        .forPort(8080)
                        .addService(new StreamingTweetServiceImpl())
                        .build()
                        .start();

        logger.info("start gRPC server.");
    }

    public void stop() {
        if (server != null) {
            server.shutdown();
            logger.info("shutdown gRPC server.");
        }
    }

    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    public static class StreamingTweetServiceImpl extends StreamingTweeterGrpc.StreamingTweeterImplBase {
        Logger logger = Logger.getLogger(getClass().getName());

        List<TweetResponse> tweets = new ArrayList<>();

        @Override
        public StreamObserver<TweetRequest> tweetStreaming(StreamObserver<TweetResponse> responseObserver) {
            return new StreamObserver<TweetRequest>() {
                @Override
                public void onNext(TweetRequest tweetRequest) {
                    logger.info(String.format("request: name = %s, text = %s, time = %d", tweetRequest.getName(), tweetRequest.getText(), tweetRequest.getTime()));

                    TweetResponse tweetResponse =
                            TweetResponse
                                    .newBuilder()
                                    .setId(System.currentTimeMillis())
                                    .setName(tweetRequest.getName())
                                    .setText(tweetRequest.getText())
                                    .setTime(tweetRequest.getTime())
                                    .build();

                    List<TweetResponse> snapshot;
                    synchronized (this) {
                        tweets.add(tweetResponse);
                        snapshot = new ArrayList<>(tweets);
                    }

                    snapshot.forEach(response -> responseObserver.onNext(response));
                    logger.info("on request");
                }

                @Override
                public void onError(Throwable t) {
                    logger.log(Level.WARNING, "error", t);
                }

                @Override
                public void onCompleted() {
                    logger.info("request completed");
                    responseObserver.onCompleted();
                }
            };
        }
    }
}

ここでも、自動生成された抽象クラスを継承してRPC定義されたメソッドを実装します。

    public static class StreamingTweetServiceImpl extends StreamingTweeterGrpc.StreamingTweeterImplBase {

ポイントは、RPCで定義されたメソッドを実装するところですね。リクエストに対するStreamObververを返すことになります。

        @Override
        public StreamObserver<TweetRequest> tweetStreaming(StreamObserver<TweetResponse> responseObserver) {
            return new StreamObserver<TweetRequest>() {
                @Override
                public void onNext(TweetRequest tweetRequest) {
                    logger.info(String.format("request: name = %s, text = %s, time = %d", tweetRequest.getName(), tweetRequest.getText(), tweetRequest.getTime()));

                    TweetResponse tweetResponse =
                            TweetResponse
                                    .newBuilder()
                                    .setId(System.currentTimeMillis())
                                    .setName(tweetRequest.getName())
                                    .setText(tweetRequest.getText())
                                    .setTime(tweetRequest.getTime())
                                    .build();

                    List<TweetResponse> snapshot;
                    synchronized (this) {
                        tweets.add(tweetResponse);
                        snapshot = new ArrayList<>(tweets);
                    }

                    snapshot.forEach(response -> responseObserver.onNext(response));
                }

                @Override
                public void onError(Throwable t) {
                    logger.log(Level.WARNING, "error", t);
                }

                @Override
                public void onCompleted() {
                    logger.info("request completed");
                    responseObserver.onCompleted();
                }
            };
        }

StreamObserver#onNextで、リクエストひとつひとつに対する処理を書きます。

                @Override
                public void onNext(TweetRequest tweetRequest) {
                    logger.info(String.format("request: name = %s, text = %s, time = %d", tweetRequest.getName(), tweetRequest.getText(), tweetRequest.getTime()));

                    TweetResponse tweetResponse =
                            TweetResponse
                                    .newBuilder()
                                    .setId(System.currentTimeMillis())
                                    .setName(tweetRequest.getName())
                                    .setText(tweetRequest.getText())
                                    .setTime(tweetRequest.getTime())
                                    .build();

                    List<TweetResponse> snapshot;
                    synchronized (this) {
                        tweets.add(tweetResponse);
                        snapshot = new ArrayList<>(tweets);
                    }

                    snapshot.forEach(response -> responseObserver.onNext(response));
                }

ここでは、リクエストの度にツイートを蓄積し、全部返す感じで作成。

エラー発生にはonError、クライアントの処理が完了した際にはonCompletedが呼び出されます。

                @Override
                public void onError(Throwable t) {
                    logger.log(Level.WARNING, "error", t);
                }

                @Override
                public void onCompleted() {
                    logger.info("request completed");
                    responseObserver.onCompleted();
                }

続いて、クライアント側。
src/main/java/org/littlewings/grpc/streaming/StreamingClient.java

package org.littlewings.grpc.streaming;

import java.io.Console;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.littlewings.grpc.streamingtweet.StreamingTweeterGrpc;
import org.littlewings.grpc.streamingtweet.TweetRequest;
import org.littlewings.grpc.streamingtweet.TweetResponse;

public class StreamingClient {
    Logger logger = Logger.getLogger(getClass().getName());

    ManagedChannel channel;
    StreamingTweeterGrpc.StreamingTweeterStub stub;

    public static void main(String... args) throws InterruptedException {
        StreamingClient streamingClient = StreamingClient.create("localhost", 8080);

        streamingClient.tweetWhile("磯野 カツオ");

        streamingClient.shutdown();
    }

    public static StreamingClient create(String host, int port) {
        StreamingClient streamingClient = new StreamingClient();
        streamingClient.channel =
                ManagedChannelBuilder
                        .forAddress(host, port)
                        .usePlaintext()
                        .build();
        streamingClient.stub =
                StreamingTweeterGrpc.newStub(streamingClient.channel);

        return streamingClient;
    }

    public void tweetWhile(String name) {
        StreamObserver<TweetResponse> responseObserver = new StreamObserver<TweetResponse>() {
            @Override
            public void onNext(TweetResponse tweetResponse) {
                logger.info(String.format("response: id = %d, name = %s, text = %s, time = %d", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime()));
            }

            @Override
            public void onError(Throwable t) {
                logger.log(Level.WARNING, "response error", t);
            }

            @Override
            public void onCompleted() {
                logger.info("response completed!!");
            }
        };

        StreamObserver<TweetRequest> requestObserver = stub.tweetStreaming(responseObserver);

        Console console = System.console();

        while (true) {
            String line = console.readLine("> ");

            if (line.isEmpty()) {
                continue;
            }

            if (line.equals("exit")) {
                break;
            }

            TweetRequest tweetRequest =
                    TweetRequest
                            .newBuilder()
                            .setName(name)
                            .setText(line)
                            .setTime(System.currentTimeMillis())
                            .build();

            requestObserver.onNext(tweetRequest);

            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
                // ignore
            }
        }

        requestObserver.onCompleted();

        logger.info("request completed!!");
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS);
    }
}

今回は、Stubの型が変わりStreamObserverを使用するものになります。

    ManagedChannel channel;
    StreamingTweeterGrpc.StreamingTweeterStub stub;

Stubを作成する時に、自動生成されたGrpcクラスのnewStubを使います。

        streamingClient.stub =
                StreamingTweeterGrpc.newStub(streamingClient.channel);

リクエストの送信については、今回は固定の名前でツイートし続けるものにしました。

    public void tweetWhile(String name) {
        StreamObserver<TweetResponse> responseObserver = new StreamObserver<TweetResponse>() {
            @Override
            public void onNext(TweetResponse tweetResponse) {
                logger.info(String.format("response: id = %d, name = %s, text = %s, time = %d", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime()));
            }

            @Override
            public void onError(Throwable t) {
                logger.log(Level.WARNING, "response error", t);
            }

            @Override
            public void onCompleted() {
                logger.info("response completed!!");
            }
        };

        StreamObserver<TweetRequest> requestObserver = stub.tweetStreaming(responseObserver);

        Console console = System.console();

        while (true) {
            String line = console.readLine("> ");

            if (line.isEmpty()) {
                continue;
            }

            if (line.equals("exit")) {
                break;
            }

            TweetRequest tweetRequest =
                    TweetRequest
                            .newBuilder()
                            .setName(name)
                            .setText(line)
                            .setTime(System.currentTimeMillis())
                            .build();

            requestObserver.onNext(tweetRequest);

            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
                // ignore
            }
        }

        requestObserver.onCompleted();

        logger.info("request completed!!");
    }

こちらが、レスポンスを受け取った時の動作。ログ出力のみです。

        StreamObserver<TweetResponse> responseObserver = new StreamObserver<TweetResponse>() {
            @Override
            public void onNext(TweetResponse tweetResponse) {
                logger.info(String.format("response: id = %d, name = %s, text = %s, time = %d", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime()));
            }

            @Override
            public void onError(Throwable t) {
                logger.log(Level.WARNING, "response error", t);
            }

            @Override
            public void onCompleted() {
                logger.info("response completed!!");
            }
        };

リクエストを送る方は、レスポンス受信時のStreamObserverの定義を使用してStubから作成します。

        StreamObserver<TweetRequest> requestObserver = stub.tweetStreaming(responseObserver);

        Console console = System.console();

        while (true) {
            String line = console.readLine("> ");

            if (line.isEmpty()) {
                continue;
            }

            if (line.equals("exit")) {
                break;
            }

            TweetRequest tweetRequest =
                    TweetRequest
                            .newBuilder()
                            .setName(name)
                            .setText(line)
                            .setTime(System.currentTimeMillis())
                            .build();

            requestObserver.onNext(tweetRequest);

            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
                // ignore
            }
        }

        requestObserver.onCompleted();

        logger.info("request completed!!");

送信は、StreamObserver#onNextで

            requestObserver.onNext(tweetRequest);

完了したら、onCompletedしましょう。

        requestObserver.onCompleted();

今回は、「exit」と入力すると停止するように作成しています。

            if (line.equals("exit")) {
                break;
            }

では、確認してみます。サーバー側を起動。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.grpc.streaming.StreamingServer

クライアント側を起動。

$ mvn exec:java -Dexec.mainClass=org.littlewings.grpc.streaming.StreamingClient

...

> 

待機状態になるので、適当にツイートしてみます。

> こんにちは、gRPC
11 04, 2018 11:01:16 午後 org.littlewings.grpc.streaming.StreamingClient$1 onNext
情報: response: id = 1541340076501, name = 磯野 カツオ, text = こんにちは、gRPC, time = 1541340076466
> JavaでgRPC
11 04, 2018 11:01:21 午後 org.littlewings.grpc.streaming.StreamingClient$1 onNext
情報: response: id = 1541340076501, name = 磯野 カツオ, text = こんにちは、gRPC, time = 1541340076466
11 04, 2018 11:01:21 午後 org.littlewings.grpc.streaming.StreamingClient$1 onNext
情報: response: id = 1541340081784, name = 磯野 カツオ, text = JavaでgRPC, time = 1541340081780

メッセージを送ると、サーバー側からこれまでに送ったツイート+今回のツイートが返ってきます。

サーバー側のログでは、onNextが呼び出されていることがわかります。

11 04, 2018 11:01:16 午後 org.littlewings.grpc.streaming.StreamingServer$StreamingTweetServiceImpl$1 onNext
情報: request: name = 磯野 カツオ, text = こんにちは、gRPC, time = 1541340076466
11 04, 2018 11:01:16 午後 org.littlewings.grpc.streaming.StreamingServer$StreamingTweetServiceImpl$1 onNext
情報: on request
11 04, 2018 11:01:21 午後 org.littlewings.grpc.streaming.StreamingServer$StreamingTweetServiceImpl$1 onNext
情報: request: name = 磯野 カツオ, text = JavaでgRPC, time = 1541340081780
11 04, 2018 11:01:21 午後 org.littlewings.grpc.streaming.StreamingServer$StreamingTweetServiceImpl$1 onNext
情報: on request

このままずっと続けていくとツイートがどんどん増えていきますが、停止してみましょう。

> exit    
11 04, 2018 11:02:22 午後 org.littlewings.grpc.streaming.StreamingClient tweetWhile
情報: request completed!!
11 04, 2018 11:02:22 午後 org.littlewings.grpc.streaming.StreamingClient$1 onCompleted
情報: response completed!!

サーバー側では、onCompletedが呼び出されます。

11 04, 2018 11:02:22 午後 org.littlewings.grpc.streaming.StreamingServer$StreamingTweetServiceImpl$1 onCompleted
情報: request completed

双方向のStreamingでしたが、確認することができました。

まとめ

gRPC入門ということで、Javaを使ったgRPCのサンプルをちょっと書いてみました。

Protocol Buffers 3を使ったIDL定義、RPCの定義とコードの自動生成、それから実際の動作確認まで簡単なケースを確認できたので、
とっかかりとしてはOKではないでしょうか。

これ以上は、GitHubのexampleをたくさん眺めるんでしょうねぇ。

https://github.com/grpc/grpc-java/tree/v1.16.1/examples

https://github.com/grpc/grpc-java/tree/v1.16.1/examples/src/main/java/io/grpc/examples