CLOVER🍀

That was when it all began.

はじめてのReactor Core

Reacive〜と名の付くものに関する話題について、そろそろ少しずつ追ってみようかなということで。

とりあえず、触りつつ試しつつ始めてみようと思い、まずはReactorを触ってみることにしました。

Project Reactor

GitHub - reactor/reactor: Reactor BOM and legacy (2.x, 1.x)

Reactorとは、Pivotalが開発しているReactive Streamsの実装っぽいです。

Reactive Webアプリケーション - そしてSpring 5へ #jjug_ccc #ccc_ef3

まあ、いろいろ難しいことは抜きにして、単純に動かしてみましょう。

準備

今回は、Reactor Coreを使ってコードを書いて試すことにします。

GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM

Reactor Coreは3.0.0.RC2が最新のようですが、Maven Centralにはまだないので、SpringのMilestoneリポジトリを加えます。

        <repository>
            <id>spring-milestone</id>
            <name>Spring Milestone Repository</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>

あとは、Reactor Coreを動作確認用のテストライブラリをMaven依存関係に記述。

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.0.0.RC2</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
            <scope>test</scope>
        </dependency>

はじめてのReactor

とりあえずまったく経験がないのですが、Reactor Coreのドキュメントや軽く調べてみるとFluxとMonoというものを使ってみるようです。

Getting Started

これを書いた後に気付きましたが、ハンズオンを見たらよかったかも…。
GitHub - reactor/lite-rx-api-hands-on: Lite Rx API Hands-On with Reactor Core 3

その他、参考。
Reactor Core 2.5: もう一つのJava向けReactive Extensions実装

JavaでReactiveプログラミング ~Reactor~

感覚的には、Fluxが複数の値を扱えるもので、Monoが単発の値を扱えるものみたいですね。これをStreamによく似たAPIで扱える感じです。


src/test/java/org/littlewings/reactor/helloworld/HelloReactor.java

package org.littlewings.reactor.helloworld;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static org.assertj.core.api.Assertions.assertThat;

public class HelloReactor {
    // ここに、テストを書く!
}

Flux

では、まずはFluxから。今回は、ListからFluxを作成して、Subscirbeで受け取ってみます。

    @Test
    public void flux() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .subscribe(s -> received.add(s));

        assertThat(received).containsExactly("Hello", "World", "Reactor", "Flux");
    }

単純に、Fluxを作成した時の値がそのまま渡ってきています。

mapで変換。

    @Test
    public void fluxMap() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .map(s -> "★" + s + "★")
                .subscribe(s -> received.add(s));

        assertThat(received).containsExactly("★Hello★", "★World★", "★Reactor★", "★Flux★");
    }

takeで、最初の2つを選ぶ。

    @Test
    public void fluxTake() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .take((2))
                .subscribe(s -> received.add(s));

        assertThat(received).containsExactly("Hello", "World");
    }

filterで絞り込み。

    @Test
    public void fluxFilter() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .filter(s -> s.contains("e"))
                .subscribe(s -> received.add(s));

        assertThat(received).containsExactly("Hello", "Reactor");

    }

bufferで、複数の要素をまとめてsubscribe時にListで受け取る。

    @Test
    public void fluxBuffer() {
        List<List<String>> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .buffer(2)
                .subscribe(strings -> received.add(strings));

        assertThat(received).containsExactly(Arrays.asList("Hello", "World"), Arrays.asList("Reactor", "Flux"));
    }

2つのFluxをマージ。

    @Test
    public void fluxMergeWith() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .mergeWith(Flux.fromIterable(Arrays.asList("A", "B", "C", "D")))
                .subscribe(s -> received.add(s));

        assertThat(received).containsExactly("Hello", "World", "Reactor", "Flux", "A", "B", "C", "D");
    }

2つのFluxをzipで結合。

    @Test
    public void fluxZip() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .zipWith(Flux.fromIterable(Arrays.asList("A", "B", "C", "D")))
                .subscribe(tuple -> received.add(tuple.getT1() + "-" + tuple.getT2()));

        assertThat(received).containsExactly("Hello-A", "World-B", "Reactor-C", "Flux-D");
    }

もちろん、これらの操作を組み合わせて使うこともできます。

    @Test
    public void fluxUsing() {
        List<String> received = new ArrayList<>();

        Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .take(2)
                .map(s -> "★" + s + "★")
                .mergeWith(Flux.fromIterable(Arrays.asList("A", "B")))
                .buffer(2)
                .subscribe(strings -> received.add(strings.get(0) + "-" + strings.get(1)));

        assertThat(received).containsExactly("★Hello★-★World★", "A-B");
    }

また、subscribeで受け取るだけではなく、Iterableなどの形で戻り値としても返却することも可能です。

    @Test
    public void fluxBlock() {
        Iterable<String> result = Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .map(s -> "★" + s + "★")
                .toIterable();

        assertThat(result).containsExactly("★Hello★", "★World★", "★Reactor★", "★Flux★");
    }

ブロックするのは嫌われるでしょうから、あんまりやらないでしょうけれど。

他にもいろいろFluxに関する操作はあるみたいですが、とりあえずこのくらいで。

Mono

続いてMono。

今回は、Mono#createでインスタンスを作成。

    @Test
    public void mono() {
        AtomicReference<String> result = new AtomicReference<>();

        Mono.fromSupplier(() -> "Hello World")
                .subscribe(s -> result.set(s));

        assertThat(result.get()).isEqualTo("Hello World");
    }

Fluxのようにmapなどの操作が可能です。

    @Test
    public void monoMap() {
        AtomicReference<String> result = new AtomicReference<>();

        Mono.fromSupplier(() -> "Hello World")
                .map(s -> "★" + s + "★")
                .subscribe(s -> result.set(s));

        assertThat(result.get()).isEqualTo("★Hello World★");
    }

subscribeではなく、結果を戻り値として受け取ることもできます。

    @Test
    public void monoResult() {
        String result = Mono.fromSupplier(() -> "Hello World")
                .map(s -> "★" + s + "★")
                .block();

        assertThat(result).isEqualTo("★Hello World★");
    }

blockと、いかにも使うのをためらいそうな名前ですが。

FluxとMonoの変換

FluxとMonoは、両者の変換が可能です。

たとえば、FluxからMonoにするには、Flux#nextなどが使えます。

    @Test
    public void fluxToMono() {
        Mono<String> mono = Flux.fromIterable(Arrays.asList("Hello", "World", "Reactor", "Flux"))
                .next();

        assertThat(mono.block()).isEqualTo("Hello");
    }

他にもlastや、単一要素のみ保持している場合にはsingleが使えたりします。

Flux#collectを使って、単一の要素にしてMonoにすることもできます。

    @Test
    public void fluxToMonoCollect() {
        Mono<String> mono = Flux.fromIterable(Arrays.<CharSequence>asList("Hello", "World", "Reactor", "Flux"))
                .collect(Collectors.joining("-"));

        assertThat(mono.block()).isEqualTo("Hello-World-Reactor-Flux");
    }

そして、MonoからFlux。今回は、Mono#fluxで変換。

    @Test
    public void monoToFlux() {
        Flux<String> flux = Mono.fromSupplier(() -> "Hello World")
                .flux();

        assertThat(flux.toIterable()).containsExactly("Hello World");
    }

この他、Mono#flatMapでFluxにすることもできるようです。

    @Test
    public void monoToFluxFlatMap() {
        Flux<String> flux = Mono.fromSupplier(() -> "Hello World")
                .flatMap(s -> Flux.fromIterable(Arrays.asList(s)));

        assertThat(flux.toIterable()).containsExactly("Hello World");
    }

まとめ

とりえあずAPIに触れる程度の意味で、Reactorを試してみました。

まだ「Streamっぽいなー」くらいにしか思っていないのですが、これからちょっとずつ調べていこうかなと思います。