Reactor Coreを試してみようシリーズ、続いてはFluxSink、MonoSink、SynchronousSinkへ。
Hot Publishing : SynchronousSink, FluxSink, MonoSink
こちらを利用すると、SubscriberもしくはProcessorに対して、コンテキスト外からデータを供給することができるみたいです(非Concurrent)。
※Processorはまだ知らないんですけど
GitHub REAME.mdのサンプルでは、GUIのListenerっぽいサンプルが載せられています。
このサンプルをマネしつつ、ちょっと試していってみましょう。
準備
Maven依存関係。
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.0.0.RELEASE</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency>
Reactor Coreと、動作させるためのJUnitにとどめておきます。
また、コードの雛形は、以下のように定義しておきます。
src/test/java/org/littlewings/reactor/sink/HotPublishingSinkTest.java
package org.littlewings.reactor.sink; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.concurrent.TimeUnit; import org.junit.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoSink; import reactor.core.publisher.SynchronousSink; public class HotPublishingSinkTest { // ここにコードを書く! }
FluxSink
では、まずはFluxSinkから。FluxSinkとFlux#createを使って、Fluxを生成することができます。
README.mdのサンプルがListenerっぽいサンプルになっていることから、こちらもそれに習ってみることにします。
ですが、先に載せるのはFluxSinkを使う、Flux#createを呼び出している側から。
@Test public void fluxSinkTest() throws InterruptedException { Flux.<String>create(emitter -> { MyFluxSinkListener listener = new MyFluxSinkListener(emitter); listener.publish("Hello"); listener.publish("Reactor"); listener.publish("FluxSink"); }, FluxSink.OverflowStrategy.BUFFER) .timeout(Duration.of(5L, ChronoUnit.SECONDS)) .subscribe(System.out::println); TimeUnit.SECONDS.sleep(2L); }
Flux#createのシグネチャは、以下の2通りです。
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, FluxSink.OverflowStrategy backpressure)
今回は、FluxSink.OverflowStrategyを明示する方を選びました。指定しない時はBUFFERを指定したものとして動作するので、この例ではあまり意味がないですが、backpressureのハンドリングを指定するものだとか。
BUFFER以外には、DROP、ERROR、IGNORE、LATESTを選択することができます。BUFFERだと、メモリに溜め込むことを意味します、と。
今回のサンプルではListenerを模したものを書いていますが、Lambda式の中でイベントを発生させた体でコードを書いています。
で、Listener側のコードはこちら。
public static class MyFluxSinkListener { FluxSink<String> emitter; public MyFluxSinkListener(FluxSink<String> emitter) { this.emitter = emitter; this.emitter.setCancellation(() -> System.out.println("listener stop")); } public void publish(String text) { emitter.next(text); } public void error() { emitter.error(new RuntimeException("Oops!!")); } public void complete() { emitter.complete(); } }
FluxSink#nextに値を渡すと、それがFluxに流れます。errorでは例外を渡してエラーを、completeで完了を意味します。キャンセルされた時になにか処理を実行できるように、FluxSink#setCancellationでキャンセル時の動作を指定できます。今回はコンソール出力のみですが、README.mdのサンプルではListenerの登録解除みたいなコードが掲載されています。
実行結果は、こちら。
Hello Reactor FluxSink
3回FluxSink#nextでデータを送っているので、それが結果に現れました、と。
続いて、エラーを起こしてみましょう。
@Test public void fluxSinkTest2() throws InterruptedException { Flux.<String>create(emitter -> { MyFluxSinkListener listener = new MyFluxSinkListener(emitter); listener.publish("Hello"); listener.publish("Reactor"); listener.publish("FluxSink"); listener.error(); }, FluxSink.OverflowStrategy.BUFFER) .timeout(Duration.of(5L, ChronoUnit.SECONDS)) .onErrorResumeWith(throwable -> Flux.create(emitter -> { MyFluxSinkListener listener = new MyFluxSinkListener(emitter); listener.publish("Resume"); listener.complete(); })) .subscribe(System.out::println); TimeUnit.SECONDS.sleep(2L); }
今回は、最初のFlux#create内でFluxSink#errorを呼び出すようにしました。FluxSink#errorを呼び出し後は、FluxSink#nextに値を渡してもすでにキャンセルされた旨のエラーがスローされます。
で、エラーが起こった際にどう復帰させるかをFlux#onErrorResumeWithで指定しています。ここで再度Flux(というかPublisher)を返してあげることで、処理を継続できます。
このコードの実行結果は、こちらです。
Hello Reactor FluxSink listener stop Resume listener stop
FluxSink#setCancellationに仕込んだログが出力されています。FluxSink#errorとFluxSink#completeの呼び出しで呼ばれたようですね。
MonoSink
続いて、MonoSink。こちらは、MonoSinkとMono#createでMonoを生成することになります。
こちらもListenerっぽいものを使って書きますが、こちらは先にListener側を載せておきます。
public static class MyMonoSinkListener { MonoSink<String> emitter; public MyMonoSinkListener(MonoSink<String> emitter) { this.emitter = emitter; this.emitter.setCancellation(() -> System.out.println("listener stop")); } public void publish(String text) { emitter.success(text); } public void error() { emitter.error(new RuntimeException("Oops!!")); } }
FluxSinkの時と異なり、completeがありません。Monoだからでしょう。また、FluxSink#nextではなく代わりにsuccessがあります。
FluxSinkを試していれば、そう使い方の感覚は変わらないのでそのままさらっと書いてみます。
@Test public void monoSinkTest() throws InterruptedException { Mono.<String>create(emitter -> { MyMonoSinkListener listener = new MyMonoSinkListener(emitter); listener.publish("Hello"); }) .repeat(5) .subscribe(System.out::println); TimeUnit.SECONDS.sleep(2L); }
MonoSink#successは、1度だけ呼び出すものです。それで終わってしまうと面白くないので、今回はrepeatで5回呼び出しました。
つまり、結果はこんな感じ。
Hello Hello Hello Hello Hello
続いて、MonoSink#errorを呼び出すコード例。
@Test(expected = RuntimeException.class) public void monoSinkTest2() throws InterruptedException { Mono.<String>create(emitter -> { MyMonoSinkListener listener = new MyMonoSinkListener(emitter); listener.error(); }) .doOnError(thrown -> System.out.println("error = " + thrown.getMessage())) .subscribe(System.out::println); TimeUnit.SECONDS.sleep(2L); }
Monoにresumeはないので、こんな感じに。Mono#subscribe(Consumer)を呼んだままだと、例外がスローされるのでとりあえずこうなりました。
Mono#doOnErrorの部分はちゃんと実行されます。
error = Oops!!
SynchronousSink
最後は、SynchronousSink。こちらは、Flux#generateと組み合わせてFluxを生成します。
だんだんListenerを書くのが面倒になったので、いきなりnextを呼び出してみましょう。
@Test public void synchronousSinkTest() { Flux.generate((state, emitter) -> { emitter.next("Hello"); return null; }) .take(5) .subscribe(System.out::println); }
stateはあとで説明します。このコードの実行結果は、こちらです。
Hello Hello Hello Hello Hello
5回の表示で止まっていますが、これはFlux#takeで5回に絞っているからです。もしもこのコードからtakeを削除すると、無限にコンソール出力し続けます。
無限ストリームみたいですねぇ。
で、stateですが、これはFlux#generate内での呼び出しごとに変化する状態を持つことができます。たとえば、カウンタ的なものを考えてみます。
nullだったら初期値を1として、そこからカウントアップしていきます。
@Test public void synchronousSinkTest2() { Flux.<String, Integer>generate((state, emitter) -> { if (state == null) { state = 1; } emitter.next("Key" + state); return state + 1; }) .take(5) .subscribe(System.out::println); }
結果はこちら。
Key1 Key2 Key3 Key4 Key5
それっぽく動いていますね。ただ、これだと初期値の指定が面倒です。
そういう場合は、Callableでstateを提供するようにします。
@Test public void synchronousSinkTest3() { Flux.<String, Integer>generate(() -> 1, (state, emitter) -> { emitter.next("Key" + state); return state + 1; }) .take(5) .subscribe(System.out::println); }
結果は、先ほどと同じです。
で、最後にやっぱりListenerっぽいものを持ち出してみます。
public static class MySynchronousSinkListener { SynchronousSink<String> emitter; public MySynchronousSinkListener(SynchronousSink<String> emitter) { this.emitter = emitter; // setCancellationはない } public void publish(String text) { emitter.next(text); } public void error() { emitter.error(new RuntimeException("Oops!!")); } public void complete() { emitter.complete(); } }
SynchronousSinkの場合は、next、error、completeがあり、setCancellationはありません。
stateを整数にし、5回呼び出されたら停止するようなコードを書いてみましょう。
@Test public void synchronousSinkTest5() { Flux.<String, Integer>generate(() -> 1, (state, emitter) -> { MySynchronousSinkListener listener = new MySynchronousSinkListener(emitter); if (state <= 5) { listener.publish("Key" + state); } else { listener.complete(); } return state + 1; }) .subscribe(System.out::println); }
先ほどまで書いていた、takeはなくなっています。
これでも、先ほどまでと同じ結果になります。
Key1 Key2 Key3 Key4 Key5
まとめ
FluxSink、MonoSink、SynchronousSynkを簡単に試してみました。これらを使うことで、配列やコレクションのような固定のデータではなく、FluxやMonoを継続して生成する方法がひとつわかった感じですね。
なかなか面白いなーと思います。