前々からReactorのSchedulerというものをぼやっとしか見てこなかったのですが、ちょうどそういう時期にこちらの
エントリが書かれたので、ちょうどよい機会だなぁと思って自分でも見てみることにしました。
ReactorでN+1問題な処理を実装してみた話 - 谷本 心 in せろ部屋
こちらのエントリでは、「ノンブロッキングでN+1にチャレンジ」という話なのですが、今回はもうちょっとSchedulerに
焦点を当てたいと思います。
準備
まあ、いろいろ言う前に、とりあえず簡単に動かすところからやっていこうと思います。
まずは準備から。Maven依存関係は、こちら。
<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>Bismuth-SR4</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.3</version> </dependency> <dependency> <groupId>org.junit.platform</groupId> <artifactId>junit-platform-launcher</artifactId> <version>1.0.2</version> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <version>5.0.2</version> <scope>test</scope> </dependency> </dependencies>
テストコード用に、JUnit 5を足しています(Maven Surefire Pluginの部分は端折ります)。
あとLogbackも足しているので、簡単な設定ファイルを用意しておきます。
src/test/resources/logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger - %msg%n</pattern> </encoder> </appender> <root level="debug"> <appender-ref ref="STDOUT" /> </root> </configuration>
テストコードの雛形は、こんな感じ。
src/test/java/org/littlewings/reactor/SchedulersTest.java
package org.littlewings.reactor; import java.time.Duration; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.util.function.Tuple2; public class SchedulersTest { void sw(Runnable runnable) { long startTime = System.nanoTime(); runnable.run(); long elapsedTime = System.nanoTime() - startTime; long elapsedTimeInMillis = TimeUnit.MILLISECONDS.convert(elapsedTime, TimeUnit.NANOSECONDS); System.out.printf("elapsed time = %1$,3d msec%n", elapsedTimeInMillis); } // ここに、テストを書く! }
一応、時間を計測するためのメソッドも用意。
動かしてみる
さて、まずは単純な例から。こういうコードを用意。
@Test public void simpleFlux() { sw(() -> { Flux<Integer> flux = Flux.fromStream(IntStream.rangeClosed(1, 50).boxed()) .log(); StepVerifier .create(flux) .expectNextCount(49) .expectNext(50) .verifyComplete(); }); }
こんな結果になります。
20:00:42.178 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 20:00:42.256 [main] INFO reactor.Flux.Stream.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription) 20:00:42.262 [main] INFO reactor.Flux.Stream.1 - | request(unbounded) 20:00:42.263 [main] INFO reactor.Flux.Stream.1 - | onNext(1) 20:00:42.263 [main] INFO reactor.Flux.Stream.1 - | onNext(2) 20:00:42.263 [main] INFO reactor.Flux.Stream.1 - | onNext(3) 20:00:42.263 [main] INFO reactor.Flux.Stream.1 - | onNext(4) 20:00:42.263 [main] INFO reactor.Flux.Stream.1 - | onNext(5) 〜省略〜 20:00:42.267 [main] INFO reactor.Flux.Stream.1 - | onNext(46) 20:00:42.267 [main] INFO reactor.Flux.Stream.1 - | onNext(47) 20:00:42.267 [main] INFO reactor.Flux.Stream.1 - | onNext(48) 20:00:42.267 [main] INFO reactor.Flux.Stream.1 - | onNext(49) 20:00:42.267 [main] INFO reactor.Flux.Stream.1 - | onNext(50) 20:00:42.268 [main] INFO reactor.Flux.Stream.1 - | onComplete() elapsed time = 470 msec
次に、Flux#withIntervalを使ってちょっとデータ生成に時間をかけるようにしてみましょう。
@Test public void withInterval() { sw(() -> { Flux<Integer> flux = Flux.interval(Duration.ofMillis(100L)) .zipWith(Flux.fromStream(IntStream.rangeClosed(1, 50).boxed())) .log() .map(Tuple2::getT2); StepVerifier .create(flux) .expectNextCount(49) .expectNext(50) .verifyComplete(); }); }
ログ。
20:05:17.533 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 20:05:17.596 [main] INFO reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator) 20:05:17.602 [main] INFO reactor.Flux.Zip.1 - request(unbounded) 20:05:17.711 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([0,1]) 20:05:17.809 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([1,2]) 20:05:17.910 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([2,3]) 20:05:18.010 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([3,4]) 20:05:18.109 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([4,5]) 〜省略〜 20:05:22.309 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([46,47]) 20:05:22.409 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([47,48]) 20:05:22.510 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([48,49]) 20:05:22.609 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([49,50]) 20:05:22.613 [parallel-1] INFO reactor.Flux.Zip.1 - onComplete() elapsed time = 5,411 msec
50要素を、100msecごとに…5秒ちょっと。
ちょっとN+1っぽいものも試してみましょう。
@Test public void relation() { sw(() -> { Flux<Integer> flux = Flux.interval(Duration.ofMillis(100L)) .zipWith(Flux.fromStream(IntStream.rangeClosed(1, 50).boxed())) .log("source-stream") .map(Tuple2::getT2) .flatMap( i -> Flux.interval(Duration.ofMillis(100L)) .zipWith(Flux.just(2)) .log("doubling") .map(t -> t.getT2() * i) ); StepVerifier .create(flux) .expectNextCount(49) .expectNext(100) .verifyComplete(); }); }
ログ。
20:06:36.475 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 20:06:36.544 [main] INFO source-stream - onSubscribe(FluxZip.ZipCoordinator) 20:06:36.548 [main] INFO source-stream - request(256) 20:06:36.656 [parallel-1] INFO source-stream - onNext([0,1]) 20:06:36.660 [parallel-1] INFO doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:06:36.660 [parallel-1] INFO doubling - request(32) 20:06:36.754 [parallel-1] INFO source-stream - onNext([1,2]) 20:06:36.755 [parallel-1] INFO doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:06:36.755 [parallel-1] INFO doubling - request(32) 20:06:36.762 [parallel-2] INFO doubling - onNext([0,2]) 20:06:36.763 [parallel-2] INFO doubling - onComplete() 20:06:36.763 [parallel-2] INFO source-stream - request(1) 20:06:36.854 [parallel-1] INFO source-stream - onNext([2,3]) 〜省略〜 20:06:41.456 [parallel-1] INFO doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:06:41.456 [parallel-1] INFO doubling - request(32) 20:06:41.458 [parallel-1] INFO doubling - onNext([0,2]) 20:06:41.458 [parallel-1] INFO doubling - onComplete() 20:06:41.458 [parallel-1] INFO source-stream - request(1) 20:06:41.557 [parallel-1] INFO source-stream - onNext([49,50]) 20:06:41.559 [parallel-2] INFO doubling - onNext([0,2]) 20:06:41.560 [parallel-2] INFO doubling - onComplete() 20:06:41.560 [parallel-1] INFO doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:06:41.560 [parallel-2] INFO source-stream - request(1) 20:06:41.560 [parallel-1] INFO doubling - request(32) 20:06:41.562 [parallel-1] INFO source-stream - onComplete() 20:06:41.670 [parallel-3] INFO doubling - onNext([0,2]) 20:06:41.671 [parallel-3] INFO doubling - onComplete() elapsed time = 5,483 msec
先ほどの結果と、ほぼ同じ実行時間ですね。なるほど。
ところで最初の例だと、ログに出力されていたスレッドはmainっぽくて
20:00:42.256 [main] INFO reactor.Flux.Stream.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription) 20:00:42.262 [main] INFO reactor.Flux.Stream.1 - | request(unbounded) 20:00:42.263 [main] INFO reactor.Flux.Stream.1 - | onNext(1) 20:00:42.263 [main] INFO reactor.Flux.Stream.1 - | onNext(2) 20:00:42.263 [main] INFO reactor.Flux.Stream.1 - | onNext(3) 20:00:42.263 [main] INFO reactor.Flux.Stream.1 - | onNext(4) 20:00:42.263 [main] INFO reactor.Flux.Stream.1 - | onNext(5)
withIntervalをかませた途端、parallelになったような気がします。
20:05:17.596 [main] INFO reactor.Flux.Zip.1 - onSubscribe(FluxZip.ZipCoordinator) 20:05:17.602 [main] INFO reactor.Flux.Zip.1 - request(unbounded) 20:05:17.711 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([0,1]) 20:05:17.809 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([1,2]) 20:05:17.910 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([2,3]) 20:05:18.010 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([3,4]) 20:05:18.109 [parallel-1] INFO reactor.Flux.Zip.1 - onNext([4,5])
さて、これはどうしたことでしょう。
Scheduler
ここで、実行されるスレッドが変わったのは、Schedulerが関わっているようです。
Schedulerって?
Reactorにおいて、どのように処理が実行されるかはこのSchedulerによって決定されるようです。
Schedulerには、Reactorから4種類のものが提供されています。
Schedulers (Reactor Core 3.1.2.RELEASE)
- immediate … 現在のスレッドを使うScheduler
- single … 単一のスレッドを使うScheduler
- elastic … 必要に応じて、新しいWorkerを生成するScheduler。長時間アイドル状態(デフォルト60秒)になると、破棄される。ブロッキングなIO処理に適している
- parallel … CPUコア数で固定された数のWorkerを使用するScheduler
それぞれ、Schedulersクラスのstaticメソッドとしてデフォルトのものが用意され、Schedulerインターフェースの実装が返ってきます。
Scheduler (Reactor Core 3.1.2.RELEASE)
各Schedulerの実装は、こちら。
https://github.com/reactor/reactor-core/tree/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/scheduler
single、elastic、parallelについては、必要に応じてnewXxxx(XxxxにはSchedulerの種類が入る)にてSchedulerを作成することもできますし、
j.u.c.Executorやj.u.c.ExecutorServiceからもSchedulerを作ることが可能です。
これらにて取得・作成したSchedulerを、Publisher(Mono/Flux)のpublishOnやsubscribeOnに設定することで、指定したScheduler上でタスクを動作させる
ことができるようになります。
https://projectreactor.io/docs/core/3.1.2.RELEASE/api/reactor/core/publisher/Flux.html#publishOn-reactor.core.scheduler.Scheduler-
https://projectreactor.io/docs/core/3.1.2.RELEASE/api/reactor/core/publisher/Flux.html#subscribeOn-reactor.core.scheduler.Scheduler-
https://projectreactor.io/docs/core/3.1.2.RELEASE/api/reactor/core/publisher/Mono.html#publishOn-reactor.core.scheduler.Scheduler-
https://projectreactor.io/docs/core/3.1.2.RELEASE/api/reactor/core/publisher/Mono.html#subscribeOn-reactor.core.scheduler.Scheduler-
FluxやMonoの図を見ると、publishOn/subscribeOnのそれぞれで、どういう箇所に影響があるかわかりますね。
使ってみる
それでは、提供されているSchedulerでちょっと動作を切り替えてみましょう。
single。
@Test public void relationOnSingle() { sw(() -> { Flux<Integer> flux = Flux.interval(Duration.ofMillis(100L)) .publishOn(Schedulers.single()) .zipWith(Flux.fromStream(IntStream.rangeClosed(1, 50).boxed())) .log("single-source-stream") .map(Tuple2::getT2) .flatMap( i -> Flux.interval(Duration.ofMillis(100L)) .publishOn(Schedulers.single()) .zipWith(Flux.just(2)) .log("single-doubling") .map(t -> t.getT2() * i) ); StepVerifier .create(flux) .expectNextCount(49) .expectNext(100) .verifyComplete(); }); }
2箇所にFlux#publishOnでSchedulers#singleを指定しています。
ログ。
20:30:44.331 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 20:30:44.410 [main] INFO single-source-stream - onSubscribe(FluxZip.ZipCoordinator) 20:30:44.414 [main] INFO single-source-stream - request(256) 20:30:44.531 [single-1] INFO single-source-stream - onNext([0,1]) 20:30:44.536 [single-1] INFO single-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:30:44.538 [single-1] INFO single-doubling - request(32) 20:30:44.639 [single-1] INFO single-source-stream - onNext([1,2]) 20:30:44.639 [single-1] INFO single-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:30:44.640 [single-1] INFO single-doubling - request(32) 20:30:44.640 [single-1] INFO single-doubling - onNext([0,2]) 20:30:44.642 [single-1] INFO single-doubling - onComplete() 20:30:44.642 [single-1] INFO single-source-stream - request(1) 20:30:44.728 [single-1] INFO single-source-stream - onNext([2,3]) 〜省略〜 20:30:49.328 [single-1] INFO single-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:30:49.328 [single-1] INFO single-doubling - request(32) 20:30:49.329 [single-1] INFO single-doubling - onNext([0,2]) 20:30:49.330 [single-1] INFO single-doubling - onComplete() 20:30:49.330 [single-1] INFO single-source-stream - request(1) 20:30:49.428 [single-1] INFO single-source-stream - onNext([49,50]) 20:30:49.428 [single-1] INFO single-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:30:49.429 [single-1] INFO single-doubling - request(32) 20:30:49.429 [single-1] INFO single-source-stream - onComplete() 20:30:49.433 [single-1] INFO single-doubling - onNext([0,2]) 20:30:49.433 [single-1] INFO single-doubling - onComplete() 20:30:49.530 [single-1] INFO single-doubling - onNext([0,2]) 20:30:49.530 [single-1] INFO single-doubling - onComplete() elapsed time = 5,485 msec
確かに、singleなSchedulerが使われるようになったようです。
続いて、elastic。
@Test public void relationOnElastic() { sw(() -> { Flux<Integer> flux = Flux.interval(Duration.ofMillis(100L)) .publishOn(Schedulers.elastic()) .zipWith(Flux.fromStream(IntStream.rangeClosed(1, 50).boxed())) .log("elastic-source-stream") .map(Tuple2::getT2) .flatMap( i -> Flux.interval(Duration.ofMillis(100L)) .publishOn(Schedulers.elastic()) .zipWith(Flux.just(2)) .log("elastic-doubling") .map(t -> t.getT2() * i) ); StepVerifier .create(flux) .expectNextCount(49) .expectNext(100) .verifyComplete(); }); }
ログ。
20:32:06.197 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework 20:32:06.260 [main] INFO elastic-source-stream - onSubscribe(FluxZip.ZipCoordinator) 20:32:06.264 [main] INFO elastic-source-stream - request(256) 20:32:06.375 [elastic-2] INFO elastic-source-stream - onNext([0,1]) 20:32:06.378 [elastic-2] INFO elastic-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:32:06.379 [elastic-2] INFO elastic-doubling - request(32) 20:32:06.474 [elastic-2] INFO elastic-source-stream - onNext([1,2]) 20:32:06.474 [elastic-2] INFO elastic-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:32:06.475 [elastic-2] INFO elastic-doubling - request(32) 20:32:06.484 [elastic-3] INFO elastic-doubling - onNext([0,2]) 20:32:06.485 [elastic-3] INFO elastic-doubling - onComplete() 20:32:06.485 [elastic-3] INFO elastic-source-stream - request(1) 20:32:06.575 [elastic-2] INFO elastic-source-stream - onNext([2,3]) 〜省略〜 20:32:11.175 [elastic-2] INFO elastic-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:32:11.175 [elastic-2] INFO elastic-doubling - request(32) 20:32:11.177 [elastic-8] INFO elastic-doubling - onNext([0,2]) 20:32:11.177 [elastic-8] INFO elastic-doubling - onComplete() 20:32:11.177 [elastic-8] INFO elastic-source-stream - request(1) 20:32:11.275 [elastic-2] INFO elastic-source-stream - onNext([49,50]) 20:32:11.276 [elastic-2] INFO elastic-doubling - onSubscribe([Fuseable] FluxZip.ZipSingleCoordinator) 20:32:11.276 [elastic-2] INFO elastic-doubling - request(32) 20:32:11.277 [elastic-4] INFO elastic-doubling - onNext([0,2]) 20:32:11.277 [elastic-4] INFO elastic-doubling - onComplete() 20:32:11.281 [elastic-4] INFO elastic-source-stream - request(1) 20:32:11.281 [elastic-2] INFO elastic-source-stream - onComplete() 20:32:11.378 [elastic-7] INFO elastic-doubling - onNext([0,2]) 20:32:11.378 [elastic-7] INFO elastic-doubling - onComplete() elapsed time = 5,455 msec
こちらも、elasticなSchedulerが使われるようになったようです。
デフォルトで用意されているSchedulerについて
Schedulersに定義されている各種staticメソッドで、定義済みのSchedulerを取得することができますが、これらの定義はどのようになっているのでしょう?
例えば、elastic。ソースコードを見ると、こんな感じの定義になっています。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java#L122-L124
public static Scheduler elastic() { return cache(CACHED_ELASTIC, ELASTIC, ELASTIC_SUPPLIER); }
parallelとsingleもほぼ同様です。
immeidateだけ、ちょっと違いますね。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java#L131-L133
public static Scheduler immediate() { return ImmediateScheduler.instance(); }
elastic、parallel、singleの定義済みのSchedulerは、呼び出し元に対して、同じSchedulerのインスタンスを返却するように実装されています。
// Internals static final String ELASTIC = "elastic"; // IO stuff static final String PARALLEL = "parallel"; //scale up common tasks static final String SINGLE = "single"; //non blocking tasks static final String TIMER = "timer"; //timed tasks // Cached schedulers in atomic references: static AtomicReference<CachedScheduler> CACHED_ELASTIC = new AtomicReference<>(); static AtomicReference<CachedScheduler> CACHED_PARALLEL = new AtomicReference<>(); static AtomicReference<CachedScheduler> CACHED_SINGLE = new AtomicReference<>(); static final Supplier<Scheduler> ELASTIC_SUPPLIER = () -> newElastic(ELASTIC, ElasticScheduler.DEFAULT_TTL_SECONDS, true); static final Supplier<Scheduler> PARALLEL_SUPPLIER = () -> newParallel(PARALLEL, Runtime.getRuntime() .availableProcessors(), true); static final Supplier<Scheduler> SINGLE_SUPPLIER = () -> newSingle(SINGLE, true);
Supplierの定義を見ると、だいたいわかりそうな感じですね。いずれのSchedulerも、Schedulersが提供する各種newXxxxメソッドにて実装されており、
最初に作成した結果をAtomicReferenceで保持するという形になっています。
最初の引数は、スレッド名のprefixです。
elasticの場合は第2引数がデフォルトのTTL(60秒)、parallelの場合は第2引数が生成するWorkerの数ですが、ここでは利用可能なCPUコア数、ということに
なりますね。
いずれのSchedulerも、最後の引数がtrueになっているのはdaemonスレッドとして定義するためです。
Schedulersの各種newXxxxメソッドで定義したSchedulerについて、newXxxxメソッドにはいくつかオーバーロードされたバージョンがありますが、
引数が少ないもの(daemonのON/OFFが指定できないもの)については、非daemonとして定義されます。
自分でSchedulerを定義した場合は、ちゃんと管理に注意しましょう、ってことになるでしょうね。
Schedulerを停止する場合は、Scheduler#disposeを呼び出します。
https://projectreactor.io/docs/core/3.1.2.RELEASE/api/reactor/core/scheduler/Scheduler.html#dispose--
まあ、あまり新しいSchedulerのインスタンスを作らない方向の方が…という感じですね。
publishOnやsubscribeOnで指定したSchedulerは、どう使われるか?
Fluxで見てみましょう。
publishOnの場合は、FluxPublishOn内でSchedulerからWorkerが生成され、タスクがWorker#scheduleで登録されることになります。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxPublishOn.java#L82-L83
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxPublishOn.java#L282
FluxPublishOnは内部にQueueを持っており、このQueueから取得した値を使って処理を実行していくようですね。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxPublishOn.java#L307
ちゃんと確認してはいませんが、FluxPublishOn内にこのQueueに対してofferしている箇所もあるようです。
subscribeOnの場合は、subscribe時にFluxSubscribeOn内で使われるようです。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxSubscribeOn.java#L129-L147
が、requestOnSeparateThreadがfalse、またはSubscriberを動作させたスレッドとカレントスレッドが同じ場合は、そのまま同じスレッドで
動作するようです。
なお、requestOnSeparateThreadはsubscribeOn時に明示的にfalseにしない限りは、デフォルトでtrueになります。
同じようなクラスが、Monoにもあったりします。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/MonoPublishOn.java
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/MonoSubscribeOn.java
こんな感じで、だいたい使われ方のイメージはついたかも?
Flux#interbal
ところで、Flux#intervalを使うと、突然parallelなSchedulerが使われましたね?あれはどうしてなんでしょう?
これは、Flux#intervalのコードを見るとなんとなくわかります。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/Flux.java#L976-L978
public static Flux<Long> interval(Duration period) { return interval(period, Schedulers.parallel()); }
parallelなScheduler、思い切り使ってますね…。
Schedulers#parallelについては、意外とReactor内部でも使っているようです。
Flux#intervalを使った場合は、Schedulerの利用方法も変わってWorker#schedulePeriodicallyが使われるようになります。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxInterval.java#L68
try {
w.schedulePeriodically(r, initialDelay, period, unit);
}
mainスレッド?
そういえば、最初に戻ると1番単純な例は、mainスレッド(呼び出し元のスレッド)で動いていましたね。
これは?とか思うわけですが、最初の例だとSchedulerが絡むような(Reactor内部も含めて)コードが登場しないからでしょうね。で、特にSchedulerが
使われることなく突っ走る、と。
コールスタックを見たりしていると、FluxFlatMap$FlatMapMainとかで動いているようです。
https://github.com/reactor/reactor-core/blob/v3.1.2.RELEASE/reactor-core/src/main/java/reactor/core/publisher/FluxFlatMap.java#L182
まとめ
ちょっと気になってざっとReactorのSchedulerまわりを見ていましたが、なんとなく雰囲気は見えたような気がします。
少しは理解の足しになったのかな…。