これは、なにをしたくて書いたもの?
- 前にJavaでgRPCを使うエントリを書いた時に、ネットワークまわりにNettyを使っているのを見て
- あれ?これ、ブロックするような処理を書いたらどうなるんだろう?とちょっと気になり
- gRPC-Java内で、スレッドがどういう扱いになっているか確認したい
というエントリです。
そんなわけで、簡単なプログラムを書いて確認してみたいと思います。
環境
今回の環境は、こちら。
$ java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-1ubuntu0.18.04.1-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)
$ mvn -version
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T03:41:47+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-38-generic", arch: "amd64", family: "unix"
準備
Maven依存関係は、こちら。
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.16.1</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.16.1</version>
</dependency>
プラグインやextensionの設定。
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.5.0.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>target/generated-sources/protobuf/grpc-java</source>
<sorce>target/generated-sources/protobuf/java</sorce>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.16.1:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
ちょっとした事情で、自動生成するJavaソースコードを、明示的に追加ソースディレクトリとして加えています。
サンプルプログラム
サンプルプログラムは、単純にEchoのノリで作ってみます。
まずは、IDL。
src/main/proto/echo.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.littlewings.grpc.echo";
option java_outer_classname = "EchoService";
service Echo {
rpc echo (EchoRequest) returns (EchoResponse) {}
}
message EchoRequest {
string message = 1;
}
message EchoResponse {
string message = 1;
}
この状態で、1度コンパイルします。
$ mvn compile
IDLからJavaソースコードが自動生成されるので、それを使ってクライアントおよびサーバー側のプログラムを作成します。
まずは、サーバー側。
src/main/java/org/littlewings/grpc/threading/EchoServer.java
package org.littlewings.grpc.threading;
import java.io.IOException;
import java.util.logging.Logger;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.littlewings.grpc.echo.EchoGrpc;
import org.littlewings.grpc.echo.EchoRequest;
import org.littlewings.grpc.echo.EchoResponse;
public class EchoServer {
Logger logger = Logger.getLogger(getClass().getName());
Server server;
public static void main(String... args) throws IOException, InterruptedException {
EchoServer echoServer = new EchoServer();
echoServer.start();
echoServer.blockUntilShutdown();
}
public void start() throws IOException {
server =
ServerBuilder
.forPort(8080)
.addService(new EchoServiceImpl())
.build()
.start();
logger.info("start gRPC server.");
}
public void stop() {
if (server != null) {
server.shutdown();
logger.info("shutdown gRPC server.");
}
}
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
public static class EchoServiceImpl extends EchoGrpc.EchoImplBase {
Logger logger = Logger.getLogger(getClass().getName());
@Override
public void echo(EchoRequest request, StreamObserver<EchoResponse> responseObserver) {
String receivedMessage = request.getMessage();
logger.info(String.format("[%s] %s", Thread.currentThread().getName(), receivedMessage));
responseObserver.onNext(EchoResponse.newBuilder().setMessage("★★★" + receivedMessage + "★★★").build());
responseObserver.onCompleted();
}
}
}
メッセージを送り返す時には、「★」を付けて返すようにしています。
responseObserver.onNext(EchoResponse.newBuilder().setMessage("★★★" + receivedMessage + "★★★").build());
また、アクセス時にスレッド名を出力するようにしました。
logger.info(String.format("[%s] %s", Thread.currentThread().getName(), receivedMessage));
続いて、クライアント側。こちらは、至ってシンプルです。
src/main/java/org/littlewings/grpc/threading/EchoClient.java
package org.littlewings.grpc.threading;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.littlewings.grpc.echo.EchoGrpc;
import org.littlewings.grpc.echo.EchoRequest;
import org.littlewings.grpc.echo.EchoResponse;
public class EchoClient {
Logger logger = Logger.getLogger(getClass().getName());
ManagedChannel channel;
EchoGrpc.EchoFutureStub futureStub;
public static void main(String... args) throws InterruptedException {
Logger logger = Logger.getLogger(EchoClient.class.getName());
EchoClient echoClient = EchoClient.create("localhost", 8080);
logger.info(echoClient.echo("こんにちは、世界").getMessage());
logger.info(echoClient.echo("Hello gRPC!!").getMessage());
echoClient.shutdown();
}
public static EchoClient create(String host, int port) {
EchoClient echoClient = new EchoClient();
echoClient.channel =
ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext()
.build();
echoClient.futureStub =
EchoGrpc.newFutureStub(echoClient.channel);
return echoClient;
}
EchoResponse echo(String message) {
EchoRequest echoRequest = EchoRequest.newBuilder().setMessage(message).build();
try {
return futureStub.echo(echoRequest).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
public void shutdown() throws InterruptedException {
channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS);
}
}
クライアント側については、自動生成されたソースコードをちょっと修正して、サーバーからのレスポンスをIDL定義した
メッセージの型に変換する時にスレッド名を出力するようにしました。
target/generated-sources/protobuf/java/org/littlewings/grpc/echo/EchoResponse.java より抜粋
@java.lang.Override
public final com.google.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private EchoResponse(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
this();
java.util.logging.Logger.getLogger(getClass().getName()).info(String.format("thread name = %s", Thread.currentThread().getName()));
というわけで、サンプルプログラムが作成できました。
確認
で、こちらをコンパイルして実行します。
$ mvn compile exec:java -Dprotoc.skip=true -Dexec.mainClass=org.littlewings.grpc.threading.EchoServer
...
11 07, 2018 12:12:45 午前 org.littlewings.grpc.threading.EchoServer start
情報: start gRPC server.
11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoServer$EchoServiceImpl echo
情報: [grpc-default-executor-0] こんにちは、世界
11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoServer$EchoServiceImpl echo
情報: [grpc-default-executor-0] Hello gRPC!!
$ mvn compile exec:java -Dprotoc.skip=true -Dexec.mainClass=org.littlewings.grpc.threading.EchoClient
...
11 07, 2018 12:12:48 午前 org.littlewings.grpc.echo.EchoResponse <init>
情報: thread name = grpc-default-executor-0
11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoClient main
情報: ★★★こんにちは、世界★★★
11 07, 2018 12:12:48 午前 org.littlewings.grpc.echo.EchoResponse <init>
情報: thread name = grpc-default-executor-0
11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoClient main
情報: ★★★Hello gRPC!!★★★
「-Dprotoc.skip=true」を付けたのは、自動生成されたソースコードを修正しているので、再生成を抑止するため。
Maven Protocol Buffers Plugin – Introduction
出力されたスレッド名を見ると、クライアントもサーバーも「grpc-default-executor-0」という名前になっていますね。
11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoServer$EchoServiceImpl echo
情報: [grpc-default-executor-0] こんにちは、世界
このスレッド名は、どこから来たのでしょう?
では、先ほどの確認時に出力されたスレッド名を頼りに、ソースコードを確認してみます。
デフォルトで使われていたスレッドプールは、こちらですね。
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/GrpcUtil.java#L488-L505
特になにもExecutorを指定しないと、Serverの構築時と
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java#L60-L61
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java#L97
ManagedChannelの構築時に、このデフォルトのスレッドプールが使用されます。
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java#L83-L84
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java#L98
なお、デフォルトのスレッドプールは、CachedThreadPoolです。
@Override
public ExecutorService create() {
return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true));
}
なので、これを変更したければServerやManagedChannelの構築時に、Executorを指定することになります。
Serverの場合。
server =
ServerBuilder
.forPort(8080)
.addService(new EchoServiceImpl())
.executor(Executors.newFixedThreadPool(100))
.build()
.start();
ManagedChannelの場合。
echoClient.channel =
ManagedChannelBuilder
.forAddress(host, port)
.usePlaintext()
.executor(Executors.newFixedThreadPool(100))
.build();
ただ、元のスレッドプールはdaemon Threadのようなので、こういう渡し方をする場合はServerやManagedChannelをシャットダウン
する際に、合わせて渡したスレッドプールもシャットダウンした方が良さそうです。
※このサンプルのような渡し方をすると、shutdownメソッドで停止しなくなる…クライアントが…
サーバー側では、このあたりでNettyのHandlerから、gRPC-Javaで定義されたスレッドプールに引き渡されます。
https://github.com/grpc/grpc-java/blob/v1.16.1/netty/src/main/java/io/grpc/netty/NettyServerHandler.java#L437
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/ServerImpl.java#L495
クライアント側は、いろいろ散らばってたので、追うのを諦めました…。
とはいえ、使われている箇所は、だいたいこの中に収まっているのですが。
https://github.com/grpc/grpc-java/tree/v1.16.1/core/src/main/java/io/grpc/internal
まとめ
というわけで、Nettyを使ってはいますが、ライブラリ利用側で作成するRPCの処理そのものを担当する処理は、スレッドが
割り当てられるモデルで動作することが確認できました。
デフォルトはCachedThreadPoolが使用されるので容量無制限ですが、絞りたい場合はスレッドプールを作成して
ServerやManagedChannelの構築時に設定すればよいでしょう。
気になっていたことは、これで確認できました、と。