これは、なにをしたくて書いたもの?
- 前にJavaでgRPCを使うエントリを書いた時に、ネットワークまわりにNettyを使っているのを見て
- あれ?これ、ブロックするような処理を書いたらどうなるんだろう?とちょっと気になり
- gRPC-Java内で、スレッドがどういう扱いになっているか確認したい
というエントリです。
そんなわけで、簡単なプログラムを書いて確認してみたいと思います。
環境
今回の環境は、こちら。
$ java -version openjdk version "1.8.0_181" OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-1ubuntu0.18.04.1-b13) OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode) $ mvn -version Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T03:41:47+09:00) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-38-generic", arch: "amd64", family: "unix"
準備
Maven依存関係は、こちら。
<dependency> <groupId>io.grpc</groupId> <artifactId>grpc-netty-shaded</artifactId> <version>1.16.1</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-protobuf</artifactId> <version>1.16.1</version> </dependency> <dependency> <groupId>io.grpc</groupId> <artifactId>grpc-stub</artifactId> <version>1.16.1</version> </dependency>
プラグインやextensionの設定。
<build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.5.0.Final</version> </extension> </extensions> <plugins> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>build-helper-maven-plugin</artifactId> <version>3.0.0</version> <executions> <execution> <id>add-source</id> <phase>generate-sources</phase> <goals> <goal>add-source</goal> </goals> <configuration> <sources> <source>target/generated-sources/protobuf/grpc-java</source> <sorce>target/generated-sources/protobuf/java</sorce> </sources> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.5.1</version> <configuration> <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact> <pluginId>grpc-java</pluginId> <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.16.1:exe:${os.detected.classifier}</pluginArtifact> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
ちょっとした事情で、自動生成するJavaソースコードを、明示的に追加ソースディレクトリとして加えています。
サンプルプログラム
サンプルプログラムは、単純にEchoのノリで作ってみます。
まずは、IDL。
src/main/proto/echo.proto
syntax = "proto3"; option java_multiple_files = true; option java_package = "org.littlewings.grpc.echo"; option java_outer_classname = "EchoService"; service Echo { rpc echo (EchoRequest) returns (EchoResponse) {} } message EchoRequest { string message = 1; } message EchoResponse { string message = 1; }
この状態で、1度コンパイルします。
$ mvn compile
IDLからJavaソースコードが自動生成されるので、それを使ってクライアントおよびサーバー側のプログラムを作成します。
まずは、サーバー側。
src/main/java/org/littlewings/grpc/threading/EchoServer.java
package org.littlewings.grpc.threading; import java.io.IOException; import java.util.logging.Logger; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; import org.littlewings.grpc.echo.EchoGrpc; import org.littlewings.grpc.echo.EchoRequest; import org.littlewings.grpc.echo.EchoResponse; public class EchoServer { Logger logger = Logger.getLogger(getClass().getName()); Server server; public static void main(String... args) throws IOException, InterruptedException { EchoServer echoServer = new EchoServer(); echoServer.start(); echoServer.blockUntilShutdown(); } public void start() throws IOException { server = ServerBuilder .forPort(8080) .addService(new EchoServiceImpl()) .build() .start(); logger.info("start gRPC server."); } public void stop() { if (server != null) { server.shutdown(); logger.info("shutdown gRPC server."); } } public void blockUntilShutdown() throws InterruptedException { if (server != null) { server.awaitTermination(); } } public static class EchoServiceImpl extends EchoGrpc.EchoImplBase { Logger logger = Logger.getLogger(getClass().getName()); @Override public void echo(EchoRequest request, StreamObserver<EchoResponse> responseObserver) { String receivedMessage = request.getMessage(); logger.info(String.format("[%s] %s", Thread.currentThread().getName(), receivedMessage)); responseObserver.onNext(EchoResponse.newBuilder().setMessage("★★★" + receivedMessage + "★★★").build()); responseObserver.onCompleted(); } } }
メッセージを送り返す時には、「★」を付けて返すようにしています。
responseObserver.onNext(EchoResponse.newBuilder().setMessage("★★★" + receivedMessage + "★★★").build());
また、アクセス時にスレッド名を出力するようにしました。
logger.info(String.format("[%s] %s", Thread.currentThread().getName(), receivedMessage));
続いて、クライアント側。こちらは、至ってシンプルです。
src/main/java/org/littlewings/grpc/threading/EchoClient.java
package org.littlewings.grpc.threading; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.logging.Logger; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.littlewings.grpc.echo.EchoGrpc; import org.littlewings.grpc.echo.EchoRequest; import org.littlewings.grpc.echo.EchoResponse; public class EchoClient { Logger logger = Logger.getLogger(getClass().getName()); ManagedChannel channel; EchoGrpc.EchoFutureStub futureStub; public static void main(String... args) throws InterruptedException { Logger logger = Logger.getLogger(EchoClient.class.getName()); EchoClient echoClient = EchoClient.create("localhost", 8080); logger.info(echoClient.echo("こんにちは、世界").getMessage()); logger.info(echoClient.echo("Hello gRPC!!").getMessage()); echoClient.shutdown(); } public static EchoClient create(String host, int port) { EchoClient echoClient = new EchoClient(); echoClient.channel = ManagedChannelBuilder .forAddress(host, port) .usePlaintext() .build(); echoClient.futureStub = EchoGrpc.newFutureStub(echoClient.channel); return echoClient; } EchoResponse echo(String message) { EchoRequest echoRequest = EchoRequest.newBuilder().setMessage(message).build(); try { return futureStub.echo(echoRequest).get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS); } }
クライアント側については、自動生成されたソースコードをちょっと修正して、サーバーからのレスポンスをIDL定義した
メッセージの型に変換する時にスレッド名を出力するようにしました。
target/generated-sources/protobuf/java/org/littlewings/grpc/echo/EchoResponse.java より抜粋
@java.lang.Override public final com.google.protobuf.UnknownFieldSet getUnknownFields() { return this.unknownFields; } private EchoResponse( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { this(); java.util.logging.Logger.getLogger(getClass().getName()).info(String.format("thread name = %s", Thread.currentThread().getName()));
というわけで、サンプルプログラムが作成できました。
確認
で、こちらをコンパイルして実行します。
$ mvn compile exec:java -Dprotoc.skip=true -Dexec.mainClass=org.littlewings.grpc.threading.EchoServer ... 11 07, 2018 12:12:45 午前 org.littlewings.grpc.threading.EchoServer start 情報: start gRPC server. 11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoServer$EchoServiceImpl echo 情報: [grpc-default-executor-0] こんにちは、世界 11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoServer$EchoServiceImpl echo 情報: [grpc-default-executor-0] Hello gRPC!!
$ mvn compile exec:java -Dprotoc.skip=true -Dexec.mainClass=org.littlewings.grpc.threading.EchoClient ... 11 07, 2018 12:12:48 午前 org.littlewings.grpc.echo.EchoResponse <init> 情報: thread name = grpc-default-executor-0 11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoClient main 情報: ★★★こんにちは、世界★★★ 11 07, 2018 12:12:48 午前 org.littlewings.grpc.echo.EchoResponse <init> 情報: thread name = grpc-default-executor-0 11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoClient main 情報: ★★★Hello gRPC!!★★★
「-Dprotoc.skip=true」を付けたのは、自動生成されたソースコードを修正しているので、再生成を抑止するため。
Maven Protocol Buffers Plugin – Introduction
出力されたスレッド名を見ると、クライアントもサーバーも「grpc-default-executor-0」という名前になっていますね。
11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoServer$EchoServiceImpl echo 情報: [grpc-default-executor-0] こんにちは、世界
このスレッド名は、どこから来たのでしょう?
ソースコードを読む
では、先ほどの確認時に出力されたスレッド名を頼りに、ソースコードを確認してみます。
デフォルトで使われていたスレッドプールは、こちらですね。
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/GrpcUtil.java#L488-L505
特になにもExecutorを指定しないと、Serverの構築時と
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java#L60-L61
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java#L97
ManagedChannelの構築時に、このデフォルトのスレッドプールが使用されます。
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java#L83-L84
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java#L98
なお、デフォルトのスレッドプールは、CachedThreadPoolです。
@Override public ExecutorService create() { return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true)); }
なので、これを変更したければServerやManagedChannelの構築時に、Executorを指定することになります。
Serverの場合。
server = ServerBuilder .forPort(8080) .addService(new EchoServiceImpl()) .executor(Executors.newFixedThreadPool(100)) .build() .start();
ManagedChannelの場合。
echoClient.channel = ManagedChannelBuilder .forAddress(host, port) .usePlaintext() .executor(Executors.newFixedThreadPool(100)) .build();
ただ、元のスレッドプールはdaemon Threadのようなので、こういう渡し方をする場合はServerやManagedChannelをシャットダウン
する際に、合わせて渡したスレッドプールもシャットダウンした方が良さそうです。
※このサンプルのような渡し方をすると、shutdownメソッドで停止しなくなる…クライアントが…
サーバー側では、このあたりでNettyのHandlerから、gRPC-Javaで定義されたスレッドプールに引き渡されます。
https://github.com/grpc/grpc-java/blob/v1.16.1/netty/src/main/java/io/grpc/netty/NettyServerHandler.java#L437
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/ServerImpl.java#L495
クライアント側は、いろいろ散らばってたので、追うのを諦めました…。
とはいえ、使われている箇所は、だいたいこの中に収まっているのですが。
https://github.com/grpc/grpc-java/tree/v1.16.1/core/src/main/java/io/grpc/internal
まとめ
というわけで、Nettyを使ってはいますが、ライブラリ利用側で作成するRPCの処理そのものを担当する処理は、スレッドが
割り当てられるモデルで動作することが確認できました。
デフォルトはCachedThreadPoolが使用されるので容量無制限ですが、絞りたい場合はスレッドプールを作成して
ServerやManagedChannelの構築時に設定すればよいでしょう。
気になっていたことは、これで確認できました、と。