CLOVER🍀

That was when it all began.

Reactor CoreのProcessors(Hot Publishing)を試す

Reactor CoreのREADMEに載っている最後のテーマ、Hot Publishing:Processorsを試してみます。

Hot Publishing : Processors

いくつかのProcessorがあるようですが、そこからSinkを取得してSubscribeしたり、Processorに直接Subscriberを登録してSubscribeするといった使い方になるようです。

とりあえず、順次使っていってみましょう。

準備

Maven依存関係は、こんな感じ。

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.0.2.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.5.2</version>
            <scope>test</scope>
        </dependency>

9月になってから、Reactor Coreが3.0.1.RELEASE、3.0.2.RELEASEとリリースされています。

また、テストコードにはTestSubscriberを使うのがよいと@makingさんに教えていただいたので

こちらを使ってテストコードを書いてみることにします。

とはいえ、このツイートの通りテストコードそのものはMaven Centralにアップロードされていないので、今は自分で手元に置くことにしておきましょう。

$ wget https://raw.githubusercontent.com/reactor/reactor-core/v3.0.2.RELEASE/src/test/java/reactor/test/TestSubscriber.java -O src/test/java/reactor/test/TestSubscriber.java

こちらを使って、できる範囲でテストコードを書いていくことにします。

また、テストコード全体の雛形は、こんな感じ。
src/test/java/org/littlewings/reactor/processors/HotPublishingProcessorsTest.java

package org.littlewings.reactor.processors;

import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.Test;
import reactor.core.publisher.BlockingSink;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.publisher.TopicProcessor;
import reactor.core.publisher.WorkQueueProcessor;
import reactor.test.TestSubscriber;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class HotPublishingProcessorsTest {
    // ここに、テストを書く!
}

では、見ていってみます。

EmitterProcessor

N個のSubscriberを紐付けることができる、Processor

Pub-Sub : EmitterProcessor

READMEに非同期〜みたいなことが書いてありますが、Javadocを見ると内部的にループして同期的っぽく処理する方法を取ってそうな感じです。
http://projectreactor.io/core/docs/api/?reactor/core/publisher/EmitterProcessor.html

では、コードを書いてみます。

    @Test
    public void pubSub_emitterProcesssr() {
        TestSubscriber<Integer> subscriber = TestSubscriber.create();

        EmitterProcessor<Integer> emitter = EmitterProcessor.create();
        BlockingSink<Integer> sink = emitter.connectSink();

        emitter.subscribe(subscriber);

        sink.next(1);
        sink.next(2);

        subscriber.assertValues(1, 2);

        sink.next(3);
        subscriber.assertValues(1, 2, 3);

        sink.complete();
        subscriber.assertComplete();
    }

まずはEmitterProcessor#createでEmitterProcessorを取得し、そこからBlockingSinkを取得します(READMEとちょっと違う)。

        EmitterProcessor<Integer> emitter = EmitterProcessor.create();
        BlockingSink<Integer> sink = emitter.connectSink();

このSinkにSubscriberを紐付けます。ここでのSubscriberはTestSubscriberです。

        emitter.subscribe(subscriber);

あとはBlockingSink#nextでデータを送ります。

        sink.next(1);
        sink.next(2);

        subscriber.assertValues(1, 2);

        sink.next(3);
        subscriber.assertValues(1, 2, 3);

Subscriber側では、受信ができます、と。

BlockingSink#completeで終了。

        sink.complete();
        subscriber.assertComplete();

続いて、複数のSubscriberを紐付けてみましょう。

    @Test
    public void pubSub_emitterProcessorAndSubscribers() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();
        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();

        EmitterProcessor<Integer> emitter = EmitterProcessor.create();
        BlockingSink<Integer> sink = emitter.connectSink();
        emitter.subscribe(subscriber1);
        emitter.subscribe(subscriber2);
        sink.next(1);
        sink.next(2);

        subscriber1.assertValues(1, 2);
        subscriber2.assertValues(1, 2);

        sink.next(3);
        subscriber1.assertValues(1, 2, 3);
        subscriber2.assertValues(1, 2, 3);

        sink.complete();
        subscriber1.assertComplete();
        subscriber2.assertComplete();
    }

普通にどのSubscriberもデータを受信できますし、BlokingSink#completeするとどのSubscriberも終了します、と。

また、Subscriberを紐付けていない状態でBlockingSink#nextすると、例外となります。

    @Test
    public void pubSub_emitterProcesssrOverflow() {
        TestSubscriber<Integer> subscriber = TestSubscriber.create();

        EmitterProcessor<Integer> emitter = EmitterProcessor.create();
        BlockingSink<Integer> sink = emitter.connectSink();

        assertThatThrownBy(() -> sink.next(1))
                .isInstanceOf(IllegalStateException.class)
                .hasMessage("The receiver is overrun by more signals than expected (bounded queue...)");
    }

なるほど。

ReplayProcessor

続いて、ReplayProcessor。このProcessorもN個のSubscriberを紐付けることができますが、このProcessorは発生したイベントを覚えておくことができます。

Pub-Sub Replay : ReplayProcessor

覚えておくサイズや方法は、ある程度設定することができます。

まずは例。

    @Test
    public void pubSub_replayProcessor() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();

        ReplayProcessor<Integer> replayer = ReplayProcessor.create();

        replayer.subscribe(subscriber1);

        replayer.onNext(1);
        replayer.onNext(2);

        subscriber1.assertValues(1, 2);

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        replayer.subscribe(subscriber2);

        subscriber2.assertValues(1, 2);

        replayer.onNext(3);

        subscriber1.assertValues(1, 2, 3);
        subscriber2.assertValues(1, 2, 3);

        replayer.onComplete();
        subscriber1.assertComplete();
        subscriber2.assertComplete();
    }

ここでも使うSubscriberは、TestSubscriberとします。

最初にReplayProcessor#createでRelayProcessorを作成し、RelayProcessorに直接Subscriberを紐付けます。RelayProcessor#onNextでデータを送ると、Subscriberが受信します。ここまでは、普通です。

        ReplayProcessor<Integer> replayer = ReplayProcessor.create();

        replayer.subscribe(subscriber1);

        replayer.onNext(1);
        replayer.onNext(2);

        subscriber1.assertValues(1, 2);

この後にSubscriberを追加すると、この時点までに送られた内容も受信します。

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        replayer.subscribe(subscriber2);

        subscriber2.assertValues(1, 2);

結果、途中で追加されたSubscriberも、他のSubscriberに追いついたような動きになります。

        replayer.onNext(3);

        subscriber1.assertValues(1, 2, 3);
        subscriber2.assertValues(1, 2, 3);

        replayer.onComplete();
        subscriber1.assertComplete();
        subscriber2.assertComplete();

終了は、ReplayProcessor#onCompleteで。

で、どのくらい過去の履歴を保持しているかですが、デフォルトでは256です。でも、それ以上の数を与えても過去のデータを持っていたりします。

    @Test
    public void pubSub_replayProcessorOverflow() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();

        ReplayProcessor<Integer> replayer = ReplayProcessor.create();

        replayer.subscribe(subscriber1);

        IntStream.rangeClosed(1, 300).forEach(replayer::onNext);

        subscriber1.assertValueCount(300L);

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        replayer.subscribe(subscriber2);

        subscriber2.assertValueCount(300L);

        replayer.onComplete();
        subscriber1.assertComplete();
        subscriber2.assertComplete();
    }

これは、ReplayProcessor#createの引数で設定することができますが、historySizeが256で、限界を越えた時もメモリに溜め込むような設定(unbounded)になってるからです。

ここで、historySizeの明示とunbounded=falseにしてReplayProcessorを作成してみます。

    @Test
    public void pubSub_replayProcessorOverflow2() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();

        ReplayProcessor<Integer> replayer = ReplayProcessor.create(256, false);

        replayer.subscribe(subscriber1);

        IntStream.rangeClosed(1, 300).forEach(replayer::onNext);

        subscriber1.assertValueCount(300L);

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        replayer.subscribe(subscriber2);

        subscriber2.assertValueCount(256L);

        replayer.onComplete();
        subscriber1.assertComplete();
        subscriber2.assertComplete();
    }

追加したSubscriber側では、256個までしかイベントを受信しなくなりました。

TopicProcessor

次は、TopicProcessor。TopicProcessorはpublish-subscribeを、非同期のイベントループで行うProcessorです。

Async Pub-Sub : TopicProcessor

このため、ここまでのProcessorのコードと異なり、テストコードだと待ち合わせを考慮しないと、テストが不安定になります。が、このあたりはTestSubscriberがうまくやってくれます。

では、作成したコード。

    @Test
    public void asyncPubSub_topicProcessor() {
        TestSubscriber<Integer> subscriber = TestSubscriber.create();

        TopicProcessor<Integer> topic = TopicProcessor.create();

        topic.subscribe(subscriber);

        topic.onNext(1);
        topic.onNext(2);

        subscriber
                .awaitAndAssertNextValueCount(2L)
                .assertValues(1, 2);

        topic.onNext(3);

        subscriber
                .awaitAndAssertNextValueCount(1L)
                .assertValues(1, 2, 3);

        topic.onComplete();
        subscriber.await().assertComplete();
    }

TopicProcessor#createした後にSubscriberを紐付けるところまではふつうですが、

        TopicProcessor<Integer> topic = TopicProcessor.create();

        topic.subscribe(subscriber);

TopicProcessor#onNextでデータを送った後は、テストコードとしてはSuscriber側でwaitの考慮がないとうまく動かなくなります。

        topic.onNext(1);
        topic.onNext(2);

        subscriber
                .awaitAndAssertNextValueCount(2L)
                .assertValues(1, 2);

まあ、非同期に動いてますよってことですね。

TopicProcessor#onCompleteも非同期。

        topic.onComplete();
        subscriber.await().assertComplete();

Subscriberを複数紐付けた場合は、途中からの受信になります。

   @Test
    public void asyncPubSub_topicProcessorAndSubscribers() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();

        TopicProcessor<Integer> topic = TopicProcessor.create();

        topic.subscribe(subscriber1);

        topic.onNext(1);
        topic.onNext(2);

        subscriber1
                .awaitAndAssertNextValueCount(2L)
                .assertValues(1, 2);

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        topic.subscribe(subscriber2);

        topic.onNext(3);

        subscriber1
                .awaitAndAssertNextValueCount(1L)
                .assertValues(1, 2, 3);

        subscriber2
                .awaitAndAssertNextValueCount(1L)
                .assertValues(3);

        topic.onComplete();

        subscriber1.await().assertComplete();
        subscriber2.await().assertComplete();
    }

WorkQueueProcessor

最後は、WorkQueueProcessorです。このProcessorは、TopicProcessorに似ていますが複数のSubscriberを紐付けて、各Subscriberにイベントは送信されないようです。どれかひとつって感じのようですね。

Async Distributed : WorkQueueProcessor

また、Reactive Streamsでの仕様は、一部守っていないっぽい?です。

The processor is very similar to TopicProcessor but only partially respects the Reactive Streams contract.

The purpose of this processor is to distribute the signals to only one of the subscribed subscribers and to share the demand amongst all subscribers. The scenario is akin to Executor or Round-Robin distribution. However there is no guarantee the distribution will be respecting a round-robin distribution all the time.

http://projectreactor.io/core/docs/api/?reactor/core/publisher/WorkQueueProcessor.html

とりあえず、コードを書いてみましょう。

    @Test
    public void asyncDistributed_workQueueProcessorWithSubscribers() {
        TestSubscriber<Integer> subscriber1 = TestSubscriber.create();

        WorkQueueProcessor<Integer> queue = WorkQueueProcessor.create();
        queue.subscribe(subscriber1);
        queue.onNext(1);
        queue.onNext(2);

        subscriber1
                .awaitAndAssertNextValueCount(2L)
                .assertValues(1, 2);

        TestSubscriber<Integer> subscriber2 = TestSubscriber.create();
        queue.subscribe(subscriber2);

        queue.onNext(3);
        queue.onNext(4);

        subscriber1
                .awaitAndAssertNextValueCount(1L)
                .assertValues(1, 2, 3, 4);
        subscriber2
                .awaitAndAssertNextValueCount(0L);

        queue.onComplete();

        subscriber1.await().assertComplete();
        subscriber2.await().assertComplete();
    }

WorkQueueProcessor#createでWorkQueueProcessorを作成し、Subscriberを紐付けます。

        WorkQueueProcessor<Integer> queue = WorkQueueProcessor.create();
        queue.subscribe(subscriber1);
        queue.onNext(1);
        queue.onNext(2);

        subscriber1
                .awaitAndAssertNextValueCount(2L)
                .assertValues(1, 2);

ですが、途中で紐付けたSubscriberにはonNextで送った内容が来なかったり。

        queue.onNext(3);
        queue.onNext(4);

        subscriber1
                .awaitAndAssertNextValueCount(1L)
                .assertValues(1, 2, 3, 4);
        subscriber2
                .awaitAndAssertNextValueCount(0L);

こちらも、やっぱり非同期で動いているようです。

最初から、2つSubscriberを紐付けてみましょう。

    @Test
    public void asyncDistributed_workQueueProcessorWithSubscribers2() throws InterruptedException {
        WorkQueueProcessor<Integer> queue = WorkQueueProcessor.create();
        queue.subscribe(v -> System.out.println("subscriber1: " + v));
        queue.subscribe(v -> System.out.println("subscriber2: " + v));

        queue.onNext(1);
        queue.onNext(2);

        queue.onNext(3);
        queue.onNext(4);

        queue.onNext(5);
        queue.onNext(6);

        queue.onComplete();

        TimeUnit.SECONDS.sleep(2L);
    }

結果。

subscriber1: 1
subscriber1: 2
subscriber1: 3
subscriber1: 4
subscriber1: 5
subscriber1: 6

全部、Subscriber1に寄りました…。

そこで、今度はWorkQueueProcessor#shareでWorkQueueProcessorを作成してみます。Subscriberは、途中から紐付けます。

    @Test
    public void asyncDistributed_workQueueProcessorWithSubscribersShared() throws InterruptedException {
        WorkQueueProcessor<Integer> queue = WorkQueueProcessor.share(true);
        queue.subscribe(v -> System.out.println("subscriber1: " + v));
        queue.onNext(1);
        queue.onNext(2);

        queue.subscribe(v -> System.out.println("subscriber2: " + v));

        queue.onNext(3);
        queue.onNext(4);

        queue.onNext(5);
        queue.onNext(6);

        queue.onComplete();

        TimeUnit.SECONDS.sleep(2L);
    }

結果。

subscriber1: 1
subscriber1: 2
subscriber1: 3
subscriber1: 4
subscriber1: 5
subscriber1: 6

ちなみに、この結果は不定で、subscriber2に寄ることもありました。

もう少し長めに紐付けてみましょう。

    @Test
    public void asyncDistributed_workQueueProcessorWithSubscribersShared2() throws InterruptedException {
        WorkQueueProcessor<Integer> queue = WorkQueueProcessor.share(true);
        queue.subscribe(v -> System.out.println("subscriber1: " + v));
        queue.subscribe(v -> System.out.println("subscriber2: " + v));

        queue.onNext(1);
        queue.onNext(2);

        queue.onNext(3);
        queue.onNext(4);

        queue.onNext(5);
        queue.onNext(6);

        queue.onComplete();

        TimeUnit.SECONDS.sleep(2L);
    }

結果。

subscriber1: 1
subscriber1: 3
subscriber1: 4
subscriber1: 5
subscriber1: 6
subscriber2: 2

subscriber2も現れました。

まあ、ひとつのイベントを、ひとつのSubscriberのみに配信するよってことだと思いますが、ちょっと試したデータ件数が少なかったかな?

まとめ

Reactor CoreのREADME.mdに載っている、各種Processorを試してみました。

それぞれ特性が違うのはよくわかりましたが、さてどう使ったものかな?という感じです。

とりあえず、こんなところで。