少し前に、Reactor Nettyで単純なHTTP Client/Serverを書いてみましたが、今度はもう少し下のレイヤーにいってEcho Client/Serverを書いてみたいと思います。
要するに、TcpServerとTcpClientでコードを書きますよ、と。
実は、HTTPよりも先にこちらから始めていたのですが、TcpServerとTcpClientの使い方がよくわからずに、諦めてHTTPに1度移ったという話もあるのですが…。
とはいえ、今でもやっぱりよくわかりません。
とりあえず、書いたコードを乗せていきましょう。
準備
Maven依存関係。
<dependency> <groupId>io.projectreactor.ipc</groupId> <artifactId>reactor-netty</artifactId> <version>0.5.2.RELEASE</version> </dependency>
Reactor Nettyのバージョンは、0.5.2.RELEASEとします。
Echo Server
では、まず最初にEcho Serverを書いてみましょう。
コード自体は短いので、いきなり載せてみます。
src/main/java/org/littlewings/reactor/netty/echo/EchoServer.java
package org.littlewings.reactor.netty.echo; import java.nio.charset.StandardCharsets; import reactor.ipc.netty.tcp.TcpServer; public class EchoServer { public static void main(String... args) { int port = 12012; TcpServer tcpServer = TcpServer.create(port); tcpServer.start(channel -> channel .receiveString(StandardCharsets.UTF_8) .log("echo.server") .map(String::trim) .map(m -> "*** " + m + " ***" + System.lineSeparator()) .next() .publish(s -> channel.sendString(s, StandardCharsets.UTF_8)) ); // System.console().readLine("> Enter stop."); // tcpServer.shutdownAndAwait(); // 以下と同じ // tcpServer.shutdown().block(); } }
単純に受け取った文字列を返すのではなくて、「***」くらいつけるようにしました。
で、短いことは短いのですが、こうなるのにけっこう苦労しまして…。
少し説明すると、最初はポートを指定してTcpServerを作成。
int port = 12012; TcpServer tcpServer = TcpServer.create(port);
この他、バインドするアドレスも渡せたり、Nettyの設定をオプションとして渡すこともできるようです。
そして、TcpServer#startでNettyChannelが渡ってくるので、ここで通信時にどんなことをしたいのかを実装します。
tcpServer.start(channel -> channel .receiveString(StandardCharsets.UTF_8) .log("echo.server") .map(String::trim) .map(m -> "*** " + m + " ***" + System.lineSeparator()) .next() .publish(s -> channel.sendString(s, StandardCharsets.UTF_8)) );
今回は、文字列を受け取ってトリムして「***」をくっつけ、その状態だとFluxなのでnextでMonoに落とし、最後にクライアント側に文字列を送るっといった処理をつなげて書いています。
シャットダウンするコードは、今回はコメントアウト。
// tcpServer.shutdownAndAwait(); // 以下と同じ // tcpServer.shutdown().block();
ただ、この実装順がパッと出てこなくて、けっこうハマりました…。慣れないとなかなか難しいですね…。
このコードを書く時に参考にしたのは、テストコードですね。
https://github.com/reactor/reactor-netty/blob/v0.5.2.RELEASE/src/test/java/reactor/ipc/netty/tcp/TcpServerTests.java
といっても、Echoそのものズバリはありませんでしたが…。
動作確認。
$ echo 'Hello World' | nc -q 1 localhost 12012 *** Hello World ***
OKそうです。
Echo Client
続いて、Echo Client。
※最後に追記しました
こちらは、Server側よりさらに苦労したような…。
結果は、このように。
src/main/java/org/littlewings/reactor/netty/echo/EchoClient.java
package org.littlewings.reactor.netty.echo; import java.nio.charset.StandardCharsets; import java.util.concurrent.CountDownLatch; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.ipc.netty.tcp.TcpClient; public class EchoClient { public static void main(String... args) throws InterruptedException { TcpClient client = TcpClient.create("localhost", 12012); CountDownLatch latch = new CountDownLatch(1); Mono<Void> mono = client.start(channel -> { channel.sendString(Flux.just("Hello Reactor"), StandardCharsets.UTF_8).subscribe(); channel.receiveString(StandardCharsets.UTF_8) .subscribe(s -> { System.out.println(s); latch.countDown(); }); return Flux.never(); }); mono.awaitOnSubscribe(); latch.await(); client.shutdown().awaitOnSubscribe(); } }
Client側は、今回は接続先のホストとポートを指定して、TcpClientを作成しました。
TcpClient client = TcpClient.create("localhost", 12012);
そして、文字列の送信と受信のコードをTcpClient#start内に書くのですが、こちらはServer側の様に送受信をつなげるのではなく、別々に書いてCountDownLatchで待ち合わせます。
CountDownLatch latch = new CountDownLatch(1); Mono<Void> mono = client.start(channel -> { channel.sendString(Flux.just("Hello Reactor"), StandardCharsets.UTF_8).subscribe(); channel.receiveString(StandardCharsets.UTF_8) .subscribe(s -> { System.out.println(s); latch.countDown(); }); return Flux.never(); }); mono.awaitOnSubscribe(); latch.await();
TcpClient#start自体へは、Flux#neverを返しておきます。
最後は、TcpClientをシャットダウン。
client.shutdown().awaitOnSubscribe();
動かしてみます。
*** Hello Reactor ***
とりあえず、OKそうです。
とはいえ、このClientのコード。あんまり腑に落ちていません…。やっぱり、CountDownLatchを使ってしまったのがとても気になります。で、試行錯誤してはみたものの、結局よくわからずこの形に。
他にはテストコード(Echo Clientありますし)や
https://github.com/reactor/reactor-netty/blob/v0.5.2.RELEASE/src/test/java/reactor/ipc/netty/tcp/TcpClientTests.java#L102
ドキュメントを見てみても
Asynchronous TCP, UDP and HTTP/Head first with a Java 8 example of some Net work
みんなCountDownLatch使ってますし…。
HttpClientの時はこうじゃなかったなぁと思いつつ、そこはHandlerを作って頑張っているみたいなので、TCPだともう少し足りない感じなのでしょうか。
https://github.com/reactor/reactor-netty/blob/v0.5.2.RELEASE/src/main/java/reactor/ipc/netty/http/NettyHttpClientHandler.java
まあ、解法が見つかったらまた書いてみたいかなと思います。
慣れないのとあまり情報がないので、ちょっとのコードを書くのにもなかなか苦労するのですが…慣れたら楽になるかな?慣れるといいなぁと…。
※追記)
その後、@makingさんにヒントをいただいたので、書き直してみました。
@kazuhira_r やりたいのはこういうこと?URL
というわけで、(ほぼアドバイスのままですが)こういう記述になりました。
package org.littlewings.reactor.netty.echo; import java.nio.charset.StandardCharsets; import reactor.core.publisher.Mono; import reactor.ipc.netty.tcp.TcpClient; public class EchoClient { public static void main(String... args) { TcpClient client = TcpClient.create("localhost", 12012); Mono<String> response = Mono.create(sink -> client.start(channel -> { channel .receiveString(StandardCharsets.UTF_8) .log("echo.client.receive") .next() .doOnSuccess(sink::success) .doOnError(sink::error) .subscribe(); channel .sendString(Mono.just("Hello Reactor"), StandardCharsets.UTF_8) .log("echo.client.send") .doOnError(sink::error) .subscribe(); return Mono.never(); }) ); response.doOnSuccess(s -> System.out.println(s)).block(); client.shutdown(); } }
なるほど、Mono#create(Consumer
これを見て、「あっ」て思いましたが、この発想はなかったです。まだまだ慣れが足りませんね…。