これは、なにをしたくて書いたもの?
- ちょっと前からRSocketというものを聞いていたので、軽く試してみようと
- 現時点のリリースバージョンはまだ低そうな感じなので、素振り的に?
- お題をEchoで
- Reactorのリハビリも兼ねて
というわけで、RSocketを試してみます。
RSocketとは?
RSocketのオフィシャルサイトは、こちら。
これはなんですか?ということなのですが、
- TCP、WebSocket、Aeronをトランスポートに使う、バイナリプロトコル
- Reactive Streams対応
- 多言語対応(Java、JavaScript、C++、Kotlin)
- 単一のコネクションで、以下のメッセージのやり取りの形態をサポート
- セッションの再開をサポート
- 長時間のストリームを扱える
- ネットワークが頻繁に切断、切り替え、接続が発生するモバイル環境で便利なのでは
4つのメッセージ送受信の形態は、プロトコルのスペックの以下の部分を見るとわかりやすいと思います。
Stream Sequences and Lifetimes
Springなどでの情報でも、ちょいちょい出てきます。
The Reactive Revolution at SpringOne Platform 2018 (part 1/N)
Project Reactor Now and Tomorrow
Spring 5.2で、RSocketのサポートも入りそうな感じですね。
[SPR-16751] RSocket client and server support - Spring JIRA
日本語情報。
RSocket をちょっとだけ試してみた - mike-neckのブログ
今回は、RSocketのJava実装を使って遊んでみることにします。
GitHub - rsocket/rsocket-java: Java implementation of RSocket
RSocket Javaは、Reactorを使っており、トランスポートにはReactor Netty、そしてAeronを使用します。
はて、Reactor Netty単体となにが違うんだっけ…って思いましたが、そもそもプロトコルとして定義してあること、
多言語対応と双方向の通信サポートというところなのかな。
Aeron?
個人的に、ちょっと気になったのがAeronというプロトコル。
Reliable UDPなライブラリなんですねぇ。
Efficient reliable UDP unicast, UDP multicast, and IPC message transport
GitHub - real-logic/aeron: Efficient reliable UDP unicast, UDP multicast, and IPC message transport
作者をよくよく見ると、LMAX Disruptorを作った人と同じみたいです。
RSocket JavaでAeronを使うには、RSocket Transport Aeronが必要なようですが、最近別リポジトリになったみたいで
まだリリースされていません。
GitHub - rsocket/rsocket-transport-aeron: Aeron Transport for RSocket
Move aeron transport to separate repo by rdegnan · Pull Request #538 · rsocket/rsocket-java · GitHub
仕方がありません、今回はパスですね。
とまあ、前置きはこれくらいにして、軽く試してみましょう。
環境
今回の環境は、こちら。
$ java -version openjdk version "1.8.0_181" OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-1ubuntu0.18.04.1-b13) OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode) $ mvn -version Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T03:41:47+09:00) Maven home: $HOME/.sdkman/candidates/maven/current Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre Default locale: ja_JP, platform encoding: UTF-8 OS name: "linux", version: "4.15.0-38-generic", arch: "amd64", family: "unix"
準備
Maven依存関係は、こちら。
<dependency> <groupId>io.rsocket</groupId> <artifactId>rsocket-core</artifactId> <version>0.11.12</version> </dependency> <dependency> <groupId>io.rsocket</groupId> <artifactId>rsocket-transport-netty</artifactId> <version>0.11.12</version> </dependency>
RSocket Javaのドキュメントに習い、rsocket-coreとrsocket-transport-nettyを追加。rsocket-transport-nettyはrsocket-coreに
依存しているので、rsocket-coreの方はなくてもいいのですが…。
バージョンは、0.11.12です。まだまだ低いので、軽く触り的な感じでいきましょう。
テストライブラリ。
<dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.3.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.3.1</version> <scope>test</scope> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <version>3.2.1.RELEASE</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> <scope>test</scope> </dependency>
RSocket Javaが依存しているバージョンのReactorのテストライブラリと、JUnit 5を追加。あと、ログ出力にslf4j-simple。
Exampleがあるので、こちらを参考に書いていきます。
https://github.com/rsocket/rsocket-java/tree/0.11.12/rsocket-examples
ソースコードの雛形
ソースコードの雛形は、こちら。
src/test/java/org/littlewings/rsocket/echo/EchoTest.java
package org.littlewings.rsocket.echo; import java.time.Duration; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; import io.rsocket.transport.netty.server.WebsocketServerTransport; import io.rsocket.util.DefaultPayload; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; public class EchoTest { // ここに、テストを書く!! }
以降で、中身を埋めていきます。
TCPでRequest/Responseスタイル
最初は、TCPを使ってRequest/ResponseスタイルでEcho Server/Clientを書いていきます。
要するに、これをテストコードで表現したものです。
返すメッセージには、ほんの少しの加工を加えますが。
作成したソースコードは、こちら。
@Test public void tcpRequestResponse() { Mono<CloseableChannel> server = RSocketFactory .receive() .acceptor((setupPayload, sendingSocket) -> Mono.just(new AbstractRSocket() { @Override public Mono<Payload> requestResponse(Payload payload) { return Mono.just(DefaultPayload.create("★★★" + payload.getDataUtf8() + "★★★")); } }) ) .transport(TcpServerTransport.create("localhost", 5000)) .start() .log("server"); // server.subscribe(); CloseableChannel serverChannel = server.block(); Mono<RSocket> client = RSocketFactory .connect() .transport(TcpClientTransport.create("localhost", 5000)) .start() .log("client"); StepVerifier .create(client .flatMap(rsocket -> rsocket.requestResponse(DefaultPayload.create("Hello RSocket!! / TCP")) .map(payload -> payload.getDataUtf8()))) .expectNext("★★★Hello RSocket!! / TCP★★★") .verifyComplete(); serverChannel.dispose(); }
この部分が、サーバーの定義になります。
Mono<CloseableChannel> server = RSocketFactory .receive() .acceptor((setupPayload, sendingSocket) -> Mono.just(new AbstractRSocket() { @Override public Mono<Payload> requestResponse(Payload payload) { return Mono.just(DefaultPayload.create("★★★" + payload.getDataUtf8() + "★★★")); } }) ) .transport(TcpServerTransport.create("localhost", 5000)) .start() .log("server");
acceptorメソッドにはSocketAcceptorを渡すのですが、その時にRSocketをメソッドの戻り値として渡す必要があります。
※Lambda式で書いていますが、これはSocketAcceptor#acceptをオーバーライドしたものです
.acceptor((setupPayload, sendingSocket) -> Mono.just(new AbstractRSocket() { @Override public Mono<Payload> requestResponse(Payload payload) { return Mono.just(DefaultPayload.create("★★★" + payload.getDataUtf8() + "★★★")); } }) )
で、RSocketのインスタンスを作成するのは、AbstractRSocketのサブクラスを作成することで行っていますが、このメソッドのうち
どれをオーバーライドしているかで扱えるメッセージのやり取りの形態が決まります。
今回は、requestResponseメソッドをオーバーライドしているので、Request/Responseスタイルを扱えます。
レスポンスは、Payloadとして返すことになります。また、Payload#getDataUtf8でデータをStringとして取り出せます。
レスポンスの文字列には、「★」を加えておきました。
return Mono.just(DefaultPayload.create("★★★" + payload.getDataUtf8() + "★★★"));
バインドするアドレスとポートは、transportで設定。今回はTCPなので、TcpServerTransportを使用します。
.transport(TcpServerTransport.create("localhost", 5000))
で、blockでCloseableChannelを取り出してサーバーの起動。コメントアウトしているようにsubscribeでも起動するのですが、
今回は別のサンプルも書くので、CloseableChannelを使わないと停止できないみたいで…。
// server.subscribe();
CloseableChannel serverChannel = server.block();
クライアント側。
Mono<RSocket> client = RSocketFactory .connect() .transport(TcpClientTransport.create("localhost", 5000)) .start() .log("client"); StepVerifier .create(client .flatMap(rsocket -> rsocket.requestResponse(DefaultPayload.create("Hello RSocket!! / TCP")) .map(payload -> payload.getDataUtf8()))) .expectNext("★★★Hello RSocket!! / TCP★★★") .verifyComplete();
メッセージの送信は、RSocket#requestResponseとなります。
最後に、サーバーで使ったCloseableChannelを破棄して終了です。
serverChannel.dispose();
クライアントの停止は、今回はパス…。
これで、とりあえずEchoっぽいものが書けました。
WebSocketにしてみる
次に、先ほどの例をWebSocketに書き換えてみましょう。
transportの部分を、WebsocketServerTransportとWebsocketClientTransportに変更すると、書き換えは終わったりします。
@Test public void websocketRequestResponse() { Mono<CloseableChannel> server = RSocketFactory .receive() .acceptor((setupPayload, sendingSocket) -> Mono.just(new AbstractRSocket() { @Override public Mono<Payload> requestResponse(Payload payload) { return Mono.just(DefaultPayload.create("★★★" + payload.getDataUtf8() + "★★★")); } }) ) .transport(WebsocketServerTransport.create("localhost", 8080)) .start() .log("server"); // server.subscribe(); CloseableChannel serverChannel = server.block(); Mono<RSocket> client = RSocketFactory .connect() .transport(WebsocketClientTransport.create("localhost", 8080)) .start() .log("client"); StepVerifier .create(client .flatMap(rsocket -> rsocket.requestResponse(DefaultPayload.create("Hello RSocket!! / WebSocket")) .map(payload -> payload.getDataUtf8()))) .expectNext("★★★Hello RSocket!! / WebSocket★★★") .verifyComplete(); serverChannel.dispose(); }
構築済みのHttpServerとルーティング定義を使う、WebsocketRouteTransportというものもあるようですが、パス…。
Request Streamを扱う
最後に再度TCPに戻って、メッセージの送受信形態のひとつである、Request Streamを扱ってみましょう。
結果は、このように。
@Test public void tcpRequestStream() { Mono<CloseableChannel> server = RSocketFactory .receive() .acceptor((setupPayload, sendingSocket) -> Mono.just(new AbstractRSocket() { @Override public Flux<Payload> requestStream(Payload payload) { return Flux .interval(Duration.ofMillis(100L)) .map(count -> DefaultPayload.create("★★★" + payload.getDataUtf8() + " / " + (count + 1) + "★★★")) .take(5); } }) ) .transport(TcpServerTransport.create("localhost", 5000)) .start() .log("server"); // server.subscribe(); CloseableChannel serverChannel = server.block(); Mono<RSocket> client = RSocketFactory .connect() .transport(TcpClientTransport.create("localhost", 5000)) .start() .log("client"); StepVerifier .create(Flux .from(client) .flatMap(rsocket -> rsocket.requestStream(DefaultPayload.create("Hello RSocket!! / TCP"))) .map(Payload::getDataUtf8)) .expectNext("★★★Hello RSocket!! / TCP / 1★★★") .expectNext("★★★Hello RSocket!! / TCP / 2★★★") .expectNext("★★★Hello RSocket!! / TCP / 3★★★") .expectNext("★★★Hello RSocket!! / TCP / 4★★★") .expectNext("★★★Hello RSocket!! / TCP / 5★★★") .verifyComplete(); serverChannel.dispose(); }
サーバー側のacceptorで、AbstractRSocketのオーバーライドするメソッドがrequestStreamとなり、Fluxを返すようになりました。
.acceptor((setupPayload, sendingSocket) -> Mono.just(new AbstractRSocket() { @Override public Flux<Payload> requestStream(Payload payload) { return Flux .interval(Duration.ofMillis(100L)) .map(count -> DefaultPayload.create("★★★" + payload.getDataUtf8() + " / " + (count + 1) + "★★★")) .take(5); } }) )
今回、100ミリ秒おきに5回メッセージを返すようにしています。
よって、クライアント側が受け取るのもFluxとなります。RSocketの使うメソッドも、requestStreamとなります。
StepVerifier .create(Flux .from(client) .flatMap(rsocket -> rsocket.requestStream(DefaultPayload.create("Hello RSocket!! / TCP"))) .map(Payload::getDataUtf8)) .expectNext("★★★Hello RSocket!! / TCP / 1★★★") .expectNext("★★★Hello RSocket!! / TCP / 2★★★") .expectNext("★★★Hello RSocket!! / TCP / 3★★★") .expectNext("★★★Hello RSocket!! / TCP / 4★★★") .expectNext("★★★Hello RSocket!! / TCP / 5★★★") .verifyComplete();
また、クライアントの構築時にRSocketはMonoとして受け取っているので
Mono<RSocket> client = ...
1度Fluxに変換してあります。
StepVerifier .create(Flux .from(client) .flatMap(rsocket -> rsocket.requestStream(DefaultPayload.create("Hello RSocket!! / TCP"))) .map(Payload::getDataUtf8))
こんなところですね。
まとめ
RSocket Javaを軽く試してみました。Reactorを忘れかかっていたので、けっこうてこずりました…。
他の言語でも扱えますし、RPCっぽいものもあるみたいなので、少し見てみてもいいかもですね。