CLOVER🍀

That was when it all began.

Reactor CoreのSchedulersとParallelFluxで遊ぶ

Reactorをちょっとずつ触ってみようという話、次はSchedulersとParallelFluxということで。

Schedulers

https://github.com/reactor/reactor-core#parallelflux=title=ParallelFlux

まあ、GitHubに書かれている内容を上から試していってるだけなんですけど。

Schedulersではタスクをどのスケジューラー上(実際はスレッドプールっぽいですけど)で実行するか、ParallelFluxではタスクを分割しての並行実行についてをテーマとしているみたいです。

とりあえず、使ってみましょう。

準備

Maven依存関係は、以下のとおり。

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

そういえば、Reactor Coreが3.0.0.RELEASEになりましたね。

特に値の検証とかはやらないんですけど、テストコードで実行するのでJUnitを入れています。

テストコードの全体像は、以下のようにしています。
src/test/java/org/littlewings/reactor/schedulerparallelism/ReactorSchedulerParallelismTest.java

package org.littlewings.reactor.schedulerparallelism;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ReactorSchedulerParallelismTest {
    // ここにテストを書く!
}

Schedulers

では、まずはGitHub上のサンプルに習ってSchedulersから。

こんなコードを用意。

    @Test
    public void schedulers() throws InterruptedException {
        Mono.fromCallable(() -> LocalDateTime.now())
                .repeat()
                .publishOn(Schedulers.single())
                .flatMap(dateTime ->
                        Mono.fromCallable(() -> {
                            TimeUnit.SECONDS.sleep(1L);
                            return dateTime;
                        }).subscribeOn(Schedulers.parallel()), 8)
                .subscribe(dateTime ->
                        System.out.printf("[%s] time -> %s%n", Thread.currentThread().getName(), dateTime)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

これを実行すると、こんな結果になります。

[parallel-3] time -> 2016-08-27T18:03:40.819
[parallel-3] time -> 2016-08-27T18:03:40.820
[parallel-3] time -> 2016-08-27T18:03:40.820
[parallel-4] time -> 2016-08-27T18:03:40.820
[parallel-4] time -> 2016-08-27T18:03:40.820
[parallel-4] time -> 2016-08-27T18:03:40.820
[parallel-4] time -> 2016-08-27T18:03:40.820
[parallel-4] time -> 2016-08-27T18:03:40.820
[parallel-2] time -> 2016-08-27T18:03:40.820
[parallel-2] time -> 2016-08-27T18:03:40.820
[parallel-7] time -> 2016-08-27T18:03:40.820
[parallel-7] time -> 2016-08-27T18:03:40.820
[parallel-7] time -> 2016-08-27T18:03:40.820
[parallel-7] time -> 2016-08-27T18:03:40.820
[parallel-7] time -> 2016-08-27T18:03:40.820
[parallel-1] time -> 2016-08-27T18:03:40.820
[parallel-1] time -> 2016-08-27T18:03:40.820

〜省略〜

書いたコードがどういう意味か?という話ですが、Mono#fromCallableで現在時刻を取得する処理をrepeatで繰り返します。

        Mono.fromCallable(() -> LocalDateTime.now())
                .repeat()

それをシングルスレッド上(Schedulers#single)で動かします。

                .publishOn(Schedulers.single())

次に、FlatMapで1度sleepを置いていますが、この時に並列度8にしてSchedulers#parallelで並列実行可能にします。ログ出力している時に、並列っぽく動いているのはこのためですね。

                .flatMap(dateTime ->
                        Mono.fromCallable(() -> {
                            TimeUnit.SECONDS.sleep(1L);
                            return dateTime;
                        }).subscribeOn(Schedulers.parallel()), 8)

最後に、現在のスレッド名と共にMono#fromCallableで取得したLocalDateTimeを出力しています。

                .subscribe(dateTime ->
                        System.out.printf("[%s] time -> %s%n", Thread.currentThread().getName(), dateTime)
                );

ParallelFlux

続いて、ParallelFlux。

    @Test
    public void parallelFlux() throws InterruptedException {
        Mono.fromCallable(() -> LocalDateTime.now())
                .repeat()
                .parallel(8) // parallelism
                .runOn(Schedulers.parallel())
                .sequential()
                .subscribe(dateTime ->
                        System.out.printf("[%s] time -> %s%n", Thread.currentThread().getName(), dateTime)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

コードとしてはParallelFluxという名前は出てきていませんが、Mono#repeatを呼んだ後のFlux#parallelの後からは、ParallelFluxになっています。

こちらを実行すると、こんな結果になります。

[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.439
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.432
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.437
[parallel-1] time -> 2016-08-27T18:17:56.443
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.439
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.432
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.437
[parallel-1] time -> 2016-08-27T18:17:56.443
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.439
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.432
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.437
[parallel-1] time -> 2016-08-27T18:17:56.443
[parallel-1] time -> 2016-08-27T18:17:56.441
[parallel-1] time -> 2016-08-27T18:17:56.439

〜省略〜

スレッド名、ひとつだけですね…。このあたりはもう少し後で見てみましょう。

組んだコードを見直してみます。Mono#fromCallable、repeatの後に、並列度8でSchedulers#parallel上で動作するように設定します。

        Mono.fromCallable(() -> LocalDateTime.now())
                .repeat()
                .parallel(8) // parallelism
                .runOn(Schedulers.parallel())

その結果を、シーケンシャルにsubscribeするように設定しています。

                .sequential()

ちなみに、sequencialという名前ですが、結果の渡ってくる順番については"unordered"なのでご注意を。実際にログを見たらわかりますが、LocalDateTime#nowの順番で並ぶわけではありません。

で、最後に結果を出力、と。

                .subscribe(dateTime ->
                        System.out.printf("[%s] time -> %s%n", Thread.currentThread().getName(), dateTime)
                );

なるほど。とりあえず、GitHubに載っているものは試しました。

でも、これだけでは面白くありません。もう少しいろいろやってみましょう。

ParallelFlux#groups

先ほどのParallelFluxの例でしたが、sequentialを使いました。もうひとつ、ParallelFlux#groupsを使う方法もあるようです。

使ってみたコードが、こちら。

    @Test
    public void parallelFluxGroups() throws InterruptedException {
        Mono.fromCallable(() -> LocalDateTime.now())
                .repeat()
                .parallel(8) // parallelism
                .runOn(Schedulers.parallel())
                .groups()
                .subscribe(grouped ->
                        grouped.subscribe(dateTime ->
                                System.out.printf("[%s] group[%s] time -> %s%n", Thread.currentThread().getName(), grouped.key(), dateTime)
                        )
                );

        TimeUnit.SECONDS.sleep(3L);
    }

結果は、このように。

[parallel-1] group[0] time -> 2016-08-27T18:23:52.936
[parallel-1] group[0] time -> 2016-08-27T18:23:52.936
[parallel-1] group[0] time -> 2016-08-27T18:23:52.936
[parallel-1] group[0] time -> 2016-08-27T18:23:52.936

〜省略〜

[parallel-3] group[2] time -> 2016-08-27T18:23:52.895
[parallel-3] group[2] time -> 2016-08-27T18:23:52.895
[parallel-3] group[2] time -> 2016-08-27T18:23:52.895
[parallel-3] group[2] time -> 2016-08-27T18:23:52.895
[parallel-3] group[2] time -> 2016-08-27T18:23:52.895
[parallel-3] group[2] time -> 2016-08-27T18:23:52.895

〜省略〜

[parallel-4] group[3] time -> 2016-08-27T18:23:53.070
[parallel-4] group[3] time -> 2016-08-27T18:23:53.070
[parallel-4] group[3] time -> 2016-08-27T18:23:53.070
[parallel-7] group[6] time -> 2016-08-27T18:23:53.042
[parallel-7] group[6] time -> 2016-08-27T18:23:53.042
[parallel-2] group[1] time -> 2016-08-27T18:23:53.068
[parallel-2] group[1] time -> 2016-08-27T18:23:53.068
[parallel-2] group[1] time -> 2016-08-27T18:23:53.068
[parallel-2] group[1] time -> 2016-08-27T18:23:53.068
[parallel-2] group[1] time -> 2016-08-27T18:23:53.068

とまあ、並列っぽく動いている感じになります。

先ほどのsequentialの例から変わったのは、ここだけですね。

                .groups()
                .subscribe(grouped ->
                        grouped.subscribe(dateTime ->
                                System.out.printf("[%s] group[%s] time -> %s%n", Thread.currentThread().getName(), grouped.key(), dateTime)
                        )

sequentialの部分をgroupsに変えています。すると、そこから返るParallelFluxが

Flux<GroupedFlux<java.lang.Integer,T>>

となります。subscribe時に、GroupedFluxに対してさらにsubscribeすることになります。GroupedFluxもFluxです。

groupsを使うと、グループ単位でsubscribe部の並列実行が可能になるようですね。

delay

先ほどの例では、開始とともにMono#fromCallableに対してsubscribeにし行っていたのですが、Mono#delay〜を使用することでそのタイミングをコントロールすることができます。

今回は、500ミリ秒おきにsubscribeしてみましょう。

    @Test
    public void schedulersDelayed() throws InterruptedException {
        Mono.fromCallable(() -> LocalDateTime.now())
                .delaySubscription(Duration.of(500L, ChronoUnit.MILLIS))
                .repeat()
                .parallel(8)
                .runOn(Schedulers.parallel())
                .subscribe(dateTime ->
                        System.out.printf("[%s] time -> %s%n", Thread.currentThread().getName(), dateTime)
                );

        TimeUnit.SECONDS.sleep(5L);
    }

結果。今回は、これで全部です。subscribe時も、ParallelFlux#runOnで割り当てたスケジューラ上で各スレッドで実行されているようですね。

[parallel-1] time -> 2016-08-27T18:31:03.225
[parallel-2] time -> 2016-08-27T18:31:03.727
[parallel-3] time -> 2016-08-27T18:31:04.228
[parallel-4] time -> 2016-08-27T18:31:04.728
[parallel-5] time -> 2016-08-27T18:31:05.229
[parallel-6] time -> 2016-08-27T18:31:05.730
[parallel-7] time -> 2016-08-27T18:31:06.231
[parallel-8] time -> 2016-08-27T18:31:06.732
[parallel-1] time -> 2016-08-27T18:31:07.233

Schedulerとスレッド

1番最初に乗せた例はflatMapなどいろいろあったので、もっと単純にしてみましょう。

    @Test
    public void fromCallThreadSingle() throws InterruptedException {
        Mono.fromCallable(() -> Entry.of(Thread.currentThread().getName(), LocalDateTime.now()))
                .repeat()
                // .publishOn(Schedulers.single())
                .subscribe(entry ->
                        System.out.printf("[%s] entry -> %s%n", Thread.currentThread().getName(), entry)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

ただ、ここからはMono#fromCallableがどのスレッドで呼ばれたかも見ていくことにします。

        Mono.fromCallable(() -> Entry.of(Thread.currentThread().getName(), LocalDateTime.now()))

Entryというクラスの定義は、こんな感じです。
src/test/java/org/littlewings/reactor/schedulerparallelism/Entry.java

package org.littlewings.reactor.schedulerparallelism;

import java.time.LocalDateTime;

public class Entry {
    private String name;
    private LocalDateTime time;

    public static Entry of(String name, LocalDateTime time) {
        Entry entry = new Entry();
        entry.name = name;
        entry.time = time;
        return entry;
    }

    public String getName() {
        return name;
    }

    public LocalDateTime getTime() {
        return time;
    }

    @Override
    public String toString() {
        return String.format("thread[%s], time[%s]", name, time);
    }
}

で、動かしてみると全部mainスレッド上で動き出します。

[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]
[main] entry -> thread[main], time[2016-08-27T18:34:38.095]

〜省略〜

ここで、repeatの後のFlux#publishOnにSchedulers#singleを渡しているところのコメントアウトを外すと

                .repeat()
                .publishOn(Schedulers.single())

こうなります。

[single-1] entry -> thread[single-1], time[2016-08-27T18:35:12.752]
[single-1] entry -> thread[single-1], time[2016-08-27T18:35:12.752]
[single-1] entry -> thread[single-1], time[2016-08-27T18:35:12.752]
[single-1] entry -> thread[single-1], time[2016-08-27T18:35:12.752]
[single-1] entry -> thread[single-1], time[2016-08-27T18:35:12.752]

どのスケジューラー上で動かすかは、両方のスレッドに影響するんですね、と。

続いて、ParallelFlux。

    @Test
    public void fromCallThreadParallel() throws InterruptedException {
        Mono.fromCallable(() -> Entry.of(Thread.currentThread().getName(), LocalDateTime.now()))
                .repeat()
                .parallel(8)
                .runOn(Schedulers.parallel())
                .subscribe(entry ->
                        System.out.printf("[%s] entry -> %s%n", Thread.currentThread().getName(), entry)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

このサンプルの場合、こういう結果になります。

[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]
[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]
[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]
[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]
[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]
[parallel-3] entry -> thread[parallel-3], time[2016-08-27T18:38:53.333]

〜省略〜

[parallel-4] entry -> thread[parallel-4], time[2016-08-27T18:38:53.317]
[parallel-4] entry -> thread[parallel-4], time[2016-08-27T18:38:53.317]
[parallel-4] entry -> thread[parallel-4], time[2016-08-27T18:38:53.317]
[parallel-4] entry -> thread[parallel-4], time[2016-08-27T18:38:53.317]
[parallel-4] entry -> thread[parallel-4], time[2016-08-27T18:38:53.317]

〜省略〜

[parallel-6] entry -> thread[parallel-2], time[2016-08-27T18:38:53.379]
[parallel-6] entry -> thread[parallel-2], time[2016-08-27T18:38:53.379]
[parallel-6] entry -> thread[parallel-2], time[2016-08-27T18:38:53.379]
[parallel-6] entry -> thread[parallel-6], time[2016-08-27T18:38:53.381]
[parallel-6] entry -> thread[parallel-6], time[2016-08-27T18:38:53.381]
[parallel-6] entry -> thread[parallel-6], time[2016-08-27T18:38:53.381]

こちらも、Mono#fromCallableとsubscribeの両方の動作に使用されるスレッドに影響があることがわかりました。

なお、ParallelFlux#parallelで並列度を指定する他、Schedulerも並列度が指定できるのですが、両方指定している場合はParallelFlux#parallelで指定した並列度で動作するようです。そりゃあそうか、という気はしますね…。

Schedulerとdelayとスレッド

delayさせてみましょう。

    @Test
    public void fromCallThreadSingleWithDelayed() throws InterruptedException {
        Mono.fromCallable(() -> Entry.of(Thread.currentThread().getName(), LocalDateTime.now()))
                .delaySubscription(Duration.of(500L, ChronoUnit.MILLIS))
                .repeat()
                // .publishOn(Schedulers.single())
                .subscribe(entry ->
                        System.out.printf("[%s] entry -> %s%n", Thread.currentThread().getName(), entry)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

結果は、こんな感じ。まあ、事実上singleなので、そうですねと。

[timer-1] entry -> thread[timer-1], time[2016-08-27T18:41:45.534]
[timer-1] entry -> thread[timer-1], time[2016-08-27T18:41:46.036]
[timer-1] entry -> thread[timer-1], time[2016-08-27T18:41:46.537]
[timer-1] entry -> thread[timer-1], time[2016-08-27T18:41:47.038]
[timer-1] entry -> thread[timer-1], time[2016-08-27T18:41:47.539]

続いてParallelFlux。

    @Test
    public void fromCallThreadParallelWithDelayed() throws InterruptedException {
        Mono.fromCallable(() -> Entry.of(Thread.currentThread().getName(), LocalDateTime.now()))
                .delaySubscription(Duration.of(500L, ChronoUnit.MILLIS))
                .repeat()
                .parallel(8)
                .runOn(Schedulers.parallel())
                .subscribe(entry ->
                        System.out.printf("[%s] entry -> %s%n", Thread.currentThread().getName(), entry)
                );

        TimeUnit.SECONDS.sleep(3L);
    }

こちらでは、Mono#fromCallableを呼ぶスレッドがシングルスレッドになり、subscribe側で各スレッドが使われるようになります。

[parallel-1] entry -> thread[timer-1], time[2016-08-27T18:43:01.802]
[parallel-2] entry -> thread[timer-1], time[2016-08-27T18:43:02.305]
[parallel-3] entry -> thread[timer-1], time[2016-08-27T18:43:02.806]
[parallel-4] entry -> thread[timer-1], time[2016-08-27T18:43:03.306]
[parallel-5] entry -> thread[timer-1], time[2016-08-27T18:43:03.807]

まとめ

Reactor Coreで、SchedulersとParallelFluxをちょっとかじってみました。

興味本位でいくつかパターンを試してみましたが、けっこう挙動が変わるんですね。ちょっとずつ慣れていきましょう。