CLOVER🍀

That was when it all began.

Reactor Nettyのhttpパッケージで遊ぶ

Reactor Coreで遊んだので、今度はReactor Nettyへ。
Reactor Nettyを使うことで、MonoやFluxを使ってTCPUDP、HTTPのコードが書けるようになるみたいです。

Reactorに関するドキュメントはこちらなのですが、

https://projectreactor.io/docs/

Reactor IPCのサブモジュール的な感じにとらえればよさそうですね。

GitHub - reactor/reactor-ipc: Crossing IO boundaries on the JVM with Reactive Streams

ドキュメントは、Reactor IPCのものを見ればよさそうですが、まだあまり書かれていません。

https://projectreactor.io/ipc/docs/reference/

Javadocは別れているようなので、それぞれに。

IPC contracts for Reactive transport and protocol 0.6.2.RELEASE API

Reactor Netty 0.7.8.RELEASE

あんまり情報がないのですが、テストコードなどを参考に進めていこうと思います。

現時点でのReactor Nettyのバージョンは、0.5.1.RELEASEです。

それでは、使っていってみます。

準備

まずはMaven依存関係。

        <dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
            <version>0.5.1.RELEASE</version>
        </dependency>

今回は、テストコードは使わない形でいきます。

Reactor IPCのCodecとして、Reactor Addonsというサブモジュールがあります(Reactor Codec)。

GitHub - reactor/reactor-addons: Official modules for the Reactor project

このReactor Codecを使うとJSON向けのCodecなどが利用できるようになるのですが、現在のReactor NettyではこのReactor Codecに対する依存関係がOptionalであり、なおかつSNAPSHOTに依存しているので今回は使わない方向でいきます。

HTTPサーバーを書く

それでは、HTTPサーバーを書いていきます。

できあがったコードを先に載せると、こんな感じ。
src/main/java/org/littlewings/reactor/netty/http/ReactorNettyHttpServer.java

package org.littlewings.reactor.netty.http;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import io.netty.handler.codec.http.HttpMethod;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.HttpServer;

public class ReactorNettyHttpServer {
    public static void main(String... args) {
        int port = 8080;

        HttpServer server =
                HttpServer.create(port)
                        .get("/index", channel -> channel.sendString(Mono.just("Hello World!!")))
                        .directory("/src", "src/main/java")
                        .file("/pom", "pom.xml")
                        .route(channel -> channel.uri().startsWith("/calc?") && HttpMethod.GET.equals(channel.method()),
                                channel -> {
                                    Function<String, Map<String, Object>> paramResolver = uri -> {
                                        String query = uri.split("\\?")[1];

                                        return Arrays
                                                .stream(query.split("&"))
                                                .map(kv -> kv.split("="))
                                                .collect(Collectors.toMap(kv -> kv[0], kv -> kv[1]));
                                    };

                                    channel.paramsResolver(paramResolver);

                                    String a = (String) channel.param("a");
                                    String b = (String) channel.param("b");
                                    int result = Integer.parseInt(a) + Integer.parseInt(b);

                                    return channel.sendString(Mono.just(Integer.toString(result)));
                                });

        server.start().log("httpd").block();

        //server.shutdownAndAwait();
        //server.shutdown().block();
    }
}

サーバーのリッスンポートは、8080とします。

        int port = 8080;

ポートを与え、HttpServer#createでHttpServerのインスタンスを作成します。

                HttpServer.create(port)

このHttpServerに対して、getやpost、そしてfileやdirectoryでマッピングを書いていくことができます。

                        .get("/index", channel -> channel.sendString(Mono.just("Hello World!!")))
                        .directory("/src", "src/main/java")
                        .file("/pom", "pom.xml")

また、複雑な条件?みたいなことをしたければ、routeを使えばいい?

                        .route(channel -> channel.uri().startsWith("/calc?") && HttpMethod.GET.equals(channel.method()),
                                channel -> {
                                    Function<String, Map<String, Object>> paramResolver = uri -> {
                                        String query = uri.split("\\?")[1];

                                        return Arrays
                                                .stream(query.split("&"))
                                                .map(kv -> kv.split("="))
                                                .collect(Collectors.toMap(kv -> kv[0], kv -> kv[1]));
                                    };

                                    channel.paramsResolver(paramResolver);

                                    String a = (String) channel.param("a");
                                    String b = (String) channel.param("b");
                                    int result = Integer.parseInt(a) + Integer.parseInt(b);

                                    return channel.sendString(Mono.just(Integer.toString(result)));
                                });

クエリなどをパースしてくれるものはないので、今回は自前で書きました。HttpChannel#paramsResolverで設定すればいいみたいです。

また、テストコードを見ているとパラメータを使ったパスっぽいものも使えそうに見えるのですが、

		httpServer.get("/get/{name}", getHandler());

https://github.com/reactor/reactor-netty/blob/master/src/test/java/reactor/ipc/netty/http/PostAndGetTests.java

これはReactor Busがないとダメみたいなので、こちらもパス。Reactor Addonsの、やっぱりSNAPSHOT依存ですし。

あとは起動して、ブロックして待機。logメソッドを使うことで、ログ出力を行うことができます。

        server.start().log("httpd").block();

停止する時は、HttpServer#shutdownすればよいです。今回は、コメントアウトしてCtrl-cで停止のつもり。

        //server.shutdownAndAwait();
        //server.shutdown().block();

確認

それでは、作成したHTTPサーバーの動作確認をしてみます。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.netty.http.ReactorNettyHttpServer

確認。

## /index
$ curl http://localhost:8080/index
Hello World!!

## HTTPサーバーのソースコードを表示
$ curl http://localhost:8080/src/org/littlewings/reactor/netty/http/ReactorNettyHttpServer.java
package org.littlewings.reactor.netty.http;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import io.netty.handler.codec.http.HttpMethod;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.HttpServer;

public class ReactorNettyHttpServer {
    public static void main(String... args) {
        int port = 8080;

        HttpServer server =
                HttpServer.create(port)

〜省略〜

        server.start().log("httpd").block();

        //server.shutdownAndAwait();
        //server.shutdown().block();
    }
}

## /calcで計算
$ curl 'http://localhost:8080/calc?a=5&b=3'
8

OKそうですね。

HTTPクライアント

続いて、クライアント側。
※@makingさんに指摘をいただいたので、後に追記しています

先ほどcurlで確認したことと、同じことをやってみます。
src/main/java/org/littlewings/reactor/netty/http/ReactorNettyHttpClient.java

package org.littlewings.reactor.netty.http;

import java.nio.charset.StandardCharsets;

import reactor.core.publisher.Flux;
import reactor.ipc.netty.http.HttpClient;

public class ReactorNettyHttpClient {
    public static void main(String... args) {
        HttpClient client = HttpClient.create("localhost", 8080);

        Flux<String> indexResponse =
                client
                        .get("/index")
                        .map(response -> response.receiveString(StandardCharsets.UTF_8))
                        .block();

        indexResponse.subscribe(System.out::println);

        Flux<String> sourceResponse =
                client
                        .get("/src/org/littlewings/reactor/netty/http/ReactorNettyHttpServer.java")
                        .map(response -> response.receiveString(StandardCharsets.UTF_8))
                        .block();

        sourceResponse.subscribe(System.out::println);

        Flux<String> calcResponse =
                client
                        .get("/calc?a=5&b=3")
                        .map(response -> response.receiveString(StandardCharsets.UTF_8))
                        .block();

        calcResponse.subscribe(System.out::println);


        client.shutdown().block();
    }
}

見た感じそんなに難しくないと思いますが、一応解説。

まず、接続先を指定して、HttpClient#createでHttpClientのインスタンスを作成します。

        HttpClient client = HttpClient.create("localhost", 8080);

あとは、HttpClient#getなどでアクセスすればOKです。レスポンスの中身などは、HttpClientResponseからreceiveなどで取得することになります。

        Flux<String> indexResponse =
                client
                        .get("/index")
                        .map(response -> response.receiveString(StandardCharsets.UTF_8))
                        .block();

        indexResponse.subscribe(System.out::println);

ここでも、MonoやFluxを使うことになります。

最後はshutdownしておきましょう。

        client.shutdown().block();

実行結果は、curlと同じ結果がコンソールに出力されるだけなので、端折ります。

HTTPクライアント(追記

というわけで、ちょっと変えてみました。

package org.littlewings.reactor.netty.http;

import java.nio.charset.StandardCharsets;

import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.HttpClient;

public class ReactorNettyHttpClient {
    public static void main(String... args) {
        HttpClient client = HttpClient.create("localhost", 8080);

        Mono<String> indexResponse =
                client
                        .get("/index")
                        .flatMap(response -> response.receiveString(StandardCharsets.UTF_8))
                        .next();

        indexResponse.subscribe(System.out::println);

        Mono<String> sourceResponse =
                client
                        .get("/src/org/littlewings/reactor/netty/http/ReactorNettyHttpServer.java")
                        .flatMap(response -> response.receiveString(StandardCharsets.UTF_8))
                        .next();

        sourceResponse.subscribe(System.out::println);

        Mono<String> calcResponse =
                client
                        .get("/calc?a=5&b=3")
                        .flatMap(response -> response.receiveString(StandardCharsets.UTF_8))
                        .next();

        calcResponse.subscribe(System.out::println);

        indexResponse
                .then(sourceResponse)
                .then(calcResponse)
                .block();

        client.shutdown().block();
    }
}

Flux#nextでMonoとして受け取るようにして

        Mono<String> indexResponse =
                client
                        .get("/index")
                        .flatMap(response -> response.receiveString(StandardCharsets.UTF_8))
                        .next();

        indexResponse.subscribe(System.out::println);

ブロックするのは最後にまとめてみました。

        indexResponse
                .then(sourceResponse)
                .then(calcResponse)
                .block();

もしくは、こういうのでもOKみたいです。

        Flux.merge(indexResponse, sourceResponse, calcResponse).blockLast();

オマケ

オマケで、HttpServer#directoryで簡単な静的なHTTPサーバーが作れるので、書いてみます。
src/main/java/org/littlewings/reactor/netty/http/SimpleStaticHttpd.java

package org.littlewings.reactor.netty.http;

import reactor.ipc.netty.http.HttpServer;

public class SimpleStaticHttpd {
    public static void main(String... args) {
        int port = 8080;
        String documentRoot = "./";

        HttpServer server =
                HttpServer
                        .create(port)
                        .directory("/", documentRoot);

        server.start().log().block();
    }
}

この実装だと、カレントディレクトリ配下のファイルやディレクトリをHTTPで公開することができます。
※ホントにこのまま使ったらダメでしょうけど

ちょっと使ってみて思ったのは、404のハンドリングとか書いてないとクライアントも終わりがわからなくなるみたい(curlがずっと待ってしまう)なので、そのあたりどうしたらいいのかな?という感じですね。

まあ、ちょっとお試し的な使い方しかしないでしょうから、これでも困らないと思いますが…。

とりあえず、こんなところで。