Reactor Coreで遊んだので、今度はReactor Nettyへ。
Reactor Nettyを使うことで、MonoやFluxを使ってTCP、UDP、HTTPのコードが書けるようになるみたいです。
Reactorに関するドキュメントはこちらなのですが、
https://projectreactor.io/docs/
Reactor IPCのサブモジュール的な感じにとらえればよさそうですね。
GitHub - reactor/reactor-ipc: Crossing IO boundaries on the JVM with Reactive Streams
ドキュメントは、Reactor IPCのものを見ればよさそうですが、まだあまり書かれていません。
https://projectreactor.io/ipc/docs/reference/
Javadocは別れているようなので、それぞれに。
IPC contracts for Reactive transport and protocol 0.6.2.RELEASE API
あんまり情報がないのですが、テストコードなどを参考に進めていこうと思います。
現時点でのReactor Nettyのバージョンは、0.5.1.RELEASEです。
それでは、使っていってみます。
準備
まずはMaven依存関係。
<dependency> <groupId>io.projectreactor.ipc</groupId> <artifactId>reactor-netty</artifactId> <version>0.5.1.RELEASE</version> </dependency>
今回は、テストコードは使わない形でいきます。
Reactor IPCのCodecとして、Reactor Addonsというサブモジュールがあります(Reactor Codec)。
GitHub - reactor/reactor-addons: Official modules for the Reactor project
このReactor Codecを使うとJSON向けのCodecなどが利用できるようになるのですが、現在のReactor NettyではこのReactor Codecに対する依存関係がOptionalであり、なおかつSNAPSHOTに依存しているので今回は使わない方向でいきます。
HTTPサーバーを書く
それでは、HTTPサーバーを書いていきます。
できあがったコードを先に載せると、こんな感じ。
src/main/java/org/littlewings/reactor/netty/http/ReactorNettyHttpServer.java
package org.littlewings.reactor.netty.http; import java.util.Arrays; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; import io.netty.handler.codec.http.HttpMethod; import reactor.core.publisher.Mono; import reactor.ipc.netty.http.HttpServer; public class ReactorNettyHttpServer { public static void main(String... args) { int port = 8080; HttpServer server = HttpServer.create(port) .get("/index", channel -> channel.sendString(Mono.just("Hello World!!"))) .directory("/src", "src/main/java") .file("/pom", "pom.xml") .route(channel -> channel.uri().startsWith("/calc?") && HttpMethod.GET.equals(channel.method()), channel -> { Function<String, Map<String, Object>> paramResolver = uri -> { String query = uri.split("\\?")[1]; return Arrays .stream(query.split("&")) .map(kv -> kv.split("=")) .collect(Collectors.toMap(kv -> kv[0], kv -> kv[1])); }; channel.paramsResolver(paramResolver); String a = (String) channel.param("a"); String b = (String) channel.param("b"); int result = Integer.parseInt(a) + Integer.parseInt(b); return channel.sendString(Mono.just(Integer.toString(result))); }); server.start().log("httpd").block(); //server.shutdownAndAwait(); //server.shutdown().block(); } }
サーバーのリッスンポートは、8080とします。
int port = 8080;
ポートを与え、HttpServer#createでHttpServerのインスタンスを作成します。
HttpServer.create(port)
このHttpServerに対して、getやpost、そしてfileやdirectoryでマッピングを書いていくことができます。
.get("/index", channel -> channel.sendString(Mono.just("Hello World!!"))) .directory("/src", "src/main/java") .file("/pom", "pom.xml")
また、複雑な条件?みたいなことをしたければ、routeを使えばいい?
.route(channel -> channel.uri().startsWith("/calc?") && HttpMethod.GET.equals(channel.method()), channel -> { Function<String, Map<String, Object>> paramResolver = uri -> { String query = uri.split("\\?")[1]; return Arrays .stream(query.split("&")) .map(kv -> kv.split("=")) .collect(Collectors.toMap(kv -> kv[0], kv -> kv[1])); }; channel.paramsResolver(paramResolver); String a = (String) channel.param("a"); String b = (String) channel.param("b"); int result = Integer.parseInt(a) + Integer.parseInt(b); return channel.sendString(Mono.just(Integer.toString(result))); });
クエリなどをパースしてくれるものはないので、今回は自前で書きました。HttpChannel#paramsResolverで設定すればいいみたいです。
また、テストコードを見ているとパラメータを使ったパスっぽいものも使えそうに見えるのですが、
httpServer.get("/get/{name}", getHandler());
これはReactor Busがないとダメみたいなので、こちらもパス。Reactor Addonsの、やっぱりSNAPSHOT依存ですし。
あとは起動して、ブロックして待機。logメソッドを使うことで、ログ出力を行うことができます。
server.start().log("httpd").block();
停止する時は、HttpServer#shutdownすればよいです。今回は、コメントアウトしてCtrl-cで停止のつもり。
//server.shutdownAndAwait(); //server.shutdown().block();
確認
それでは、作成したHTTPサーバーの動作確認をしてみます。
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.reactor.netty.http.ReactorNettyHttpServer
確認。
## /index $ curl http://localhost:8080/index Hello World!! ## HTTPサーバーのソースコードを表示 $ curl http://localhost:8080/src/org/littlewings/reactor/netty/http/ReactorNettyHttpServer.java package org.littlewings.reactor.netty.http; import java.util.Arrays; import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; import io.netty.handler.codec.http.HttpMethod; import reactor.core.publisher.Mono; import reactor.ipc.netty.http.HttpServer; public class ReactorNettyHttpServer { public static void main(String... args) { int port = 8080; HttpServer server = HttpServer.create(port) 〜省略〜 server.start().log("httpd").block(); //server.shutdownAndAwait(); //server.shutdown().block(); } } ## /calcで計算 $ curl 'http://localhost:8080/calc?a=5&b=3' 8
OKそうですね。
HTTPクライアント
続いて、クライアント側。
※@makingさんに指摘をいただいたので、後に追記しています
先ほどcurlで確認したことと、同じことをやってみます。
src/main/java/org/littlewings/reactor/netty/http/ReactorNettyHttpClient.java
package org.littlewings.reactor.netty.http; import java.nio.charset.StandardCharsets; import reactor.core.publisher.Flux; import reactor.ipc.netty.http.HttpClient; public class ReactorNettyHttpClient { public static void main(String... args) { HttpClient client = HttpClient.create("localhost", 8080); Flux<String> indexResponse = client .get("/index") .map(response -> response.receiveString(StandardCharsets.UTF_8)) .block(); indexResponse.subscribe(System.out::println); Flux<String> sourceResponse = client .get("/src/org/littlewings/reactor/netty/http/ReactorNettyHttpServer.java") .map(response -> response.receiveString(StandardCharsets.UTF_8)) .block(); sourceResponse.subscribe(System.out::println); Flux<String> calcResponse = client .get("/calc?a=5&b=3") .map(response -> response.receiveString(StandardCharsets.UTF_8)) .block(); calcResponse.subscribe(System.out::println); client.shutdown().block(); } }
見た感じそんなに難しくないと思いますが、一応解説。
まず、接続先を指定して、HttpClient#createでHttpClientのインスタンスを作成します。
HttpClient client = HttpClient.create("localhost", 8080);
あとは、HttpClient#getなどでアクセスすればOKです。レスポンスの中身などは、HttpClientResponseからreceiveなどで取得することになります。
Flux<String> indexResponse = client .get("/index") .map(response -> response.receiveString(StandardCharsets.UTF_8)) .block(); indexResponse.subscribe(System.out::println);
ここでも、MonoやFluxを使うことになります。
最後はshutdownしておきましょう。
client.shutdown().block();
実行結果は、curlと同じ結果がコンソールに出力されるだけなので、端折ります。
HTTPクライアント(追記)
@kazuhira_r URLClientコードでblock()しているのがイマイチ感あるのでget(..).flatMap(i -> i.receiveString(...)).next()でMono<String>にするのどうでしょう
というわけで、ちょっと変えてみました。
package org.littlewings.reactor.netty.http; import java.nio.charset.StandardCharsets; import reactor.core.publisher.Mono; import reactor.ipc.netty.http.HttpClient; public class ReactorNettyHttpClient { public static void main(String... args) { HttpClient client = HttpClient.create("localhost", 8080); Mono<String> indexResponse = client .get("/index") .flatMap(response -> response.receiveString(StandardCharsets.UTF_8)) .next(); indexResponse.subscribe(System.out::println); Mono<String> sourceResponse = client .get("/src/org/littlewings/reactor/netty/http/ReactorNettyHttpServer.java") .flatMap(response -> response.receiveString(StandardCharsets.UTF_8)) .next(); sourceResponse.subscribe(System.out::println); Mono<String> calcResponse = client .get("/calc?a=5&b=3") .flatMap(response -> response.receiveString(StandardCharsets.UTF_8)) .next(); calcResponse.subscribe(System.out::println); indexResponse .then(sourceResponse) .then(calcResponse) .block(); client.shutdown().block(); } }
Flux#nextでMonoとして受け取るようにして
Mono<String> indexResponse = client .get("/index") .flatMap(response -> response.receiveString(StandardCharsets.UTF_8)) .next(); indexResponse.subscribe(System.out::println);
ブロックするのは最後にまとめてみました。
indexResponse .then(sourceResponse) .then(calcResponse) .block();
もしくは、こういうのでもOKみたいです。
Flux.merge(indexResponse, sourceResponse, calcResponse).blockLast();
オマケ
オマケで、HttpServer#directoryで簡単な静的なHTTPサーバーが作れるので、書いてみます。
src/main/java/org/littlewings/reactor/netty/http/SimpleStaticHttpd.java
package org.littlewings.reactor.netty.http; import reactor.ipc.netty.http.HttpServer; public class SimpleStaticHttpd { public static void main(String... args) { int port = 8080; String documentRoot = "./"; HttpServer server = HttpServer .create(port) .directory("/", documentRoot); server.start().log().block(); } }
この実装だと、カレントディレクトリ配下のファイルやディレクトリをHTTPで公開することができます。
※ホントにこのまま使ったらダメでしょうけど
ちょっと使ってみて思ったのは、404のハンドリングとか書いてないとクライアントも終わりがわからなくなるみたい(curlがずっと待ってしまう)なので、そのあたりどうしたらいいのかな?という感じですね。
まあ、ちょっとお試し的な使い方しかしないでしょうから、これでも困らないと思いますが…。
とりあえず、こんなところで。