CLOVER🍀

That was when it all began.

gRPC-Javaのスレッドが気になるという話

これは、なにをしたくて書いたもの?

  • 前にJavaでgRPCを使うエントリを書いた時に、ネットワークまわりにNettyを使っているのを見て
  • あれ?これ、ブロックするような処理を書いたらどうなるんだろう?とちょっと気になり
  • gRPC-Java内で、スレッドがどういう扱いになっているか確認したい

というエントリです。

そんなわけで、簡単なプログラムを書いて確認してみたいと思います。

環境

今回の環境は、こちら。

$ java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-1ubuntu0.18.04.1-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)

$ mvn -version
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T03:41:47+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-38-generic", arch: "amd64", family: "unix"

準備

Maven依存関係は、こちら。

        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <version>1.16.1</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>1.16.1</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>1.16.1</version>
        </dependency>

プラグインやextensionの設定。

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.5.0.Final</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
              <groupId>org.codehaus.mojo</groupId>
              <artifactId>build-helper-maven-plugin</artifactId>
              <version>3.0.0</version>
              <executions>
                <execution>
                  <id>add-source</id>
                  <phase>generate-sources</phase>
                  <goals>
                    <goal>add-source</goal>
                  </goals>
                  <configuration>
                    <sources>
                      <source>target/generated-sources/protobuf/grpc-java</source>
                      <sorce>target/generated-sources/protobuf/java</sorce>
                    </sources>
                  </configuration>
                </execution>
              </executions>
            </plugin>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.5.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.16.1:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

ちょっとした事情で、自動生成するJavaソースコードを、明示的に追加ソースディレクトリとして加えています。

サンプルプログラム

サンプルプログラムは、単純にEchoのノリで作ってみます。

まずは、IDL。
src/main/proto/echo.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.littlewings.grpc.echo";
option java_outer_classname = "EchoService";

service Echo {
    rpc echo (EchoRequest) returns (EchoResponse) {}
}

message EchoRequest {
    string message = 1;
}

message EchoResponse {
    string message = 1;
}

この状態で、1度コンパイルします。

$ mvn compile

IDLからJavaソースコードが自動生成されるので、それを使ってクライアントおよびサーバー側のプログラムを作成します。

まずは、サーバー側。
src/main/java/org/littlewings/grpc/threading/EchoServer.java

package org.littlewings.grpc.threading;

import java.io.IOException;
import java.util.logging.Logger;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.littlewings.grpc.echo.EchoGrpc;
import org.littlewings.grpc.echo.EchoRequest;
import org.littlewings.grpc.echo.EchoResponse;

public class EchoServer {
    Logger logger = Logger.getLogger(getClass().getName());

    Server server;

    public static void main(String... args) throws IOException, InterruptedException {
        EchoServer echoServer = new EchoServer();
        echoServer.start();

        echoServer.blockUntilShutdown();
    }

    public void start() throws IOException {
        server =
                ServerBuilder
                        .forPort(8080)
                        .addService(new EchoServiceImpl())
                        .build()
                        .start();

        logger.info("start gRPC server.");
    }

    public void stop() {
        if (server != null) {
            server.shutdown();
            logger.info("shutdown gRPC server.");
        }
    }

    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    public static class EchoServiceImpl extends EchoGrpc.EchoImplBase {
        Logger logger = Logger.getLogger(getClass().getName());

        @Override
        public void echo(EchoRequest request, StreamObserver<EchoResponse> responseObserver) {
            String receivedMessage = request.getMessage();

            logger.info(String.format("[%s] %s", Thread.currentThread().getName(), receivedMessage));

            responseObserver.onNext(EchoResponse.newBuilder().setMessage("★★★" + receivedMessage + "★★★").build());
            responseObserver.onCompleted();
        }
    }
}

メッセージを送り返す時には、「★」を付けて返すようにしています。

            responseObserver.onNext(EchoResponse.newBuilder().setMessage("★★★" + receivedMessage + "★★★").build());

また、アクセス時にスレッド名を出力するようにしました。

            logger.info(String.format("[%s] %s", Thread.currentThread().getName(), receivedMessage));

続いて、クライアント側。こちらは、至ってシンプルです。
src/main/java/org/littlewings/grpc/threading/EchoClient.java

package org.littlewings.grpc.threading;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.littlewings.grpc.echo.EchoGrpc;
import org.littlewings.grpc.echo.EchoRequest;
import org.littlewings.grpc.echo.EchoResponse;

public class EchoClient {
    Logger logger = Logger.getLogger(getClass().getName());

    ManagedChannel channel;

    EchoGrpc.EchoFutureStub futureStub;

    public static void main(String... args) throws InterruptedException {
        Logger logger = Logger.getLogger(EchoClient.class.getName());

        EchoClient echoClient = EchoClient.create("localhost", 8080);

        logger.info(echoClient.echo("こんにちは、世界").getMessage());
        logger.info(echoClient.echo("Hello gRPC!!").getMessage());

        echoClient.shutdown();
    }

    public static EchoClient create(String host, int port) {
        EchoClient echoClient = new EchoClient();
        echoClient.channel =
                ManagedChannelBuilder
                        .forAddress(host, port)
                        .usePlaintext()
                        .build();
        echoClient.futureStub =
                EchoGrpc.newFutureStub(echoClient.channel);

        return echoClient;
    }

    EchoResponse echo(String message) {
        EchoRequest echoRequest = EchoRequest.newBuilder().setMessage(message).build();

        try {
            return futureStub.echo(echoRequest).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void shutdown() throws InterruptedException {
        channel.shutdown().awaitTermination(5L, TimeUnit.SECONDS);
    }
}

クライアント側については、自動生成されたソースコードをちょっと修正して、サーバーからのレスポンスをIDL定義した
メッセージの型に変換する時にスレッド名を出力するようにしました。
target/generated-sources/protobuf/java/org/littlewings/grpc/echo/EchoResponse.java より抜粋

  @java.lang.Override
  public final com.google.protobuf.UnknownFieldSet
  getUnknownFields() {
    return this.unknownFields;
  }
  private EchoResponse(
      com.google.protobuf.CodedInputStream input,
      com.google.protobuf.ExtensionRegistryLite extensionRegistry)
      throws com.google.protobuf.InvalidProtocolBufferException {
    this();
    java.util.logging.Logger.getLogger(getClass().getName()).info(String.format("thread name = %s", Thread.currentThread().getName()));

というわけで、サンプルプログラムが作成できました。

確認

で、こちらをコンパイルして実行します。

$ mvn compile exec:java -Dprotoc.skip=true -Dexec.mainClass=org.littlewings.grpc.threading.EchoServer
...
11 07, 2018 12:12:45 午前 org.littlewings.grpc.threading.EchoServer start
情報: start gRPC server.
11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoServer$EchoServiceImpl echo
情報: [grpc-default-executor-0] こんにちは、世界
11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoServer$EchoServiceImpl echo
情報: [grpc-default-executor-0] Hello gRPC!!
$ mvn compile exec:java -Dprotoc.skip=true -Dexec.mainClass=org.littlewings.grpc.threading.EchoClient
...
11 07, 2018 12:12:48 午前 org.littlewings.grpc.echo.EchoResponse <init>
情報: thread name = grpc-default-executor-0
11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoClient main
情報: ★★★こんにちは、世界★★★
11 07, 2018 12:12:48 午前 org.littlewings.grpc.echo.EchoResponse <init>
情報: thread name = grpc-default-executor-0
11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoClient main
情報: ★★★Hello gRPC!!★★★

「-Dprotoc.skip=true」を付けたのは、自動生成されたソースコードを修正しているので、再生成を抑止するため。

Maven Protocol Buffers Plugin – Introduction

出力されたスレッド名を見ると、クライアントもサーバーも「grpc-default-executor-0」という名前になっていますね。

11 07, 2018 12:12:48 午前 org.littlewings.grpc.threading.EchoServer$EchoServiceImpl echo
情報: [grpc-default-executor-0] こんにちは、世界

このスレッド名は、どこから来たのでしょう?

ソースコードを読む

では、先ほどの確認時に出力されたスレッド名を頼りに、ソースコードを確認してみます。

デフォルトで使われていたスレッドプールは、こちらですね。
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/GrpcUtil.java#L488-L505

特になにもExecutorを指定しないと、Serverの構築時と
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java#L60-L61 https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java#L97

ManagedChannelの構築時に、このデフォルトのスレッドプールが使用されます。
https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java#L83-L84 https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/AbstractManagedChannelImplBuilder.java#L98

なお、デフォルトのスレッドプールは、CachedThreadPoolです。

        @Override
        public ExecutorService create() {
          return Executors.newCachedThreadPool(getThreadFactory(NAME + "-%d", true));
        }

なので、これを変更したければServerやManagedChannelの構築時に、Executorを指定することになります。

Serverの場合。

        server =
                ServerBuilder
                        .forPort(8080)
                        .addService(new EchoServiceImpl())
                        .executor(Executors.newFixedThreadPool(100))
                        .build()
                        .start();

ManagedChannelの場合。

        echoClient.channel =
                ManagedChannelBuilder
                        .forAddress(host, port)
                        .usePlaintext()
                        .executor(Executors.newFixedThreadPool(100))
                        .build();

ただ、元のスレッドプールはdaemon Threadのようなので、こういう渡し方をする場合はServerやManagedChannelをシャットダウン
する際に、合わせて渡したスレッドプールもシャットダウンした方が良さそうです。
※このサンプルのような渡し方をすると、shutdownメソッドで停止しなくなる…クライアントが…

サーバー側では、このあたりでNettyのHandlerから、gRPC-Javaで定義されたスレッドプールに引き渡されます。
https://github.com/grpc/grpc-java/blob/v1.16.1/netty/src/main/java/io/grpc/netty/NettyServerHandler.java#L437 https://github.com/grpc/grpc-java/blob/v1.16.1/core/src/main/java/io/grpc/internal/ServerImpl.java#L495

クライアント側は、いろいろ散らばってたので、追うのを諦めました…。

とはいえ、使われている箇所は、だいたいこの中に収まっているのですが。
https://github.com/grpc/grpc-java/tree/v1.16.1/core/src/main/java/io/grpc/internal

まとめ

というわけで、Nettyを使ってはいますが、ライブラリ利用側で作成するRPCの処理そのものを担当する処理は、スレッドが
割り当てられるモデルで動作することが確認できました。

デフォルトはCachedThreadPoolが使用されるので容量無制限ですが、絞りたい場合はスレッドプールを作成して
ServerやManagedChannelの構築時に設定すればよいでしょう。

気になっていたことは、これで確認できました、と。