Reacive〜と名の付くものに関する話題について、そろそろ少しずつ追ってみようかなということで。
とりあえず、触りつつ試しつつ始めてみようと思い、まずはReactorを触ってみることにしました。
GitHub - reactor/reactor: Reactor BOM and legacy (2.x, 1.x)
Reactorとは、Pivotalが開発しているReactive Streamsの実装っぽいです。
Reactive Webアプリケーション - そしてSpring 5へ #jjug_ccc #ccc_ef3
まあ、いろいろ難しいことは抜きにして、単純に動かしてみましょう。
準備
今回は、Reactor Coreを使ってコードを書いて試すことにします。
GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM
Reactor Coreは3.0.0.RC2が最新のようですが、Maven Centralにはまだないので、SpringのMilestoneリポジトリを加えます。
<repository> <id>spring-milestone</id> <name>Spring Milestone Repository</name> <url>https://repo.spring.io/milestone</url> </repository>
あとは、Reactor Coreを動作確認用のテストライブラリをMaven依存関係に記述。
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.0.0.RC2</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> <version>3.5.2</version> <scope>test</scope> </dependency>
はじめてのReactor
とりあえずまったく経験がないのですが、Reactor Coreのドキュメントや軽く調べてみるとFluxとMonoというものを使ってみるようです。
これを書いた後に気付きましたが、ハンズオンを見たらよかったかも…。
GitHub - reactor/lite-rx-api-hands-on: Lite Rx API Hands-On with Reactor Core 3
その他、参考。
Reactor Core 2.5: もう一つのJava向けReactive Extensions実装
JavaでReactiveプログラミング ~Reactor~
感覚的には、Fluxが複数の値を扱えるもので、Monoが単発の値を扱えるものみたいですね。これをStreamによく似たAPIで扱える感じです。
src/test/java/org/littlewings/reactor/helloworld/HelloReactor.java
package org.littlewings.reactor.helloworld; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.junit.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import static org.assertj.core.api.Assertions.assertThat; public class HelloReactor { // ここに、テストを書く! }
Flux
では、まずはFluxから。今回は、ListからFluxを作成して、Subscirbeで受け取ってみます。
@Test public void flux() { List<String> received = new ArrayList<>(); Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux")) .subscribe(s -> received.add(s)); assertThat(received).containsExactly("Hello", "World", "Reactor", "Flux"); }
単純に、Fluxを作成した時の値がそのまま渡ってきています。
mapで変換。
@Test public void fluxMap() { List<String> received = new ArrayList<>(); Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux")) .map(s -> "★" + s + "★") .subscribe(s -> received.add(s)); assertThat(received).containsExactly("★Hello★", "★World★", "★Reactor★", "★Flux★"); }
takeで、最初の2つを選ぶ。
@Test public void fluxTake() { List<String> received = new ArrayList<>(); Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux")) .take((2)) .subscribe(s -> received.add(s)); assertThat(received).containsExactly("Hello", "World"); }
filterで絞り込み。
@Test public void fluxFilter() { List<String> received = new ArrayList<>(); Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux")) .filter(s -> s.contains("e")) .subscribe(s -> received.add(s)); assertThat(received).containsExactly("Hello", "Reactor"); }
bufferで、複数の要素をまとめてsubscribe時にListで受け取る。
@Test public void fluxBuffer() { List<List<String>> received = new ArrayList<>(); Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux")) .buffer(2) .subscribe(strings -> received.add(strings)); assertThat(received).containsExactly(Arrays.asList("Hello", "World"), Arrays.asList("Reactor", "Flux")); }
2つのFluxをマージ。
@Test public void fluxMergeWith() { List<String> received = new ArrayList<>(); Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux")) .mergeWith(Flux.fromIterable(Arrays.asList("A", "B", "C", "D"))) .subscribe(s -> received.add(s)); assertThat(received).containsExactly("Hello", "World", "Reactor", "Flux", "A", "B", "C", "D"); }
2つのFluxをzipで結合。
@Test public void fluxZip() { List<String> received = new ArrayList<>(); Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux")) .zipWith(Flux.fromIterable(Arrays.asList("A", "B", "C", "D"))) .subscribe(tuple -> received.add(tuple.getT1() + "-" + tuple.getT2())); assertThat(received).containsExactly("Hello-A", "World-B", "Reactor-C", "Flux-D"); }
もちろん、これらの操作を組み合わせて使うこともできます。
@Test public void fluxUsing() { List<String> received = new ArrayList<>(); Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux")) .take(2) .map(s -> "★" + s + "★") .mergeWith(Flux.fromIterable(Arrays.asList("A", "B"))) .buffer(2) .subscribe(strings -> received.add(strings.get(0) + "-" + strings.get(1))); assertThat(received).containsExactly("★Hello★-★World★", "A-B"); }
また、subscribeで受け取るだけではなく、Iterableなどの形で戻り値としても返却することも可能です。
@Test public void fluxBlock() { Iterable<String> result = Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux")) .map(s -> "★" + s + "★") .toIterable(); assertThat(result).containsExactly("★Hello★", "★World★", "★Reactor★", "★Flux★"); }
ブロックするのは嫌われるでしょうから、あんまりやらないでしょうけれど。
他にもいろいろFluxに関する操作はあるみたいですが、とりあえずこのくらいで。
Mono
続いてMono。
今回は、Mono#createでインスタンスを作成。
@Test public void mono() { AtomicReference<String> result = new AtomicReference<>(); Mono.fromSupplier(() -> "Hello World") .subscribe(s -> result.set(s)); assertThat(result.get()).isEqualTo("Hello World"); }
Fluxのようにmapなどの操作が可能です。
@Test public void monoMap() { AtomicReference<String> result = new AtomicReference<>(); Mono.fromSupplier(() -> "Hello World") .map(s -> "★" + s + "★") .subscribe(s -> result.set(s)); assertThat(result.get()).isEqualTo("★Hello World★"); }
subscribeではなく、結果を戻り値として受け取ることもできます。
@Test public void monoResult() { String result = Mono.fromSupplier(() -> "Hello World") .map(s -> "★" + s + "★") .block(); assertThat(result).isEqualTo("★Hello World★"); }
blockと、いかにも使うのをためらいそうな名前ですが。
FluxとMonoの変換
FluxとMonoは、両者の変換が可能です。
たとえば、FluxからMonoにするには、Flux#nextなどが使えます。
@Test public void fluxToMono() { Mono<String> mono = Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux")) .next(); assertThat(mono.block()).isEqualTo("Hello"); }
他にもlastや、単一要素のみ保持している場合にはsingleが使えたりします。
Flux#collectを使って、単一の要素にしてMonoにすることもできます。
@Test public void fluxToMonoCollect() { Mono<String> mono = Flux.fromIterable(Arrays.<CharSequence>asList("Hello", "World", "Reactor", "Flux")) .collect(Collectors.joining("-")); assertThat(mono.block()).isEqualTo("Hello-World-Reactor-Flux"); }
そして、MonoからFlux。今回は、Mono#fluxで変換。
@Test public void monoToFlux() { Flux<String> flux = Mono.fromSupplier(() -> "Hello World") .flux(); assertThat(flux.toIterable()).containsExactly("Hello World"); }
この他、Mono#flatMapでFluxにすることもできるようです。
@Test public void monoToFluxFlatMap() { Flux<String> flux = Mono.fromSupplier(() -> "Hello World") .flatMap(s -> Flux.fromIterable(Arrays.asList(s))); assertThat(flux.toIterable()).containsExactly("Hello World"); }