CLOVER🍀

That was when it all began.

はじめてのRSocket RPC Java(0.2)

これは、なにをしたくて書いたもの?

  • RSocketに、Protocol Buffersを使ったRPCがあると聞いて
  • ちょっと試してみようかと

RSocket RPC?

RSocket上で動作する、RPCの仕組みです。

RSocket

RSocket自体は、以前に軽く試してみたことがあります。

RSocket Java(0.11)でEcho Server/Clientっぽいものを書いてみる - CLOVER🍀

RSocket RPCでは、Protocol Buffers 3を使い、リクエスト/レスポンスという形態だけではなく、ストリーム(双方向含む)も
扱うことができます。

RSocket RPCは複数の言語に対応していますが、現時点で作られているリポジトリは、以下のようです。

GitHub - rsocket/rsocket-rpc-java: Standard RSocket RPC Java Implementation

GitHub - rsocket/rsocket-rpc-js: Standard RSocket RPC Implementation

GitHub - rsocket/rsocket-rpc-kotlin: Standard RSocket RPC Kotlin Implementation

これと似たようなものとして、gRPCがありますね。gRPCも、少し前に試してみました。

はじめてのgRPC(Java) - CLOVER🍀

では、RSocket RPCがgRPCとどう異なるかですが…RSocket RPCがどのような動機で作られたのかは、こちらを
参照するとよいでしょう。

https://github.com/rsocket/rsocket-rpc-java/blob/0.2.11/docs/motivation.md

端的に言うと、Reactive Streams、Back Pressureへの対応です。gRPCでは、フロー制御への対応がないことを、
このドキュメントでは指摘しています。

今回は同じProtocol Buffers 3を使うgRPCについて過去に自分で書いたエントリの内容を使いつつ、RSocket RPC Java
試してみたいと思います。

参考にするドキュメントは、こちらです。

https://github.com/rsocket/rsocket-rpc-java/blob/0.2.11/docs/get-started.md

お題

今回のお題は、前のgRPCのエントリと同じく、ツイート的なものです。

  • クライアントから名前とツイートする内容、ツイートした時刻を含めてリクエストすると、サーバー側がその内容にidを追加して返す
  • 上記の内容を双方向のStreamingにして、ツイートするとサーバーからこれまでにツイートされた内容が非同期に返ってくる

環境

今回の環境は、こちら。

$ java -version
openjdk version "1.8.0_191"
OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-0ubuntu0.18.04.1-b12)
OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)


$ mvn -version
Apache Maven 3.6.0 (97c98ec64a1fdfee7767ce5ffb20918da4f719f3; 2018-10-25T03:41:47+09:00)
Maven home: $HOME/.sdkman/candidates/maven/current
Java version: 1.8.0_191, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.15.0-43-generic", arch: "amd64", family: "unix"

準備

Maven依存関係は、こちら。

        <dependency>
            <groupId>io.rsocket.rpc</groupId>
            <artifactId>rsocket-rpc-core</artifactId>
            <version>0.2.11</version>
            <exclusions>
                <exclusion>
                    <groupId>io.rsocket.rpc</groupId>
                    <artifactId>rsocket-rpc-protobuf</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.rsocket</groupId>
            <artifactId>rsocket-transport-netty</artifactId>
            <version>0.11.13</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>

「rsocket-rpc-core」と、通信ライブラリとして「rsocket-transport-netty」、そしてProtocol Buffersのライブラリが必要です。

「rsocket-rpc-protobuf」をexcludeしているのは、これを書かないと依存関係の解決に失敗するからです…。

コンパイル時に、こんなエラーを受け取ることになります。

[ERROR] Failed to execute goal on project rsocket-hello-rpc: Could not resolve dependencies for project org.littlewings:rsocket-hello-rpc:jar:0.0.1-SNAPSHOT: Could not find artifact io.rsocket.rpc:rsocket-rpc-protobuf:jar:0.2.11 in central (https://repo.maven.apache.org/maven2) -> [Help 1]

このライブラリがなにかは、次で説明します。

あと、「slf4j-simple」はログ出力用ですね。

そして、Maven Pluginの設定が必要です。

Getting StartedのドキュメントではGradle向けの記述しか書かれていないのですが、

https://github.com/rsocket/rsocket-rpc-java/blob/0.2.11/docs/get-started.md

これをMavenの設定に読み替えるとこんな感じになりました。

    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.6.1</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.6.1:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>rsocket-rpc</pluginId>
                    <pluginArtifact>io.rsocket.rpc:rsocket-rpc-protobuf:0.2.11:exe:${os.detected.classifier}
                    </pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

どこかで見たことがあります。

ほぼ、gRPCの時の設定と同じです。

gRPC / Generated Code

変えているのは、Protocol BuffersのMaven Pluginに追加している、protocに対するプラグインです。

                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.6.1:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>rsocket-rpc</pluginId>
                    <pluginArtifact>io.rsocket.rpc:rsocket-rpc-protobuf:0.2.11:exe:${os.detected.classifier}
                    </pluginArtifact>
                </configuration>

Maven Protocol Buffers Plugin – Introduction

Maven Centralを見るとわかるのですが、このアーティファクトはexeとして提供されており、protocに組み込まれて動作します。

https://repo1.maven.org/maven2/io/rsocket/rpc/rsocket-rpc-protobuf/0.2.11/

なので、これを依存関係に含めている「rsocket-rpc-core」をそのまま使うと、コンパイルに失敗するという…。

環境的な準備は、ここまでです。

簡単なRPCを書く

ここからは、gRPCの時のエントリと同じ流れで書いていきます。

まずは、ツイートをするとidを付けてレスポンスが戻ってくる、簡単なRPCを書きます。

IDLの定義。
src/main/proto/tweet.proto

syntax = "proto3";

import "google/protobuf/empty.proto";

option java_multiple_files = true;
option java_package = "org.littlewings.rsocket.simpletweet";
option java_outer_classname = "SimpleTweetService";

package simpletweet;

service SimpleTweeter {
    rpc Tweet (TweetRequest) returns (TweetResponse) {}
}

message TweetRequest {
    string name = 1;
    string text = 2;
    int64 time = 3;
}

message TweetResponse {
    int64 id = 1;
    string name = 2;
    string text = 3;
    int64 time = 4;
}

RPCの定義はこの部分で、

service SimpleTweeter {
    rpc Tweet (TweetRequest) returns (TweetResponse) {}
}

残りは、送受信するメッセージの定義です。

gRPCの時の定義とほぼ同じなのですが、少しだけ変えたところが。

「objc_class_prefix」を省略したことと、以下のimport文を追加したことです。

import "google/protobuf/empty.proto";

これは、今回は使用しませんが、Fire-and-Forgetのパターンの時の、戻り値の定義として使います。

Fire-and-Forget / Protobuf

import "google/protobuf/empty.proto";
...
rpc FireAndForget (SimpleRequest) returns (google.protobuf.Empty) {}

これをコンパイルすると、ソースコードが生成されます。

$ mvn compile
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] Detecting the operating system and CPU architecture
[INFO] ------------------------------------------------------------------------
[INFO] os.detected.name: linux
[INFO] os.detected.arch: x86_64
[INFO] os.detected.version: 4.15
[INFO] os.detected.version.major: 4
[INFO] os.detected.version.minor: 15
[INFO] os.detected.release: ubuntu
[INFO] os.detected.release.version: 18.04
[INFO] os.detected.release.like.ubuntu: true
[INFO] os.detected.release.like.debian: true
[INFO] os.detected.classifier: linux-x86_64
[INFO] 
[INFO] -----------------< org.littlewings:rsocket-hello-rpc >------------------
[INFO] Building rsocket-hello-rpc 0.0.1-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- protobuf-maven-plugin:0.6.1:compile (default) @ rsocket-hello-rpc ---
[INFO] Compiling 2 proto file(s) to /path/to/rsocket-hello-rpc/target/generated-sources/protobuf/java
[WARNING] PROTOC: tweet-streaming.proto: warning: Import google/protobuf/empty.proto but not used.
tweet.proto: warning: Import google/protobuf/empty.proto but not used.

[INFO] 
[INFO] --- protobuf-maven-plugin:0.6.1:compile-custom (default) @ rsocket-hello-rpc ---
[INFO] Compiling 2 proto file(s) to /path/to/target/generated-sources/protobuf/rsocket-rpc
[WARNING] PROTOC: tweet-streaming.proto: warning: Import google/protobuf/empty.proto but not used.
tweet.proto: warning: Import google/protobuf/empty.proto but not used.

あら、importしたのに使っていないと、警告されるんですね…。

tweet.proto: warning: Import google/protobuf/empty.proto but not used.

まあ、今回はメモの目的も兼ねているので、このままいくことにします。

生成されたソースコードは、今回の設定では「target/generated-sources/protobuf/rsocket-java」と
「target/generated-sources/protobuf/java」にそれぞれ出力されます。

$ find target/generated-sources -type f
target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/BlockingSimpleTweeterClient.java
target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/BlockingSimpleTweeter.java
target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/SimpleTweeterServer.java
target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/SimpleTweeterClient.java
target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/BlockingSimpleTweeterServer.java
target/generated-sources/protobuf/rsocket-rpc/org/littlewings/rsocket/simpletweet/SimpleTweeter.java
target/generated-sources/protobuf/java/org/littlewings/rsocket/simpletweet/TweetRequest.java
target/generated-sources/protobuf/java/org/littlewings/rsocket/simpletweet/SimpleTweetService.java
target/generated-sources/protobuf/java/org/littlewings/rsocket/simpletweet/TweetResponseOrBuilder.java
target/generated-sources/protobuf/java/org/littlewings/rsocket/simpletweet/TweetRequestOrBuilder.java
target/generated-sources/protobuf/java/org/littlewings/rsocket/simpletweet/TweetResponse.java

では、Server側を書いていってみます。

RPCの部分。自動生成されたインターフェースをimplementsします。
src/main/java/org/littlewings/rsocket/rpc/simple/DefaultSimpleTwitter.java

package org.littlewings.rsocket.rpc.simple;

import io.netty.buffer.ByteBuf;
import org.littlewings.rsocket.simpletweet.SimpleTweeter;
import org.littlewings.rsocket.simpletweet.TweetRequest;
import org.littlewings.rsocket.simpletweet.TweetResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class DefaultSimpleTwitter implements SimpleTweeter {
    Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public Mono<TweetResponse> tweet(TweetRequest request, ByteBuf metadata) {
        logger.info("request: name = {}, text = {}, time = {}", request.getName(), request.getText(), request.getTime());

        TweetResponse tweetResponse =
                TweetResponse
                        .newBuilder()
                        .setId(System.currentTimeMillis())
                        .setName(request.getName())
                        .setText(request.getText())
                        .setTime(request.getTime())
                        .build();

        return Mono.just(tweetResponse);
    }
}

リクエスト/レスポンスのスタイルだと、Monoを返すように実装するようです。

Serverとして起動するクラス。 src/main/java/org/littlewings/rsocket/rpc/simple/SimpleServer.java

package org.littlewings.rsocket.rpc.simple;

import java.util.Optional;

import io.rsocket.RSocketFactory;
import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import org.littlewings.rsocket.simpletweet.SimpleTweeterServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class SimpleServer {
    Logger logger = LoggerFactory.getLogger(getClass());

    SimpleTweeterServer server;
    CloseableChannel serverChannel;

    public static void main(String... args) {
        SimpleServer simpleServer = new SimpleServer();
        simpleServer.start();

        System.console().readLine("> Enter stop.");

        simpleServer.stop();
    }

    public void start() {
        server = new SimpleTweeterServer(new DefaultSimpleTwitter(), Optional.empty(), Optional.empty());

        serverChannel =
                RSocketFactory
                        .receive()
                        .acceptor((setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(server)))
                        .transport(TcpServerTransport.create("localhost", 8080))
                        .start()
                        .log()
                        .block();

        logger.info("start RSocket RPC server.");
    }

    public void stop() {
        server.dispose();
        serverChannel.dispose();

        logger.info("stop RSocket RPC server.");
    }
}

先ほど作成したクラスのインスタンスを生成して、RSocketFactoryからServerを起動します。

        server = new SimpleTweeterServer(new DefaultSimpleTwitter(), Optional.empty(), Optional.empty());

        serverChannel =
                RSocketFactory
                        .receive()
                        .acceptor((setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(server)))
                        .transport(TcpServerTransport.create("localhost", 8080))
                        .start()
                        .log()
                        .block();

なお、このServerはEnterを押すと停止します。

Client側。
src/main/java/org/littlewings/rsocket/rpc/simple/SimpleClient.java

package org.littlewings.rsocket.rpc.simple;

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.littlewings.rsocket.simpletweet.SimpleTweeterClient;
import org.littlewings.rsocket.simpletweet.TweetRequest;
import org.littlewings.rsocket.simpletweet.TweetResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleClient {
    Logger logger = LoggerFactory.getLogger(getClass());

    SimpleTweeterClient client;
    RSocket rsocket;

    public static void main(String... args) {
        SimpleClient simpleClient = SimpleClient.create("localhost", 8080);
        simpleClient.tweet("磯野 カツオ", "こんにちは!!RSocket RPC!!");
        simpleClient.tweet("フグ田 タラオ", "Hello World!!");
        simpleClient.shutdown();
    }

    public static SimpleClient create(String host, int port) {
        SimpleClient simpleClient = new SimpleClient();

        RSocket rsocket = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(host, port))
                .start()
                .log()
                .block();

        simpleClient.client = new SimpleTweeterClient(rsocket);
        simpleClient.rsocket = rsocket;

        return simpleClient;
    }

    public void tweet(String name, String text) {
        TweetRequest tweetRequest =
                TweetRequest
                        .newBuilder()
                        .setName(name)
                        .setText(text)
                        .setTime(System.currentTimeMillis())
                        .build();

        TweetResponse tweetResponse =
                client
                        .tweet(tweetRequest)
                        .block();

        logger.info("response: id = {}, name = {}, text = {}, time = {}", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime());
    }

    public void shutdown() {
        rsocket.dispose();
    }
}

Client側も、やっぱりRSocketFactoryからRSocketを作ります。

        RSocket rsocket = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(host, port))
                .start()
                .log()
                .block();

今回はリクエスト/レスポンスを受け取る時はblockしていますが、Reactorを使ってコードを書きます。

    public void tweet(String name, String text) {
        TweetRequest tweetRequest =
                TweetRequest
                        .newBuilder()
                        .setName(name)
                        .setText(text)
                        .setTime(System.currentTimeMillis())
                        .build();

        TweetResponse tweetResponse =
                client
                        .tweet(tweetRequest)
                        .block();

        logger.info("response: id = {}, name = {}, text = {}, time = {}", tweetResponse.getId(), tweetResponse.getName(), tweetResponse.getText(), tweetResponse.getTime());
    }

では、実行してみましょう。

Serverを起動。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.rsocket.rpc.simple.SimpleServer

Clientを実行。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.rsocket.rpc.simple.SimpleClient

...

[org.littlewings.rsocket.rpc.simple.SimpleClient.main()] INFO org.littlewings.rsocket.rpc.simple.SimpleClient - response: id = 1545834020324, name = 磯野 カツオ, text = こんにちは!!RSocket RPC!!, time = 1545834020248
[org.littlewings.rsocket.rpc.simple.SimpleClient.main()] INFO org.littlewings.rsocket.rpc.simple.SimpleClient - response: id = 1545834020358, name = フグ田 タラオ, text = Hello World!!, time = 1545834020356

Server側では、こんなログが出力されます。

[reactor-tcp-server-epoll-10] INFO org.littlewings.rsocket.rpc.simple.DefaultSimpleTwitter - request: name = 磯野 カツオ, text = こんにちは!!RSocket RPC!!, time = 1545834020248
[reactor-tcp-server-epoll-10] INFO org.littlewings.rsocket.rpc.simple.DefaultSimpleTwitter - request: name = フグ田 タラオ, text = Hello World!!, time = 1545834020356

OKそうですね。

Streaming(双方向)なRPCを書く

続いて、双方向なRPCを書いてみます。

IDLの定義。
src/main/proto/tweet-streaming.proto

syntax = "proto3";

import "google/protobuf/empty.proto";

option java_multiple_files = true;
option java_package = "org.littlewings.rsocket.streamingtweet";
option java_outer_classname = "StreamingTweetService";

package streamingtweet;

service StreamingTweeter {
    rpc TweetStreaming (stream TweetRequest) returns (stream TweetResponse) {}
}

message TweetRequest {
    string name = 1;
    string text = 2;
    int64 time = 3;
}

message TweetResponse {
    int64 id = 1;
    string name = 2;
    string text = 3;
    int64 time = 4;
}

RCPの引数と戻り値が、streamになりました。

service StreamingTweeter {
    rpc TweetStreaming (stream TweetRequest) returns (stream TweetResponse) {}
}

これで、双方向のstreamとなります。というか、gRPCの時と同じです。

では、コードを自動生成して

$ mvn compile

Server側を作成しましょう。

まずはRPCの部分。
src/main/java/org/littlewings/rsocket/rpc/streaming/DefaultStreamingTwitter.java

package org.littlewings.rsocket.rpc.streaming;

import java.util.ArrayList;
import java.util.List;

import io.netty.buffer.ByteBuf;
import org.littlewings.rsocket.streamingtweet.StreamingTweeter;
import org.littlewings.rsocket.streamingtweet.TweetRequest;
import org.littlewings.rsocket.streamingtweet.TweetResponse;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

public class DefaultStreamingTwitter implements StreamingTweeter {
    Logger logger = LoggerFactory.getLogger(getClass());
    List<TweetResponse> tweets = new ArrayList<>();

    @Override
    public Flux<TweetResponse> tweetStreaming(Publisher<TweetRequest> request, ByteBuf metadata) {
        return Flux.from(request)
                .log()
                .flatMap(tweetRequest -> {
                    logger.info("request: name = {}, text = {}, time = {}",
                            tweetRequest.getName(),
                            tweetRequest.getText(),
                            tweetRequest.getTime());

                    TweetResponse tweetResponse =
                            TweetResponse
                                    .newBuilder()
                                    .setId(System.currentTimeMillis())
                                    .setName(tweetRequest.getName())
                                    .setText(tweetRequest.getText())
                                    .setTime(tweetRequest.getTime())
                                    .build();

                    List<TweetResponse> snapshot;
                    synchronized (this) {
                        tweets.add(tweetResponse);
                        snapshot = new ArrayList<>(tweets);
                    }

                    return Flux.fromIterable(snapshot);
                });
    }
}

今度は、メソッドの引数がPublisherに、戻り値がFluxとなります。

また、リクエストの度にツイートを蓄積し、全部返す感じで作成しました。

    @Override
    public Flux<TweetResponse> tweetStreaming(Publisher<TweetRequest> request, ByteBuf metadata) {
        return Flux.from(request)
                .log()
                .flatMap(tweetRequest -> {
                    logger.info("request: name = {}, text = {}, time = {}",
                            tweetRequest.getName(),
                            tweetRequest.getText(),
                            tweetRequest.getTime());

                    TweetResponse tweetResponse =
                            TweetResponse
                                    .newBuilder()
                                    .setId(System.currentTimeMillis())
                                    .setName(tweetRequest.getName())
                                    .setText(tweetRequest.getText())
                                    .setTime(tweetRequest.getTime())
                                    .build();

                    List<TweetResponse> snapshot;
                    synchronized (this) {
                        tweets.add(tweetResponse);
                        snapshot = new ArrayList<>(tweets);
                    }

                    return Flux.fromIterable(snapshot);
                });
    }

Serverを起動するクラス。
src/main/java/org/littlewings/rsocket/rpc/streaming/StreamingServer.java

package org.littlewings.rsocket.rpc.streaming;

import java.util.Optional;

import io.rsocket.RSocketFactory;
import io.rsocket.rpc.rsocket.RequestHandlingRSocket;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import org.littlewings.rsocket.streamingtweet.StreamingTweeterServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class StreamingServer {
    Logger logger = LoggerFactory.getLogger(getClass());

    StreamingTweeterServer server;
    CloseableChannel serverChannel;

    public static void main(String... args) {
        StreamingServer streamingServer = new StreamingServer();
        streamingServer.start();

        System.console().readLine("> Enter stop.");

        streamingServer.stop();
    }

    public void start() {
        server = new StreamingTweeterServer(new DefaultStreamingTwitter(), Optional.empty(), Optional.empty());

        serverChannel =
                RSocketFactory
                        .receive()
                        .acceptor((setup, sendingSocket) -> Mono.just(new RequestHandlingRSocket(server)))
                        .transport(TcpServerTransport.create("localhost", 8080))
                        .start()
                        .log()
                        .block();

        logger.info("start RSocket RPC server.");
    }

    public void stop() {
        server.dispose();
        serverChannel.dispose();

        logger.info("stop RSocket RPC server.");
    }
}

こちらは、リクエスト/レスポンスのスタイルの時と、ほぼ変わりません。

続いて、Client側。
src/main/java/org/littlewings/rsocket/rpc/streaming/StreamingClient.java

package org.littlewings.rsocket.rpc.streaming;

import java.io.Console;
import java.util.concurrent.TimeUnit;

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.littlewings.rsocket.streamingtweet.StreamingTweeterClient;
import org.littlewings.rsocket.streamingtweet.TweetRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class StreamingClient {
    Logger logger = LoggerFactory.getLogger(getClass());

    StreamingTweeterClient client;
    RSocket rsocket;

    public static void main(String... args) {
        StreamingClient simpleClient = StreamingClient.create("localhost", 8080);
        simpleClient.tweetWhile("磯野 カツオ");
        simpleClient.shutdown();
    }

    public static StreamingClient create(String host, int port) {
        StreamingClient streamingClient = new StreamingClient();

        RSocket rsocket = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(host, port))
                .start()
                .log()
                .block();

        streamingClient.client = new StreamingTweeterClient(rsocket);
        streamingClient.rsocket = rsocket;

        return streamingClient;
    }

    public void tweetWhile(String name) {
        Console console = System.console();

        while (true) {
            String line = console.readLine("> ");

            if (line.isEmpty()) {
                continue;
            }

            if (line.equals("exit")) {
                break;
            }

            TweetRequest tweetRequest =
                    TweetRequest
                            .newBuilder()
                            .setName(name)
                            .setText(line)
                            .setTime(System.currentTimeMillis())
                            .build();

            client
                    .tweetStreaming(Mono.just(tweetRequest))
                    .log()
                    .subscribe(tweetResponse ->
                            logger.info("response: id = {}, name = {}, text = {}, time = {}",
                                    tweetResponse.getId(),
                                    tweetResponse.getName(),
                                    tweetResponse.getText(),
                                    tweetResponse.getTime())
                    );

            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
                // ignore
            }
        }

        logger.info("request completed!!");
    }

    public void shutdown() {
        rsocket.dispose();
    }
}

リクエストの送信については、固定の名前でツイートし続けるものに。まあ、gRPCの時のサンプルと同じです。

    public void tweetWhile(String name) {
        Console console = System.console();

        while (true) {
            String line = console.readLine("> ");

            if (line.isEmpty()) {
                continue;
            }

            if (line.equals("exit")) {
                break;
            }

            TweetRequest tweetRequest =
                    TweetRequest
                            .newBuilder()
                            .setName(name)
                            .setText(line)
                            .setTime(System.currentTimeMillis())
                            .build();

            client
                    .tweetStreaming(Mono.just(tweetRequest))
                    .log()
                    .subscribe(tweetResponse ->
                            logger.info("response: id = {}, name = {}, text = {}, time = {}",
                                    tweetResponse.getId(),
                                    tweetResponse.getName(),
                                    tweetResponse.getText(),
                                    tweetResponse.getTime())
                    );

            try {
                TimeUnit.SECONDS.sleep(1L);
            } catch (InterruptedException e) {
                // ignore
            }
        }

        logger.info("request completed!!");
    }

RSocketを作るところは、リクエスト/レスポンススタイルの時と変わりません。

なお、「exit」と入力すると終了します。

では、確認してみます。Server側を起動。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.rsocket.rpc.streaming.StreamingServer

Client側を起動。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.rsocket.rpc.streaming.StreamingClient

...

[reactor-tcp-client-epoll-12] INFO reactor.Mono.FlatMap.1 - | onNext(io.rsocket.RSocketClient@52a40755)
[reactor-tcp-client-epoll-12] INFO reactor.Mono.FlatMap.1 - | onComplete()
> 

待機状態になるので、適当にツイートしてみます。

> こんにちは、RSockeet RPC
> こんにちは、RSocket RPC Java
[org.littlewings.rsocket.rpc.streaming.StreamingClient.main()] INFO reactor.Flux.Map.2 - onSubscribe(FluxMap.MapSubscriber)
[org.littlewings.rsocket.rpc.streaming.StreamingClient.main()] INFO reactor.Flux.Map.2 - request(unbounded)
[reactor-tcp-client-epoll-12] INFO reactor.Flux.Map.2 - onNext(id: 1545835418281
name: "\347\243\257\351\207\216 \343\202\253\343\203\204\343\202\252"
text: "\343\201\223\343\202\223\343\201\253\343\201\241\343\201\257\343\200\201RSocket RPC Java"
time: 1545835418155
)
[reactor-tcp-client-epoll-12] INFO org.littlewings.rsocket.rpc.streaming.StreamingClient - response: id = 1545835418281, name = 磯野 カツオ, text = こんにちは、RSocket RPC Java, time = 1545835418155
[reactor-tcp-client-epoll-12] INFO reactor.Flux.Map.2 - onComplete()
> JavaでRSocket RPC
[org.littlewings.rsocket.rpc.streaming.StreamingClient.main()] INFO reactor.Flux.Map.3 - onSubscribe(FluxMap.MapSubscriber)
[org.littlewings.rsocket.rpc.streaming.StreamingClient.main()] INFO reactor.Flux.Map.3 - request(unbounded)
[reactor-tcp-client-epoll-12] INFO reactor.Flux.Map.3 - onNext(id: 1545835418281
name: "\347\243\257\351\207\216 \343\202\253\343\203\204\343\202\252"
text: "\343\201\223\343\202\223\343\201\253\343\201\241\343\201\257\343\200\201RSocket RPC Java"
time: 1545835418155
)
[reactor-tcp-client-epoll-12] INFO org.littlewings.rsocket.rpc.streaming.StreamingClient - response: id = 1545835418281, name = 磯野 カツオ, text = こんにちは、RSocket RPC Java, time = 1545835418155
[reactor-tcp-client-epoll-12] INFO reactor.Flux.Map.3 - onNext(id: 1545835428272
name: "\347\243\257\351\207\216 \343\202\253\343\203\204\343\202\252"
text: "Java\343\201\247RSocket RPC"
time: 1545835428264
)
[reactor-tcp-client-epoll-12] INFO org.littlewings.rsocket.rpc.streaming.StreamingClient - response: id = 1545835428272, name = 磯野 カツオ, text = JavaでRSocket RPC, time = 1545835428264
[reactor-tcp-client-epoll-12] INFO reactor.Flux.Map.3 - onComplete()

メッセージを送ると、サーバー側からはこれまでに送ったツイート+今回のツイートが返ってきます。

Server側のログ。

[reactor-tcp-server-epoll-10] INFO org.littlewings.rsocket.rpc.streaming.DefaultStreamingTwitter - request: name = 磯野 カツオ, text = こんにちは、RSocket RPC Java, time = 1545835418155
[reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.2 - request(1)
[reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.2 - onComplete()
[reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.3 - onSubscribe(FluxMap.MapSubscriber)
[reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.3 - request(256)
[reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.3 - onNext(name: "\347\243\257\351\207\216 \343\202\253\343\203\204\343\202\252"
text: "Java\343\201\247RSocket RPC"
time: 1545835428264
)
[reactor-tcp-server-epoll-10] INFO org.littlewings.rsocket.rpc.streaming.DefaultStreamingTwitter - request: name = 磯野 カツオ, text = JavaでRSocket RPC, time = 1545835428264
[reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.3 - request(1)
[reactor-tcp-server-epoll-10] INFO reactor.Flux.Map.3 - onComplete()

これで、双方向のStreamingが確認できました、と。

まとめ

RSocket RPC Javaを試してみました。

さらっと書いた感じでは、Reactorで書けるgRPCですね。今回は、エラー処理とかほぼ書いていないので、gRPCより
すっきりしたように見えてしまいますが…。

gRPCとの距離感は、どうなんでしょうね。Springにも入りそうな感じですし、こちらもちょいちょい見ておきましょうか。

[SPR-16751] RSocket client and server support - Spring JIRA