CLOVER🍀

That was when it all began.

Reactor CoreのFluxSink、MonoSink、SynchronousSinkを試す

Reactor Coreを試してみようシリーズ、続いてはFluxSink、MonoSink、SynchronousSinkへ。

Hot Publishing : SynchronousSink, FluxSink, MonoSink

こちらを利用すると、SubscriberもしくはProcessorに対して、コンテキスト外からデータを供給することができるみたいです(非Concurrent)。
Processorはまだ知らないんですけど

GitHub REAME.mdのサンプルでは、GUIのListenerっぽいサンプルが載せられています。

このサンプルをマネしつつ、ちょっと試していってみましょう。

準備

Maven依存関係。

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

Reactor Coreと、動作させるためのJUnitにとどめておきます。

また、コードの雛形は、以下のように定義しておきます。
src/test/java/org/littlewings/reactor/sink/HotPublishingSinkTest.java

package org.littlewings.reactor.sink;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.SynchronousSink;

public class HotPublishingSinkTest {
    // ここにコードを書く!
}

FluxSink

では、まずはFluxSinkから。FluxSinkとFlux#createを使って、Fluxを生成することができます。

README.mdのサンプルがListenerっぽいサンプルになっていることから、こちらもそれに習ってみることにします。

ですが、先に載せるのはFluxSinkを使う、Flux#createを呼び出している側から。

    @Test
    public void fluxSinkTest() throws InterruptedException {
        Flux.<String>create(emitter -> {
            MyFluxSinkListener listener = new MyFluxSinkListener(emitter);

            listener.publish("Hello");
            listener.publish("Reactor");
            listener.publish("FluxSink");
        }, FluxSink.OverflowStrategy.BUFFER)
                .timeout(Duration.of(5L, ChronoUnit.SECONDS))
                .subscribe(System.out::println);

        TimeUnit.SECONDS.sleep(2L);
    }

Flux#createのシグネチャは、以下の2通りです。

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter)

public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter,
                                 FluxSink.OverflowStrategy backpressure)

今回は、FluxSink.OverflowStrategyを明示する方を選びました。指定しない時はBUFFERを指定したものとして動作するので、この例ではあまり意味がないですが、backpressureのハンドリングを指定するものだとか。

BUFFER以外には、DROP、ERROR、IGNORE、LATESTを選択することができます。BUFFERだと、メモリに溜め込むことを意味します、と。

今回のサンプルではListenerを模したものを書いていますが、Lambda式の中でイベントを発生させた体でコードを書いています。

で、Listener側のコードはこちら。

    public static class MyFluxSinkListener {
        FluxSink<String> emitter;

        public MyFluxSinkListener(FluxSink<String> emitter) {
            this.emitter = emitter;
            this.emitter.setCancellation(() -> System.out.println("listener stop"));
        }

        public void publish(String text) {
            emitter.next(text);
        }

        public void error() {
            emitter.error(new RuntimeException("Oops!!"));
        }

        public void complete() {
            emitter.complete();
        }
    }

FluxSink#nextに値を渡すと、それがFluxに流れます。errorでは例外を渡してエラーを、completeで完了を意味します。キャンセルされた時になにか処理を実行できるように、FluxSink#setCancellationでキャンセル時の動作を指定できます。今回はコンソール出力のみですが、README.mdのサンプルではListenerの登録解除みたいなコードが掲載されています。

実行結果は、こちら。

Hello
Reactor
FluxSink

3回FluxSink#nextでデータを送っているので、それが結果に現れました、と。

続いて、エラーを起こしてみましょう。

    @Test
    public void fluxSinkTest2() throws InterruptedException {
        Flux.<String>create(emitter -> {
            MyFluxSinkListener listener = new MyFluxSinkListener(emitter);

            listener.publish("Hello");
            listener.publish("Reactor");
            listener.publish("FluxSink");

            listener.error();
        }, FluxSink.OverflowStrategy.BUFFER)
                .timeout(Duration.of(5L, ChronoUnit.SECONDS))
                .onErrorResumeWith(throwable -> Flux.create(emitter -> {
                    MyFluxSinkListener listener = new MyFluxSinkListener(emitter);
                    listener.publish("Resume");

                    listener.complete();
                }))
                .subscribe(System.out::println);

        TimeUnit.SECONDS.sleep(2L);
    }

今回は、最初のFlux#create内でFluxSink#errorを呼び出すようにしました。FluxSink#errorを呼び出し後は、FluxSink#nextに値を渡してもすでにキャンセルされた旨のエラーがスローされます。

で、エラーが起こった際にどう復帰させるかをFlux#onErrorResumeWithで指定しています。ここで再度Flux(というかPublisher)を返してあげることで、処理を継続できます。

このコードの実行結果は、こちらです。

Hello
Reactor
FluxSink
listener stop
Resume
listener stop

FluxSink#setCancellationに仕込んだログが出力されています。FluxSink#errorとFluxSink#completeの呼び出しで呼ばれたようですね。

MonoSink

続いて、MonoSink。こちらは、MonoSinkとMono#createでMonoを生成することになります。

こちらもListenerっぽいものを使って書きますが、こちらは先にListener側を載せておきます。

    public static class MyMonoSinkListener {
        MonoSink<String> emitter;

        public MyMonoSinkListener(MonoSink<String> emitter) {
            this.emitter = emitter;
            this.emitter.setCancellation(() -> System.out.println("listener stop"));
        }

        public void publish(String text) {
            emitter.success(text);
        }

        public void error() {
            emitter.error(new RuntimeException("Oops!!"));
        }
    }

FluxSinkの時と異なり、completeがありません。Monoだからでしょう。また、FluxSink#nextではなく代わりにsuccessがあります。

FluxSinkを試していれば、そう使い方の感覚は変わらないのでそのままさらっと書いてみます。

    @Test
    public void monoSinkTest() throws InterruptedException {
        Mono.<String>create(emitter -> {
            MyMonoSinkListener listener = new MyMonoSinkListener(emitter);
            listener.publish("Hello");
        })
                .repeat(5)
                .subscribe(System.out::println);

        TimeUnit.SECONDS.sleep(2L);
    }

MonoSink#successは、1度だけ呼び出すものです。それで終わってしまうと面白くないので、今回はrepeatで5回呼び出しました。

つまり、結果はこんな感じ。

Hello
Hello
Hello
Hello
Hello

続いて、MonoSink#errorを呼び出すコード例。

    @Test(expected = RuntimeException.class)
    public void monoSinkTest2() throws InterruptedException {
        Mono.<String>create(emitter -> {
            MyMonoSinkListener listener = new MyMonoSinkListener(emitter);
            listener.error();
        })
                .doOnError(thrown -> System.out.println("error = " + thrown.getMessage()))
                .subscribe(System.out::println);

        TimeUnit.SECONDS.sleep(2L);
    }

Monoにresumeはないので、こんな感じに。Mono#subscribe(Consumer)を呼んだままだと、例外がスローされるのでとりあえずこうなりました。

Mono#doOnErrorの部分はちゃんと実行されます。

error = Oops!!

SynchronousSink

最後は、SynchronousSink。こちらは、Flux#generateと組み合わせてFluxを生成します。

だんだんListenerを書くのが面倒になったので、いきなりnextを呼び出してみましょう。

    @Test
    public void synchronousSinkTest() {
        Flux.generate((state, emitter) -> {
            emitter.next("Hello");
            return null;
        })
                .take(5)
                .subscribe(System.out::println);
    }

stateはあとで説明します。このコードの実行結果は、こちらです。

Hello
Hello
Hello
Hello
Hello

5回の表示で止まっていますが、これはFlux#takeで5回に絞っているからです。もしもこのコードからtakeを削除すると、無限にコンソール出力し続けます。

無限ストリームみたいですねぇ。

で、stateですが、これはFlux#generate内での呼び出しごとに変化する状態を持つことができます。たとえば、カウンタ的なものを考えてみます。

nullだったら初期値を1として、そこからカウントアップしていきます。

    @Test
    public void synchronousSinkTest2() {
        Flux.<String, Integer>generate((state, emitter) -> {
            if (state == null) {
                state = 1;
            }

            emitter.next("Key" + state);
            return state + 1;
        })
                .take(5)
                .subscribe(System.out::println);
    }

結果はこちら。

Key1
Key2
Key3
Key4
Key5

それっぽく動いていますね。ただ、これだと初期値の指定が面倒です。

そういう場合は、Callableでstateを提供するようにします。

    @Test
    public void synchronousSinkTest3() {
        Flux.<String, Integer>generate(() -> 1, (state, emitter) -> {
            emitter.next("Key" + state);
            return state + 1;
        })
                .take(5)
                .subscribe(System.out::println);
    }

結果は、先ほどと同じです。

で、最後にやっぱりListenerっぽいものを持ち出してみます。

    public static class MySynchronousSinkListener {
        SynchronousSink<String> emitter;

        public MySynchronousSinkListener(SynchronousSink<String> emitter) {
            this.emitter = emitter;
            // setCancellationはない
        }

        public void publish(String text) {
            emitter.next(text);
        }

        public void error() {
            emitter.error(new RuntimeException("Oops!!"));
        }

        public void complete() {
            emitter.complete();
        }
    }

SynchronousSinkの場合は、next、error、completeがあり、setCancellationはありません。

stateを整数にし、5回呼び出されたら停止するようなコードを書いてみましょう。

    @Test
    public void synchronousSinkTest5() {
        Flux.<String, Integer>generate(() -> 1, (state, emitter) -> {
            MySynchronousSinkListener listener = new MySynchronousSinkListener(emitter);

            if (state <= 5) {
                listener.publish("Key" + state);
            } else {
                listener.complete();
            }
            return state + 1;
        })
                .subscribe(System.out::println);
    }

先ほどまで書いていた、takeはなくなっています。

これでも、先ほどまでと同じ結果になります。

Key1
Key2
Key3
Key4
Key5

まとめ

FluxSink、MonoSink、SynchronousSynkを簡単に試してみました。これらを使うことで、配列やコレクションのような固定のデータではなく、FluxやMonoを継続して生成する方法がひとつわかった感じですね。

なかなか面白いなーと思います。