これは、なにをしたくて書いたもの?
- RSocketに、Protocol Buffersを使ったRPCがあると聞いて
- ちょっと試してみようかと
RSocket RPC?
RSocket上で動作する、RPCの仕組みです。
RSocket自体は、以前に軽く試してみたことがあります。
RSocket Java(0.11)でEcho Server/Clientっぽいものを書いてみる - CLOVER🍀
RSocket RPCでは、Protocol Buffers 3を使い、リクエスト/レスポンスという形態だけではなく、ストリーム(双方向含む)も
扱うことができます。
RSocket RPCは複数の言語に対応していますが、現時点で作られているリポジトリは、以下のようです。
GitHub - rsocket/rsocket-rpc-java: Standard RSocket RPC Java Implementation
GitHub - rsocket/rsocket-rpc-js: Standard RSocket RPC Implementation
GitHub - rsocket/rsocket-rpc-kotlin: Standard RSocket RPC Kotlin Implementation
これと似たようなものとして、gRPCがありますね。gRPCも、少し前に試してみました。
では、RSocket RPCがgRPCとどう異なるかですが…RSocket RPCがどのような動機で作られたのかは、こちらを
参照するとよいでしょう。
https://github.com/rsocket/rsocket-rpc-java/blob/0.2.11/docs/motivation.md
端的に言うと、Reactive Streams、Back Pressureへの対応です。gRPCでは、フロー制御への対応がないことを、
このドキュメントでは指摘しています。
今回は同じProtocol Buffers 3を使うgRPCについて過去に自分で書いたエントリの内容を使いつつ、RSocket RPC Javaを
試してみたいと思います。
参考にするドキュメントは、こちらです。
https://github.com/rsocket/rsocket-rpc-java/blob/0.2.11/docs/get-started.md
お題
今回のお題は、前のgRPCのエントリと同じく、ツイート的なものです。
- クライアントから名前とツイートする内容、ツイートした時刻を含めてリクエストすると、サーバー側がその内容にidを追加して返す
- 上記の内容を双方向のStreamingにして、ツイートするとサーバーからこれまでにツイートされた内容が非同期に返ってくる
環境
今回の環境は、こちら。
$ java -version openjdk version "1.8.0_191" OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-0ubuntu0.18.04.1-b12) OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode) $ mvn -version Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T03:41:47+09:00) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 1.8.0_191, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-43-generic", arch: "amd64", family: "unix"
準備
Maven依存関係は、こちら。
<dependency> <groupId>io.rsocket.rpc</groupId> <artifactId>rsocket-rpc-core</artifactId> <version>0.2.11</version> <exclusions> <exclusion> <groupId>io.rsocket.rpc</groupId> <artifactId>rsocket-rpc-protobuf</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>io.rsocket</groupId> <artifactId>rsocket-transport-netty</artifactId> <version>0.11.13</version> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>3.6.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> </dependency>
「rsocket-rpc-core」と、通信ライブラリとして「rsocket-transport-netty」、そしてProtocol Buffersのライブラリが必要です。
「rsocket-rpc-protobuf」をexcludeしているのは、これを書かないと依存関係の解決に失敗するからです…。
コンパイル時に、こんなエラーを受け取ることになります。
[ERROR] Failed to execute goal on project rsocket-hello-rpc: Could not resolve dependencies for project org.littlewings:rsocket-hello-rpc:jar:0.0.1-SNAPSHOT: Could not find artifact io.rsocket.rpc:rsocket-rpc-protobuf:jar:0.2.11 in central (https://repo.maven.apache.org/maven2) -> [Help 1]
このライブラリがなにかは、次で説明します。
あと、「slf4j-simple」はログ出力用ですね。
そして、Maven Pluginの設定が必要です。
Getting StartedのドキュメントではGradle向けの記述しか書かれていないのですが、
https://github.com/rsocket/rsocket-rpc-java/blob/0.2.11/docs/get-started.md
これをMavenの設定に読み替えるとこんな感じになりました。
<build> <extensions> <extension> <groupId>kr.motd.maven</groupId> <artifactId>os-maven-plugin</artifactId> <version>1.6.1</version> </extension> </extensions> <plugins> <plugin> <groupId>org.xolstice.maven.plugins</groupId> <artifactId>protobuf-maven-plugin</artifactId> <version>0.6.1</version> <configuration> <protocArtifact>com.google.protobuf:protoc:3.6.1:exe:${os.detected.classifier}</protocArtifact> <pluginId>rsocket-rpc</pluginId> <pluginArtifact>io.rsocket.rpc:rsocket-rpc-protobuf:0.2.11:exe:${os.detected.classifier} </pluginArtifact> </configuration> <executions> <execution> <goals> <goal>compile</goal> <goal>compile-custom</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
どこかで見たことがあります。
ほぼ、gRPCの時の設定と同じです。
変えているのは、Protocol BuffersのMaven Pluginに追加している、protocに対するプラグインです。
<configuration> <protocArtifact>com.google.protobuf:protoc:3.6.1:exe:${os.detected.classifier}</protocArtifact> <pluginId>rsocket-rpc</pluginId> <pluginArtifact>io.rsocket.rpc:rsocket-rpc-protobuf:0.2.11:exe:${os.detected.classifier} </pluginArtifact> </configuration>
Maven Protocol Buffers Plugin – Introduction
Maven Centralを見るとわかるのですが、このアーティファクトはexeとして提供されており、protocに組み込まれて動作します。
https://repo1.maven.org/maven2/io/rsocket/rpc/rsocket-rpc-protobuf/0.2.11/
なので、これを依存関係に含めている「rsocket-rpc-core」をそのまま使うと、コンパイルに失敗するという…。
環境的な準備は、ここまでです。
簡単なRPCを書く
ここからは、gRPCの時のエントリと同じ流れで書いていきます。
まずは、ツイートをするとidを付けてレスポンスが戻ってくる、簡単なRPCを書きます。
IDLの定義。
src/main/proto/tweet.proto
syntax = "proto3"; import "google/protobuf/empty.proto"; option java_multiple_files = true; option java_package = "org.littlewings.rsocket.simpletweet"; option java_outer_classname = "SimpleTweetService"; package simpletweet; service SimpleTweeter { rpc Tweet (TweetRequest) returns (TweetResponse) {} } message TweetRequest { string name = 1; string text = 2; int64 time = 3; } message TweetResponse { int64 id = 1; string name = 2; string text = 3; int64 time = 4; }
RPCの定義はこの部分で、
service SimpleTweeter { rpc Tweet (TweetRequest) returns (TweetResponse) {} }
残りは、送受信するメッセージの定義です。
gRPCの時の定義とほぼ同じなのですが、少しだけ変えたところが。
「objc_class_prefix」を省略したことと、以下のimport文を追加したことです。
import "google/protobuf/empty.proto";
これは、今回は使用しませんが、Fire-and-Forgetのパターンの時の、戻り値の定義として使います。
import "google/protobuf/empty.proto"; ... rpc FireAndForget (SimpleRequest) returns (google.protobuf.Empty) {}
$ mvn compile [INFO] Scanning for projects... [INFO] ------------------------------------------------------------------------ [INFO] Detecting the operating system and CPU architecture [INFO] ------------------------------------------------------------------------ [INFO] os.detected.name: linux [INFO] os.detected.arch: x86_64 [INFO] os.detected.version: 4.15 [INFO] os.detected.version.major: 4 [INFO] os.detected.version.minor: 15 [INFO] os.detected.release: ubuntu [INFO] os.detected.release.version: 18.04 [INFO] os.detected.release.like.ubuntu: true [INFO] os.detected.release.like.debian: true [INFO] os.detected.classifier: linux-x86_64 [INFO] [INFO] -----------------< org.littlewings:rsocket-hello-rpc >------------------ [INFO] Building rsocket-hello-rpc 0.0.1-SNAPSHOT [INFO] --------------------------------[ jar ]--------------------------------- [INFO] [INFO] --- protobuf-maven-plugin:0.6.1:compile (default) @ rsocket-hello-rpc --- [INFO] Compiling 2 proto file(s) to /path/to/rsocket-hello-rpc/target/generated-sources/protobuf/java [WARNING] PROTOC: tweet-streaming.proto: warning: Import google/protobuf/empty.proto but not used. tweet.proto: warning: Import google/protobuf/empty.proto but not used. [INFO] [INFO] --- protobuf-maven-plugin:0.6.1:compile-custom (default) @ rsocket-hello-rpc --- [INFO] Compiling 2 proto file(s) to /path/to/target/generated-sources/protobuf/rsocket-rpc [WARNING] PROTOC: tweet-streaming.proto: warning: Import google/protobuf/empty.proto but not used. tweet.proto: warning: Import google/protobuf/empty.proto but not used.
あら、importしたのに使っていないと、警告されるんですね…。
tweet.proto: warning: Import google/protobuf/empty.proto but not used.
まあ、今回はメモの目的も兼ねているので、このままいくことにします。
生成されたソースコードは、今回の設定では「target/generated-sources/protobuf/rsocket-java」と
「target/generated-sources/protobuf/java」にそれぞれ出力されます。
$ find target/generated-sources -type f target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/BlockingSimpleTweeterClient.java target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/BlockingSimpleTweeter.java target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/SimpleTweeterServer.java target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/SimpleTweeterClient.java target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/BlockingSimpleTweeterServer.java target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/SimpleTweeter.java target/generated-sources/protobuf/java/org/littlewings/rsocket/simpletweet/TweetRequest.java target/generated-sources/protobuf/java/org/littlewings/rsocket/simpletweet/SimpleTweetService.java target/generated-sources/protobuf/java/org/littlewings/rsocket/simpletweet/TweetResponseOrBuilder.java target/generated-sources/protobuf/java/org/littlewings/rsocket/simpletweet/TweetRequestOrBuilder.java target/generated-sources/protobuf/java/org/littlewings/rsocket/simpletweet/TweetResponse.java
では、Server側を書いていってみます。
RPCの部分。自動生成されたインターフェースをimplementsします。
src/main/java/org/littlewings/rsocket/rpc/simple/DefaultSimpleTwitter.java
package org.littlewings.rsocket.rpc.simple; import io.netty.buffer.ByteBuf; import org.littlewings.rsocket.simpletweet.SimpleTweeter; import org.littlewings.rsocket.simpletweet.TweetRequest; import org.littlewings.rsocket.simpletweet.TweetResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; public class DefaultSimpleTwitter implements SimpleTweeter { Logger logger = LoggerFactory.getLogger(getClass()); @Override public Mono<TweetResponse> tweet(TweetRequest request, ByteBuf metadata) { logger.info("request: name = {}, text = {}, time = {}", request.getName(), request.getText(), request.getTime()); TweetResponse tweetResponse = TweetResponse .newBuilder() .setId(System.currentTimeMillis()) .setName(request.getName()) .setText(request.getText()) .setTime(request.getTime()) .build(); return Mono.just(tweetResponse); } }
リクエスト/レスポンスのスタイルだと、Monoを返すように実装するようです。
Serverとして起動するクラス。 src/main/java/org/littlewings/rsocket/rpc/simple/SimpleServer.java
package org.littlewings.rsocket.rpc.simple; import java.util.Optional; import io.rsocket.RSocketFactory; import io.rsocket.rpc.rsocket.RequestHandlingRSocket; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import org.littlewings.rsocket.simpletweet.SimpleTweeterServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; public class SimpleServer { Logger logger = LoggerFactory.getLogger(getClass()); SimpleTweeterServer server; CloseableChannel serverChannel; public static void main(String... args) { SimpleServer simpleServer = new SimpleServer(); simpleServer.start(); System.console().readLine("> Enter stop."); simpleServer.stop(); } public void start() { server = new SimpleTweeterServer(new DefaultSimpleTwitter(), Optional.empty(), Optional.empty()); serverChannel = RSocketFactory .receive() .acceptor((setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(server))) .transport(TcpServerTransport.create("localhost", 8080)) .start() .log() .block(); logger.info("start RSocket RPC server."); } public void stop() { server.dispose(); serverChannel.dispose(); logger.info("stop RSocket RPC server."); } }
先ほど作成したクラスのインスタンスを生成して、RSocketFactoryからServerを起動します。
server = new SimpleTweeterServer(new DefaultSimpleTwitter(), Optional.empty(), Optional.empty()); serverChannel = RSocketFactory .receive() .acceptor((setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(server))) .transport(TcpServerTransport.create("localhost", 8080)) .start() .log() .block();
なお、このServerはEnterを押すと停止します。
Client側。
src/main/java/org/littlewings/rsocket/rpc/simple/SimpleClient.java
package org.littlewings.rsocket.rpc.simple; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.TcpClientTransport; import org.littlewings.rsocket.simpletweet.SimpleTweeterClient; import org.littlewings.rsocket.simpletweet.TweetRequest; import org.littlewings.rsocket.simpletweet.TweetResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SimpleClient { Logger logger = LoggerFactory.getLogger(getClass()); SimpleTweeterClient client; RSocket rsocket; public static void main(String... args) { SimpleClient simpleClient = SimpleClient.create("localhost", 8080); simpleClient.tweet("磯野 カツオ", "こんにちは!!RSocket RPC!!"); simpleClient.tweet("フグ田 タラオ", "Hello World!!"); simpleClient.shutdown(); } public static SimpleClient create(String host, int port) { SimpleClient simpleClient = new SimpleClient(); RSocket rsocket = RSocketFactory .connect() .transport(TcpClientTransport.create(host, port)) .start() .log() .block(); simpleClient.client = new SimpleTweeterClient(rsocket); simpleClient.rsocket = rsocket; return simpleClient; } public void tweet(String name, String text) { TweetRequest tweetRequest = TweetRequest .newBuilder() .setName(name) .setText(text) .setTime(System.currentTimeMillis()) .build(); TweetResponse tweetResponse = client .tweet(tweetRequest) .block(); logger.info("response: id = {}, name = {}, text = {}, time = {}", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime()); } public void shutdown() { rsocket.dispose(); } }
Client側も、やっぱりRSocketFactoryからRSocketを作ります。
RSocket rsocket = RSocketFactory .connect() .transport(TcpClientTransport.create(host, port)) .start() .log() .block();
今回はリクエスト/レスポンスを受け取る時はblockしていますが、Reactorを使ってコードを書きます。
public void tweet(String name, String text) { TweetRequest tweetRequest = TweetRequest .newBuilder() .setName(name) .setText(text) .setTime(System.currentTimeMillis()) .build(); TweetResponse tweetResponse = client .tweet(tweetRequest) .block(); logger.info("response: id = {}, name = {}, text = {}, time = {}", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime()); }
では、実行してみましょう。
Serverを起動。
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.rsocket.rpc.simple.SimpleServer
Clientを実行。
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.rsocket.rpc.simple.SimpleClient ... [org.littlewings.rsocket.rpc.simple.SimpleClient.main()] INFO org.littlewings.rsocket.rpc.simple.SimpleClient - response: id = 1545834020324, name = 磯野 カツオ, text = こんにちは!!RSocket RPC!!, time = 1545834020248 [org.littlewings.rsocket.rpc.simple.SimpleClient.main()] INFO org.littlewings.rsocket.rpc.simple.SimpleClient - response: id = 1545834020358, name = フグ田 タラオ, text = Hello World!!, time = 1545834020356
Server側では、こんなログが出力されます。
[reactor-tcp-server-epoll-10] INFO org.littlewings.rsocket.rpc.simple.DefaultSimpleTwitter - request: name = 磯野 カツオ, text = こんにちは!!RSocket RPC!!, time = 1545834020248 [reactor-tcp-server-epoll-10] INFO org.littlewings.rsocket.rpc.simple.DefaultSimpleTwitter - request: name = フグ田 タラオ, text = Hello World!!, time = 1545834020356
OKそうですね。
Streaming(双方向)なRPCを書く
続いて、双方向なRPCを書いてみます。
IDLの定義。
src/main/proto/tweet-streaming.proto
syntax = "proto3"; import "google/protobuf/empty.proto"; option java_multiple_files = true; option java_package = "org.littlewings.rsocket.streamingtweet"; option java_outer_classname = "StreamingTweetService"; package streamingtweet; service StreamingTweeter { rpc TweetStreaming (stream TweetRequest) returns (stream TweetResponse) {} } message TweetRequest { string name = 1; string text = 2; int64 time = 3; } message TweetResponse { int64 id = 1; string name = 2; string text = 3; int64 time = 4; }
RCPの引数と戻り値が、streamになりました。
service StreamingTweeter { rpc TweetStreaming (stream TweetRequest) returns (stream TweetResponse) {} }
これで、双方向のstreamとなります。というか、gRPCの時と同じです。
では、コードを自動生成して
$ mvn compile
Server側を作成しましょう。
まずはRPCの部分。
src/main/java/org/littlewings/rsocket/rpc/streaming/DefaultStreamingTwitter.java
package org.littlewings.rsocket.rpc.streaming; import java.util.ArrayList; import java.util.List; import io.netty.buffer.ByteBuf; import org.littlewings.rsocket.streamingtweet.StreamingTweeter; import org.littlewings.rsocket.streamingtweet.TweetRequest; import org.littlewings.rsocket.streamingtweet.TweetResponse; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Flux; public class DefaultStreamingTwitter implements StreamingTweeter { Logger logger = LoggerFactory.getLogger(getClass()); List<TweetResponse> tweets = new ArrayList<>(); @Override public Flux<TweetResponse> tweetStreaming(Publisher<TweetRequest> request, ByteBuf metadata) { return Flux.from(request) .log() .flatMap(tweetRequest -> { logger.info("request: name = {}, text = {}, time = {}", tweetRequest.getName(), tweetRequest.getText(), tweetRequest.getTime()); TweetResponse tweetResponse = TweetResponse .newBuilder() .setId(System.currentTimeMillis()) .setName(tweetRequest.getName()) .setText(tweetRequest.getText()) .setTime(tweetRequest.getTime()) .build(); List<TweetResponse> snapshot; synchronized (this) { tweets.add(tweetResponse); snapshot = new ArrayList<>(tweets); } return Flux.fromIterable(snapshot); }); } }
今度は、メソッドの引数がPublisherに、戻り値がFluxとなります。
また、リクエストの度にツイートを蓄積し、全部返す感じで作成しました。
@Override public Flux<TweetResponse> tweetStreaming(Publisher<TweetRequest> request, ByteBuf metadata) { return Flux.from(request) .log() .flatMap(tweetRequest -> { logger.info("request: name = {}, text = {}, time = {}", tweetRequest.getName(), tweetRequest.getText(), tweetRequest.getTime()); TweetResponse tweetResponse = TweetResponse .newBuilder() .setId(System.currentTimeMillis()) .setName(tweetRequest.getName()) .setText(tweetRequest.getText()) .setTime(tweetRequest.getTime()) .build(); List<TweetResponse> snapshot; synchronized (this) { tweets.add(tweetResponse); snapshot = new ArrayList<>(tweets); } return Flux.fromIterable(snapshot); }); }
Serverを起動するクラス。
src/main/java/org/littlewings/rsocket/rpc/streaming/StreamingServer.java
package org.littlewings.rsocket.rpc.streaming; import java.util.Optional; import io.rsocket.RSocketFactory; import io.rsocket.rpc.rsocket.RequestHandlingRSocket; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import org.littlewings.rsocket.streamingtweet.StreamingTweeterServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; public class StreamingServer { Logger logger = LoggerFactory.getLogger(getClass()); StreamingTweeterServer server; CloseableChannel serverChannel; public static void main(String... args) { StreamingServer streamingServer = new StreamingServer(); streamingServer.start(); System.console().readLine("> Enter stop."); streamingServer.stop(); } public void start() { server = new StreamingTweeterServer(new DefaultStreamingTwitter(), Optional.empty(), Optional.empty()); serverChannel = RSocketFactory .receive() .acceptor((setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(server))) .transport(TcpServerTransport.create("localhost", 8080)) .start() .log() .block(); logger.info("start RSocket RPC server."); } public void stop() { server.dispose(); serverChannel.dispose(); logger.info("stop RSocket RPC server."); } }
こちらは、リクエスト/レスポンスのスタイルの時と、ほぼ変わりません。
続いて、Client側。
src/main/java/org/littlewings/rsocket/rpc/streaming/StreamingClient.java
package org.littlewings.rsocket.rpc.streaming; import java.io.Console; import java.util.concurrent.TimeUnit; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.TcpClientTransport; import org.littlewings.rsocket.streamingtweet.StreamingTweeterClient; import org.littlewings.rsocket.streamingtweet.TweetRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; public class StreamingClient { Logger logger = LoggerFactory.getLogger(getClass()); StreamingTweeterClient client; RSocket rsocket; public static void main(String... args) { StreamingClient simpleClient = StreamingClient.create("localhost", 8080); simpleClient.tweetWhile("磯野 カツオ"); simpleClient.shutdown(); } public static StreamingClient create(String host, int port) { StreamingClient streamingClient = new StreamingClient(); RSocket rsocket = RSocketFactory .connect() .transport(TcpClientTransport.create(host, port)) .start() .log() .block(); streamingClient.client = new StreamingTweeterClient(rsocket); streamingClient.rsocket = rsocket; return streamingClient; } public void tweetWhile(String name) { Console console = System.console(); while (true) { String line = console.readLine("> "); if (line.isEmpty()) { continue; } if (line.equals("exit")) { break; } TweetRequest tweetRequest = TweetRequest .newBuilder() .setName(name) .setText(line) .setTime(System.currentTimeMillis()) .build(); client .tweetStreaming(Mono.just(tweetRequest)) .log() .subscribe(tweetResponse -> logger.info("response: id = {}, name = {}, text = {}, time = {}", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime()) ); try { TimeUnit.SECONDS.sleep(1L); } catch (InterruptedException e) { // ignore } } logger.info("request completed!!"); } public void shutdown() { rsocket.dispose(); } }
リクエストの送信については、固定の名前でツイートし続けるものに。まあ、gRPCの時のサンプルと同じです。
public void tweetWhile(String name) { Console console = System.console(); while (true) { String line = console.readLine("> "); if (line.isEmpty()) { continue; } if (line.equals("exit")) { break; } TweetRequest tweetRequest = TweetRequest .newBuilder() .setName(name) .setText(line) .setTime(System.currentTimeMillis()) .build(); client .tweetStreaming(Mono.just(tweetRequest)) .log() .subscribe(tweetResponse -> logger.info("response: id = {}, name = {}, text = {}, time = {}", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime()) ); try { TimeUnit.SECONDS.sleep(1L); } catch (InterruptedException e) { // ignore } } logger.info("request completed!!"); }
RSocketを作るところは、リクエスト/レスポンススタイルの時と変わりません。
なお、「exit」と入力すると終了します。
では、確認してみます。Server側を起動。
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.rsocket.rpc.streaming.StreamingServer
Client側を起動。
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.rsocket.rpc.streaming.StreamingClient ... [reactor-tcp-client-epoll-12] INFO reactor.Mono.FlatMap.1 - | onNext(io.rsocket.RSocketClient@52a40755) [reactor-tcp-client-epoll-12] INFO reactor.Mono.FlatMap.1 - | onComplete() >
待機状態になるので、適当にツイートしてみます。
> こんにちは、RSockeet RPC > こんにちは、RSocket RPC Java [org.littlewings.rsocket.rpc.streaming.StreamingClient.main()] INFO reactor.Flux.Map.2 - onSubscribe(FluxMap.MapSubscriber) [org.littlewings.rsocket.rpc.streaming.StreamingClient.main()] INFO reactor.Flux.Map.2 - request(unbounded) [reactor-tcp-client-epoll-12] INFO reactor.Flux.Map.2 - onNext(id: 1545835418281 name: "\347\243\257\351\207\216 \343\202\253\343\203\204\343\202\252" text: "\343\201\223\343\202\223\343\201\253\343\201\241\343\201\257\343\200\201RSocket RPC Java" time: 1545835418155 ) [reactor-tcp-client-epoll-12] INFO org.littlewings.rsocket.rpc.streaming.StreamingClient - response: id = 1545835418281, name = 磯野 カツオ, text = こんにちは、RSocket RPC Java, time = 1545835418155 [reactor-tcp-client-epoll-12] INFO reactor.Flux.Map.2 - onComplete() > JavaでRSocket RPC [org.littlewings.rsocket.rpc.streaming.StreamingClient.main()] INFO reactor.Flux.Map.3 - onSubscribe(FluxMap.MapSubscriber) [org.littlewings.rsocket.rpc.streaming.StreamingClient.main()] INFO reactor.Flux.Map.3 - request(unbounded) [reactor-tcp-client-epoll-12] INFO reactor.Flux.Map.3 - onNext(id: 1545835418281 name: "\347\243\257\351\207\216 \343\202\253\343\203\204\343\202\252" text: "\343\201\223\343\202\223\343\201\253\343\201\241\343\201\257\343\200\201RSocket RPC Java" time: 1545835418155 ) [reactor-tcp-client-epoll-12] INFO org.littlewings.rsocket.rpc.streaming.StreamingClient - response: id = 1545835418281, name = 磯野 カツオ, text = こんにちは、RSocket RPC Java, time = 1545835418155 [reactor-tcp-client-epoll-12] INFO reactor.Flux.Map.3 - onNext(id: 1545835428272 name: "\347\243\257\351\207\216 \343\202\253\343\203\204\343\202\252" text: "Java\343\201\247RSocket RPC" time: 1545835428264 ) [reactor-tcp-client-epoll-12] INFO org.littlewings.rsocket.rpc.streaming.StreamingClient - response: id = 1545835428272, name = 磯野 カツオ, text = JavaでRSocket RPC, time = 1545835428264 [reactor-tcp-client-epoll-12] INFO reactor.Flux.Map.3 - onComplete()
メッセージを送ると、サーバー側からはこれまでに送ったツイート+今回のツイートが返ってきます。
Server側のログ。
[reactor-tcp-server-epoll-10] INFO org.littlewings.rsocket.rpc.streaming.DefaultStreamingTwitter - request: name = 磯野 カツオ, text = こんにちは、RSocket RPC Java, time = 1545835418155 [reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.2 - request(1) [reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.2 - onComplete() [reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.3 - onSubscribe(FluxMap.MapSubscriber) [reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.3 - request(256) [reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.3 - onNext(name: "\347\243\257\351\207\216 \343\202\253\343\203\204\343\202\252" text: "Java\343\201\247RSocket RPC" time: 1545835428264 ) [reactor-tcp-server-epoll-10] INFO org.littlewings.rsocket.rpc.streaming.DefaultStreamingTwitter - request: name = 磯野 カツオ, text = JavaでRSocket RPC, time = 1545835428264 [reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.3 - request(1) [reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.3 - onComplete()
これで、双方向のStreamingが確認できました、と。
まとめ
RSocket RPC Javaを試してみました。
さらっと書いた感じでは、Reactorで書けるgRPCですね。今回は、エラー処理とかほぼ書いていないので、gRPCより
すっきりしたように見えてしまいますが…。
gRPCとの距離感は、どうなんでしょうね。Springにも入りそうな感じですし、こちらもちょいちょい見ておきましょうか。