過去に書いた、こちらのエントリのReactor Netty 0.7版。
Reactor NettyでEcho Client/Serverを書く - CLOVER
Reactor Nettyを使って、TCPでEcho Client/Serverを書いてみます。
ストリームっぽく…とはまあいかないものの、Reactor Nettyも前に動かしていた頃よりだいぶ進んだので、そろそろ
やり直そうと。
というか、APIもだいぶ変わりましたね。
準備
Reactor Nettyは、ReactorのBOMに入っているので、dependencyManagementとしてこのように定義。
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Bismuth-SR4</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
そして、「reactor-netty」を追加することで、今回はReactor Netty 0.7.2が使えるようになります。
<dependency> <groupId>io.projectreactor.ipc</groupId> <artifactId>reactor-netty</artifactId> </dependency>
あとは、テストライブラリも足しておきます。
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <version>5.0.3</version> <scope>test</scope> </dependency>
テストコードの雛形
準備はできたので、コードを書いていってみます。
ドキュメントは…ない感じなので…
※過去に見えていたドキュメントもあるにはあるのですが、内容が古い…
参考にしたのは、自分の過去のエントリと、こちら。
Reactor NettyでEcho Client/Serverを書く - CLOVER
Echo Serverを実装して学ぶReactor Nettyによるストリーム処理
Echo Clientを実装して学ぶReactor NettyのTCP Client
https://github.com/reactor/reactor-netty/blob/v0.7.2.RELEASE/src/test/java/reactor/ipc/netty/tcp/TcpServerTests.java
https://github.com/reactor/reactor-netty/blob/v0.7.2.RELEASE/src/test/java/reactor/ipc/netty/tcp/TcpClientTests.java
これらを元にして、書いていきます。
コードの雛形は、こちら。
src/test/java/org/littlewings/reactor/tcp/TcpEchoTest.java
package org.littlewings.reactor.tcp; import java.nio.charset.StandardCharsets; import java.util.function.BiFunction; import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.ipc.netty.NettyContext; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.tcp.BlockingNettyContext; import reactor.ipc.netty.tcp.TcpClient; import reactor.ipc.netty.tcp.TcpServer; import reactor.test.StepVerifier; public class TcpEchoTest { @Test public void tcpEchoClientAndServer() { // ここに、テストを書く! } }
Server側
まずは、Server側から書いていきます。
//// Server TcpServer echoServer = TcpServer.create(8000); BlockingNettyContext serverContext = echoServer .<BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>>start((inbound, outbound) -> { Flux<String> input = inbound.receive().asString(StandardCharsets.UTF_8); input .log("server") .map(message -> "***" + message + "***") .doOnNext(message -> outbound.sendString(Mono.just(message)).then().subscribe()) .subscribe(); return Flux.never(); });
ほぼ、参考にしたブログエントリのまま…。
TcpServer#createでリッスンポートを指定してインスタンスを作成し、TcpServer#startでハンドラを登録します。
データを受信して
Flux<String> input = inbound.receive().asString(StandardCharsets.UTF_8);
今回はちょっと加工して、Client側へ送り返します。
input .log("server") .map(message -> "***" + message + "***") .doOnNext(message -> outbound.sendString(Mono.just(message)).then().subscribe()) .subscribe();
これで、Server全体の入出力の流れを組んでいる…と。
最後に、終わらないFluxを返しておしまい。
return Flux.never();
最終的には、TcpServer#startで返ってきたBlockingNettyContext#shutdownを呼び出してServerを停止します。
//// Shutdown
serverContext.shutdown();
テストコードの都合上、BlockingNettyContext#shutdownの間にClient側のソースコードが挟まっているのですが、
それは最後に載せることにします。
Client側
続いて、Client側。
//// Client TcpClient echoClient = TcpClient.create(8000); Mono<String> echoResponse = Mono.create(sink -> { BlockingNettyContext clientContext = echoClient .<BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>>start((inbound, outbound) -> { NettyOutbound send = outbound .sendString(Mono.just("こんにちは、Reactor"), StandardCharsets.UTF_8); return send.then(inbound .receive() .asString(StandardCharsets.UTF_8) .log("client") .doOnNext(sink::success) .then() ); } ); //// Shutdown clientContext.installShutdownHook(); });
Server側から受信した結果を、別にアサーションしたかったので、Mono#createでMonoSinkを使って作成することにしました。
Client側は、データを送って受診時にFlux#doOnNextでMonoSinkに受信したデータを登録し、Mono
BlockingNettyContext clientContext = echoClient .<BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>>start((inbound, outbound) -> { NettyOutbound send = outbound .sendString(Mono.just("こんにちは、Reactor"), StandardCharsets.UTF_8); return send.then(inbound .receive() .asString(StandardCharsets.UTF_8) .log("client") .doOnNext(sink::success) .then() ); } );
Client側の停止は、ShutdownHookを使う方法を今回は取りました。
//// Shutdown
clientContext.installShutdownHook();
結果の確認。
//// Assertion StepVerifier .create(echoResponse) .expectNext("***こんにちは、Reactor***") .verifyComplete();
OKそうですね。
ここまでまとめると、こんな感じです。
@Test public void tcpEchoClientAndServer() { //// Server TcpServer echoServer = TcpServer.create(8000); BlockingNettyContext serverContext = echoServer .<BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>>start((inbound, outbound) -> { Flux<String> input = inbound.receive().asString(StandardCharsets.UTF_8); input .log("server") .map(message -> "***" + message + "***") .doOnNext(message -> outbound.sendString(Mono.just(message)).then().subscribe()) .subscribe(); return Flux.never(); }); //// Client TcpClient echoClient = TcpClient.create(8000); Mono<String> echoResponse = Mono.create(sink -> { Mono<? extends NettyContext> clientContext = echoClient.newHandler((inbound, outbound) -> { NettyOutbound send = outbound .sendString(Mono.just("こんにちは、Reactor"), StandardCharsets.UTF_8); return send.then(inbound .receive() .asString(StandardCharsets.UTF_8) .log("client") .doOnNext(sink::success) .then() ); }); NettyContext c = clientContext.block(); sink.onDispose(c::dispose); }); //// Assertion StepVerifier .create(echoResponse) .expectNext("***こんにちは、Reactor***") .verifyComplete(); //// Shutdown serverContext.shutdown(); }
別のやり方で
Client側で、ShutdownHookを使ったのが微妙な気がして、他の方法は?とあれこれやってみた結果がこちら。
Mono<String> echoResponse = Mono.create(sink -> { Mono<? extends NettyContext> clientContext = echoClient.newHandler((inbound, outbound) -> { NettyOutbound send = outbound .sendString(Mono.just("こんにちは、Reactor"), StandardCharsets.UTF_8); return send.then(inbound .receive() .asString(StandardCharsets.UTF_8) .log("client") .doOnNext(sink::success) .then() ); }); NettyContext c = clientContext.block(); sink.onDispose(c::dispose); });
TcpClient#newHandlerでMonoが返ってくるので、これをblockしてdispose。
BlockingNettyContextも、Mono#blockを呼んでいるもののようですし。
https://github.com/reactor/reactor-netty/blob/v0.7.2.RELEASE/src/main/java/reactor/ipc/netty/tcp/BlockingNettyContext.java#L53-L56
相変わらず慣れないですし、特にClient側の停め方がなんともな感じなのですが…とりあえずこんなところで。