CLOVER🍀

That was when it all began.

RSocket Java(0.11)でEcho Server/Clientっぽいものを書いてみる

これは、なにをしたくて書いたもの?

  • ちょっと前からRSocketというものを聞いていたので、軽く試してみようと
  • 現時点のリリースバージョンはまだ低そうな感じなので、素振り的に?
  • お題をEchoで
  • Reactorのリハビリも兼ねて

というわけで、RSocketを試してみます。

RSocketとは?

RSocketのオフィシャルサイトは、こちら。

RSocket

これはなんですか?ということなのですが、

  • TCP、WebSocket、Aeronをトランスポートに使う、バイナリプロトコル
  • Reactive Streams対応
  • 多言語対応(JavaJavaScriptC++、Kotlin)
  • 単一のコネクションで、以下のメッセージのやり取りの形態をサポート
    • Request - Response(リクエスト、レスポンスが1対1)
    • Request Stream(リクエストひとつに対して、レスポンスがストリームで戻ってくる)
    • Fire and Forget(リクエストを投げるのみ、レスポンスは気にしない)
    • Channel(双方向のストリーム)
  • セッションの再開をサポート
    • 長時間のストリームを扱える
    • ネットワークが頻繁に切断、切り替え、接続が発生するモバイル環境で便利なのでは

4つのメッセージ送受信の形態は、プロトコルのスペックの以下の部分を見るとわかりやすいと思います。

Stream Sequences and Lifetimes

Springなどでの情報でも、ちょいちょい出てきます。

The Reactive Revolution at SpringOne Platform 2018 (part 1/N)

Project Reactor Now and Tomorrow

Spring 5.2で、RSocketのサポートも入りそうな感じですね。

[SPR-16751] RSocket client and server support - Spring JIRA

日本語情報。

RSocket をちょっとだけ試してみた - mike-neckのブログ

今回は、RSocketのJava実装を使って遊んでみることにします。

GitHub - rsocket/rsocket-java: Java implementation of RSocket

RSocket Javaは、Reactorを使っており、トランスポートにはReactor Netty、そしてAeronを使用します。

はて、Reactor Netty単体となにが違うんだっけ…って思いましたが、そもそもプロトコルとして定義してあること、
多言語対応と双方向の通信サポートというところなのかな。

Aeron?

個人的に、ちょっと気になったのがAeronというプロトコル

Reliable UDPなライブラリなんですねぇ。

Efficient reliable UDP unicast, UDP multicast, and IPC message transport

GitHub - real-logic/aeron: Efficient reliable UDP unicast, UDP multicast, and IPC message transport

作者をよくよく見ると、LMAX Disruptorを作った人と同じみたいです。

Disruptor by LMAX-Exchange

RSocket JavaでAeronを使うには、RSocket Transport Aeronが必要なようですが、最近別リポジトリになったみたいで
まだリリースされていません。

GitHub - rsocket/rsocket-transport-aeron: Aeron Transport for RSocket

Move aeron transport to separate repo by rdegnan · Pull Request #538 · rsocket/rsocket-java · GitHub

仕方がありません、今回はパスですね。

とまあ、前置きはこれくらいにして、軽く試してみましょう。

環境

今回の環境は、こちら。

$ java -version
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-8u181-b13-1ubuntu0.18.04.1-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)


$ mvn -version
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T03:41:47+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_181, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-38-generic", arch: "amd64", family: "unix"

準備

Maven依存関係は、こちら。

        <dependency>
            <groupId>io.rsocket</groupId>
            <artifactId>rsocket-core</artifactId>
            <version>0.11.12</version>
        </dependency>
        <dependency>
            <groupId>io.rsocket</groupId>
            <artifactId>rsocket-transport-netty</artifactId>
            <version>0.11.12</version>
        </dependency>

RSocket Javaのドキュメントに習い、rsocket-coreとrsocket-transport-nettyを追加。rsocket-transport-nettyはrsocket-coreに
依存しているので、rsocket-coreの方はなくてもいいのですが…。

バージョンは、0.11.12です。まだまだ低いので、軽く触り的な感じでいきましょう。

テストライブラリ。

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.3.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.3.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <version>3.2.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>

RSocket Javaが依存しているバージョンのReactorのテストライブラリと、JUnit 5を追加。あと、ログ出力にslf4j-simple。

Exampleがあるので、こちらを参考に書いていきます。

https://github.com/rsocket/rsocket-java/tree/0.11.12/rsocket-examples

ソースコードの雛形

ソースコードの雛形は、こちら。
src/test/java/org/littlewings/rsocket/echo/EchoTest.java

package org.littlewings.rsocket.echo;

import java.time.Duration;

import io.rsocket.AbstractRSocket;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.transport.netty.server.WebsocketServerTransport;
import io.rsocket.util.DefaultPayload;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class EchoTest {

    // ここに、テストを書く!!
}

以降で、中身を埋めていきます。

TCPでRequest/Responseスタイル

最初は、TCPを使ってRequest/ResponseスタイルでEcho Server/Clientを書いていきます。

要するに、これをテストコードで表現したものです。

https://github.com/rsocket/rsocket-java/blob/0.11.12/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/requestresponse/HelloWorldClient.java

返すメッセージには、ほんの少しの加工を加えますが。

作成したソースコードは、こちら。

    @Test
    public void tcpRequestResponse() {
        Mono<CloseableChannel> server =
                RSocketFactory
                        .receive()
                        .acceptor((setupPayload, sendingSocket) ->
                                Mono.just(new AbstractRSocket() {
                                    @Override
                                    public Mono<Payload> requestResponse(Payload payload) {
                                        return Mono.just(DefaultPayload.create("★★★" + payload.getDataUtf8() + "★★★"));
                                    }
                                })
                        )
                        .transport(TcpServerTransport.create("localhost", 5000))
                        .start()
                        .log("server");

        // server.subscribe();
        CloseableChannel serverChannel = server.block();

        Mono<RSocket> client =
                RSocketFactory
                        .connect()
                        .transport(TcpClientTransport.create("localhost", 5000))
                        .start()
                        .log("client");

        StepVerifier
                .create(client
                        .flatMap(rsocket -> rsocket.requestResponse(DefaultPayload.create("Hello RSocket!! / TCP"))
                                .map(payload -> payload.getDataUtf8())))
                .expectNext("★★★Hello RSocket!! / TCP★★★")
                .verifyComplete();

        serverChannel.dispose();
    }

この部分が、サーバーの定義になります。

        Mono<CloseableChannel> server =
                RSocketFactory
                        .receive()
                        .acceptor((setupPayload, sendingSocket) ->
                                Mono.just(new AbstractRSocket() {
                                    @Override
                                    public Mono<Payload> requestResponse(Payload payload) {
                                        return Mono.just(DefaultPayload.create("★★★" + payload.getDataUtf8() + "★★★"));
                                    }
                                })
                        )
                        .transport(TcpServerTransport.create("localhost", 5000))
                        .start()
                        .log("server");

acceptorメソッドにはSocketAcceptorを渡すのですが、その時にRSocketをメソッドの戻り値として渡す必要があります。
※Lambda式で書いていますが、これはSocketAcceptor#acceptをオーバーライドしたものです

                        .acceptor((setupPayload, sendingSocket) ->
                                Mono.just(new AbstractRSocket() {
                                    @Override
                                    public Mono<Payload> requestResponse(Payload payload) {
                                        return Mono.just(DefaultPayload.create("★★★" + payload.getDataUtf8() + "★★★"));
                                    }
                                })
                        )

で、RSocketのインスタンスを作成するのは、AbstractRSocketのサブクラスを作成することで行っていますが、このメソッドのうち
どれをオーバーライドしているかで扱えるメッセージのやり取りの形態が決まります。

今回は、requestResponseメソッドをオーバーライドしているので、Request/Responseスタイルを扱えます。

レスポンスは、Payloadとして返すことになります。また、Payload#getDataUtf8でデータをStringとして取り出せます。
レスポンスの文字列には、「★」を加えておきました。

                                        return Mono.just(DefaultPayload.create("★★★" + payload.getDataUtf8() + "★★★"));

バインドするアドレスとポートは、transportで設定。今回はTCPなので、TcpServerTransportを使用します。

                        .transport(TcpServerTransport.create("localhost", 5000))

で、blockでCloseableChannelを取り出してサーバーの起動。コメントアウトしているようにsubscribeでも起動するのですが、
今回は別のサンプルも書くので、CloseableChannelを使わないと停止できないみたいで…。

        // server.subscribe();
        CloseableChannel serverChannel = server.block();

クライアント側。

        Mono<RSocket> client =
                RSocketFactory
                        .connect()
                        .transport(TcpClientTransport.create("localhost", 5000))
                        .start()
                        .log("client");

        StepVerifier
                .create(client
                        .flatMap(rsocket -> rsocket.requestResponse(DefaultPayload.create("Hello RSocket!! / TCP"))
                                .map(payload -> payload.getDataUtf8())))
                .expectNext("★★★Hello RSocket!! / TCP★★★")
                .verifyComplete();

メッセージの送信は、RSocket#requestResponseとなります。

最後に、サーバーで使ったCloseableChannelを破棄して終了です。

        serverChannel.dispose();

クライアントの停止は、今回はパス…。

これで、とりあえずEchoっぽいものが書けました。

WebSocketにしてみる

次に、先ほどの例をWebSocketに書き換えてみましょう。

transportの部分を、WebsocketServerTransportとWebsocketClientTransportに変更すると、書き換えは終わったりします。

    @Test
    public void websocketRequestResponse() {
        Mono<CloseableChannel> server =
                RSocketFactory
                        .receive()
                        .acceptor((setupPayload, sendingSocket) ->
                                Mono.just(new AbstractRSocket() {
                                    @Override
                                    public Mono<Payload> requestResponse(Payload payload) {
                                        return Mono.just(DefaultPayload.create("★★★" + payload.getDataUtf8() + "★★★"));
                                    }
                                })
                        )
                        .transport(WebsocketServerTransport.create("localhost", 8080))
                        .start()
                        .log("server");

        // server.subscribe();
        CloseableChannel serverChannel = server.block();


        Mono<RSocket> client =
                RSocketFactory
                        .connect()
                        .transport(WebsocketClientTransport.create("localhost", 8080))
                        .start()
                        .log("client");

        StepVerifier
                .create(client
                        .flatMap(rsocket -> rsocket.requestResponse(DefaultPayload.create("Hello RSocket!! / WebSocket"))
                                .map(payload -> payload.getDataUtf8())))
                .expectNext("★★★Hello RSocket!! / WebSocket★★★")
                .verifyComplete();

        serverChannel.dispose();
    }

構築済みのHttpServerとルーティング定義を使う、WebsocketRouteTransportというものもあるようですが、パス…。

Request Streamを扱う

最後に再度TCPに戻って、メッセージの送受信形態のひとつである、Request Streamを扱ってみましょう。

結果は、このように。

    @Test
    public void tcpRequestStream() {
        Mono<CloseableChannel> server =
                RSocketFactory
                        .receive()
                        .acceptor((setupPayload, sendingSocket) ->
                                Mono.just(new AbstractRSocket() {
                                    @Override
                                    public Flux<Payload> requestStream(Payload payload) {
                                        return Flux
                                                .interval(Duration.ofMillis(100L))
                                                .map(count -> DefaultPayload.create("★★★" + payload.getDataUtf8() + " / " + (count + 1) + "★★★"))
                                                .take(5);
                                    }
                                })
                        )
                        .transport(TcpServerTransport.create("localhost", 5000))
                        .start()
                        .log("server");

        // server.subscribe();
        CloseableChannel serverChannel = server.block();

        Mono<RSocket> client =
                RSocketFactory
                        .connect()
                        .transport(TcpClientTransport.create("localhost", 5000))
                        .start()
                        .log("client");

        StepVerifier
                .create(Flux
                        .from(client)
                        .flatMap(rsocket -> rsocket.requestStream(DefaultPayload.create("Hello RSocket!! / TCP")))
                        .map(Payload::getDataUtf8))
                .expectNext("★★★Hello RSocket!! / TCP / 1★★★")
                .expectNext("★★★Hello RSocket!! / TCP / 2★★★")
                .expectNext("★★★Hello RSocket!! / TCP / 3★★★")
                .expectNext("★★★Hello RSocket!! / TCP / 4★★★")
                .expectNext("★★★Hello RSocket!! / TCP / 5★★★")
                .verifyComplete();

        serverChannel.dispose();
    }

サーバー側のacceptorで、AbstractRSocketのオーバーライドするメソッドがrequestStreamとなり、Fluxを返すようになりました。

                        .acceptor((setupPayload, sendingSocket) ->
                                Mono.just(new AbstractRSocket() {
                                    @Override
                                    public Flux<Payload> requestStream(Payload payload) {
                                        return Flux
                                                .interval(Duration.ofMillis(100L))
                                                .map(count -> DefaultPayload.create("★★★" + payload.getDataUtf8() + " / " + (count + 1) + "★★★"))
                                                .take(5);
                                    }
                                })
                        )

今回、100ミリ秒おきに5回メッセージを返すようにしています。

よって、クライアント側が受け取るのもFluxとなります。RSocketの使うメソッドも、requestStreamとなります。

        StepVerifier
                .create(Flux
                        .from(client)
                        .flatMap(rsocket -> rsocket.requestStream(DefaultPayload.create("Hello RSocket!! / TCP")))
                        .map(Payload::getDataUtf8))
                .expectNext("★★★Hello RSocket!! / TCP / 1★★★")
                .expectNext("★★★Hello RSocket!! / TCP / 2★★★")
                .expectNext("★★★Hello RSocket!! / TCP / 3★★★")
                .expectNext("★★★Hello RSocket!! / TCP / 4★★★")
                .expectNext("★★★Hello RSocket!! / TCP / 5★★★")
                .verifyComplete();

また、クライアントの構築時にRSocketはMonoとして受け取っているので

        Mono<RSocket> client = ...

1度Fluxに変換してあります。

        StepVerifier
                .create(Flux
                        .from(client)
                        .flatMap(rsocket -> rsocket.requestStream(DefaultPayload.create("Hello RSocket!! / TCP")))
                        .map(Payload::getDataUtf8))

こんなところですね。

まとめ

RSocket Javaを軽く試してみました。Reactorを忘れかかっていたので、けっこうてこずりました…。

他の言語でも扱えますし、RPCっぽいものもあるみたいなので、少し見てみてもいいかもですね。