CLOVER🍀

That was when it all began.

Reactor Netty 0.7でTCP Echo Client/Server

過去に書いた、こちらのエントリのReactor Netty 0.7版。

Reactor NettyでEcho Client/Serverを書く - CLOVER

Reactor Nettyを使って、TCPでEcho Client/Serverを書いてみます。

ストリームっぽく…とはまあいかないものの、Reactor Nettyも前に動かしていた頃よりだいぶ進んだので、そろそろ
やり直そうと。

というか、APIもだいぶ変わりましたね。

準備

Reactor Nettyは、ReactorのBOMに入っているので、dependencyManagementとしてこのように定義。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

そして、「reactor-netty」を追加することで、今回はReactor Netty 0.7.2が使えるようになります。

        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>

あとは、テストライブラリも足しておきます。

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.0.3</version>
            <scope>test</scope>
        </dependency>

テストコードの雛形

準備はできたので、コードを書いていってみます。

ドキュメントは…ない感じなので…

Documentation

※過去に見えていたドキュメントもあるにはあるのですが、内容が古い…

Reactor Netty

参考にしたのは、自分の過去のエントリと、こちら。

Reactor NettyでEcho Client/Serverを書く - CLOVER

Echo Serverを実装して学ぶReactor Nettyによるストリーム処理

Echo Clientを実装して学ぶReactor NettyのTCP Client

https://github.com/reactor/reactor-netty/blob/v0.7.2.RELEASE/src/test/java/reactor/ipc/netty/tcp/TcpServerTests.java
https://github.com/reactor/reactor-netty/blob/v0.7.2.RELEASE/src/test/java/reactor/ipc/netty/tcp/TcpClientTests.java

これらを元にして、書いていきます。

コードの雛形は、こちら。
src/test/java/org/littlewings/reactor/tcp/TcpEchoTest.java

package org.littlewings.reactor.tcp;

import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;

import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.NettyContext;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.tcp.BlockingNettyContext;
import reactor.ipc.netty.tcp.TcpClient;
import reactor.ipc.netty.tcp.TcpServer;
import reactor.test.StepVerifier;

public class TcpEchoTest {
    @Test
    public void tcpEchoClientAndServer() {

        // ここに、テストを書く!

    }
}

Server側

まずは、Server側から書いていきます。

        //// Server
        TcpServer echoServer = TcpServer.create(8000);

        BlockingNettyContext serverContext =
                echoServer
                        .<BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>>start((inbound, outbound) -> {
                            Flux<String> input = inbound.receive().asString(StandardCharsets.UTF_8);

                            input
                                    .log("server")
                                    .map(message -> "***" + message + "***")
                                    .doOnNext(message ->
                                            outbound.sendString(Mono.just(message)).then().subscribe())
                                    .subscribe();

                            return Flux.never();
                        });

ほぼ、参考にしたブログエントリのまま…。

TcpServer#createでリッスンポートを指定してインスタンスを作成し、TcpServer#startでハンドラを登録します。

データを受信して

                            Flux<String> input = inbound.receive().asString(StandardCharsets.UTF_8);

今回はちょっと加工して、Client側へ送り返します。

                            input
                                    .log("server")
                                    .map(message -> "***" + message + "***")
                                    .doOnNext(message ->
                                            outbound.sendString(Mono.just(message)).then().subscribe())
                                    .subscribe();

これで、Server全体の入出力の流れを組んでいる…と。

最後に、終わらないFluxを返しておしまい。

                            return Flux.never();

最終的には、TcpServer#startで返ってきたBlockingNettyContext#shutdownを呼び出してServerを停止します。

        //// Shutdown
        serverContext.shutdown();

テストコードの都合上、BlockingNettyContext#shutdownの間にClient側のソースコードが挟まっているのですが、
それは最後に載せることにします。

Client側

続いて、Client側。

        //// Client
        TcpClient echoClient = TcpClient.create(8000);

        Mono<String> echoResponse =
                Mono.create(sink -> {
                    BlockingNettyContext clientContext =
                            echoClient
                                    .<BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>>start((inbound, outbound) -> {
                                                NettyOutbound send =
                                                        outbound
                                                                .sendString(Mono.just("こんにちは、Reactor"), StandardCharsets.UTF_8);

                                                return send.then(inbound
                                                        .receive()
                                                        .asString(StandardCharsets.UTF_8)
                                                        .log("client")
                                                        .doOnNext(sink::success)
                                                        .then()
                                                );
                                            }
                                    );

                    //// Shutdown
                    clientContext.installShutdownHook();
                });

Server側から受信した結果を、別にアサーションしたかったので、Mono#createでMonoSinkを使って作成することにしました。

Client側は、データを送って受診時にFlux#doOnNextでMonoSinkに受信したデータを登録し、Monoを返して終了にします。

                    BlockingNettyContext clientContext =
                            echoClient
                                    .<BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>>start((inbound, outbound) -> {
                                                NettyOutbound send =
                                                        outbound
                                                                .sendString(Mono.just("こんにちは、Reactor"), StandardCharsets.UTF_8);

                                                return send.then(inbound
                                                        .receive()
                                                        .asString(StandardCharsets.UTF_8)
                                                        .log("client")
                                                        .doOnNext(sink::success)
                                                        .then()
                                                );
                                            }
                                    );

Client側の停止は、ShutdownHookを使う方法を今回は取りました。

                    //// Shutdown
                    clientContext.installShutdownHook();

結果の確認。

        //// Assertion
        StepVerifier
                .create(echoResponse)
                .expectNext("***こんにちは、Reactor***")
                .verifyComplete();

OKそうですね。

ここまでまとめると、こんな感じです。

    @Test
    public void tcpEchoClientAndServer() {
        //// Server
        TcpServer echoServer = TcpServer.create(8000);

        BlockingNettyContext serverContext =
                echoServer
                        .<BiFunction<NettyInbound, NettyOutbound, Publisher<Void>>>start((inbound, outbound) -> {
                            Flux<String> input = inbound.receive().asString(StandardCharsets.UTF_8);

                            input
                                    .log("server")
                                    .map(message -> "***" + message + "***")
                                    .doOnNext(message ->
                                            outbound.sendString(Mono.just(message)).then().subscribe())
                                    .subscribe();

                            return Flux.never();
                        });

        //// Client
        TcpClient echoClient = TcpClient.create(8000);

        Mono<String> echoResponse =
                Mono.create(sink -> {
                    Mono<? extends NettyContext> clientContext =
                            echoClient.newHandler((inbound, outbound) -> {
                                NettyOutbound send =
                                        outbound
                                                .sendString(Mono.just("こんにちは、Reactor"), StandardCharsets.UTF_8);

                                return send.then(inbound
                                        .receive()
                                        .asString(StandardCharsets.UTF_8)
                                        .log("client")
                                        .doOnNext(sink::success)
                                        .then()
                                );
                            });

                    NettyContext c = clientContext.block();
                    sink.onDispose(c::dispose);
                });

        //// Assertion
        StepVerifier
                .create(echoResponse)
                .expectNext("***こんにちは、Reactor***")
                .verifyComplete();

        //// Shutdown
        serverContext.shutdown();
    }

別のやり方で

Client側で、ShutdownHookを使ったのが微妙な気がして、他の方法は?とあれこれやってみた結果がこちら。

        Mono<String> echoResponse =
                Mono.create(sink -> {
                    Mono<? extends NettyContext> clientContext =
                            echoClient.newHandler((inbound, outbound) -> {
                                NettyOutbound send =
                                        outbound
                                                .sendString(Mono.just("こんにちは、Reactor"), StandardCharsets.UTF_8);

                                return send.then(inbound
                                        .receive()
                                        .asString(StandardCharsets.UTF_8)
                                        .log("client")
                                        .doOnNext(sink::success)
                                        .then()
                                );
                            });

                    NettyContext c = clientContext.block();
                    sink.onDispose(c::dispose);
                });

TcpClient#newHandlerでMonoが返ってくるので、これをblockしてdispose。

BlockingNettyContextも、Mono#blockを呼んでいるもののようですし。
https://github.com/reactor/reactor-netty/blob/v0.7.2.RELEASE/src/main/java/reactor/ipc/netty/tcp/BlockingNettyContext.java#L53-L56

相変わらず慣れないですし、特にClient側の停め方がなんともな感じなのですが…とりあえずこんなところで。