CLOVER🍀

That was when it all began.

3.1からやり直すReactor Core/Test

以前に少し触ったReactor 3なのですが、当時はまだ安定バージョンでなかったり、周辺状況が整っていなかったりと
一時触るのを中断していました。

Project Reactor

はじめてのReactor Core - CLOVER

Spring Framework 5がリリースされ、その中にReactorが組み込まれたこともあり、そろそろ再開してみようかということで。

Reactive Programming with Spring 5.0 M1

Reactive API

では、軽く触ってみましょう。

ドキュメントは、こちら。

Documentation

Reactor 3 Reference Guide

準備

現在のReactorは、BOMが提供されているようですね(Reactor Core 3.0.4かららしい)。

Understanding the BOM

Mavenで、こちらをdependencyManagementに定義します。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>Bismuth-SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

現時点でのRelease Trainは、「Bismuth-SR4」です。

今回は、「reactor-core」と「reactor-test」をdependencyに追加します。

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>

        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>

なお、これで入る「reactor-core」と「reactor-test」のバージョンは「3.1.2.RELEASE」です。

あとは、テスト用にJUnit 5とAssertJを足しておきましょう。

        <dependency>
            <groupId>org.junit.platform</groupId>
            <artifactId>junit-platform-launcher</artifactId>
            <version>1.0.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-engine</artifactId>
            <version>5.0.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.assertj</groupId>
            <artifactId>assertj-core</artifactId>
            <version>3.9.0</version>
            <scope>test</scope>
        </dependency>

JUnit 5向けのMaven Surefire Pluginの設定は端折ります。

これで、Reactor Core(とReactor Test)を使う準備ができました、と。

テストコードの雛形

とりあえず、テストコードの雛形を用意。
src/test/java/org/littlewings/reactor/ReactorGettingStartedTest.java

package org.littlewings.reactor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.PublisherProbe;
import reactor.test.publisher.TestPublisher;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

public class ReactorGettingStartedTest {
    // ここに、テストを書く!!
}

とりあえず使ってみる

とりあえず、難しいことは考えずに、Mono/Fluxを使ってみましょう。

    @Test
    public void mono() {
        Mono
                .just("Hello Reactor")
                .subscribe(System.out::println);
    }

    @Test
    public void flux() {
        Flux
                .just("Hello", "Reactor")
                .buffer(2)
                .subscribe(messages -> System.out.println(String.join(" ", messages)));
    }

単に、System.out.printlnしているだけですが。

これを動かすと、どちらのコードもコンソールには

Hello Reactor

と出力されます。

テスト系のクラスを使ってみる

このままMono、Fluxをいろいろ試してみてもいいのですが、Reactor 3.0の頃からするとテスト系のクラスが増えているようなので、今回はこちらを軽く
試してみるとしましょう。

Testing

まあ、Reactor Testに含まれる内容ですね。

StepVerifier

MonoやFluxにおける、subscribeのテストをする時に使いそうな感じですね。
Testing a Scenario with StepVerifier

例えば、Monoで確認。

    @Test
    public void monoWithStepVerifier() {
        StepVerifier
                .create(Mono.just("Hello Reactor"))
                .expectNext("Hello Reactor")
                .verifyComplete();

        StepVerifier
                .create(Mono.just("Hello Reactor"))
                .expectNextCount(1)
                .verifyComplete();
    }

今回はMono#justで作成したMonoですが、これをStepVerifier#createに渡すことでStepVerifier.FirstStepのインスタンスが得られるようです。

あとは、expectNextで値を確認したり、epectNextCountであといくつデータが流れてくるかを定義し、最後にverifyCompleteで確認します。
その他、expectXxxなメソッドがいくつかあるので、Javadocで確認しましょう。

StepVerifier.Step (Reactor Test 3.1.2.RELEASE)

Fluxでも試してみましょう。

    @Test
    public void fluxWithStepVerifier() {
        StepVerifier
                .create(Flux.just("Hello", "Reactor"))
                .expectNext("Hello")
                .expectNext("Reactor")
                .verifyComplete();

        StepVerifier
                .create(Flux.just("Hello", "Reactor"))
                .expectNext("Hello", "Reactor")
                .verifyComplete();

        StepVerifier
                .create(Flux.just("Hello", "Reactor"))
                .expectNextCount(2)
                .verifyComplete();

        StepVerifier
                .create(Flux.just("Hello", "Reactor"))
                .expectNextCount(1)
                .expectNext("Reactor")
                .verifyComplete();

        assertThatThrownBy(() ->
                StepVerifier
                        .create(Flux.just("Hello", "Reactor"))
                        .expectNext("Hello")
                        .verifyComplete()
        ).isInstanceOf(AssertionError.class);
    }

今回はFlux#justで「Hello」と「World」の文字列をそれぞれ渡していますが、データに着目しているものはexpectNext、数ならexpectNextCountで
同様に確認しています。

        StepVerifier
                .create(Flux.just("Hello", "Reactor"))
                .expectNext("Hello")
                .expectNext("Reactor")
                .verifyComplete();

        StepVerifier
                .create(Flux.just("Hello", "Reactor"))
                .expectNext("Hello", "Reactor")
                .verifyComplete();

        StepVerifier
                .create(Flux.just("Hello", "Reactor"))
                .expectNextCount(2)
                .verifyComplete();

なお、expectNextとexpectNextCountは排他の関係にあるようで。

        StepVerifier
                .create(Flux.just("Hello", "Reactor"))
                .expectNextCount(1)
                .expectNext("Reactor")
                .verifyComplete();

そりゃそうですよね…。

あと、検証内容と実際のPublisherが不一致だと、AssertionErrorがスローされます。

        assertThatThrownBy(() ->
                StepVerifier
                        .create(Flux.just("Hello", "Reactor"))
                        .expectNext("Hello")
                        .verifyComplete()
        ).isInstanceOf(AssertionError.class);

この場合だと、「Reactor」メッセージを確認しないままverifyCompleteを呼び出しています。

TestPublisher

Publisherからのデータの送信などをコントロールしたい場合は、TestPublisherを使えばよいみたいです。

Manually Emitting with TestPublisher

TestPublisherを使用すると、next、emit、completeなどを使ってSubscriber側へイベントを送り込むことができます。

というわけで、まずはダミーのSubscriberを作ってみましょう。

    static class SimpleSubscriber extends BaseSubscriber<String> {
        List<String> values = Collections.synchronizedList(new ArrayList<>());

        AtomicBoolean completed = new AtomicBoolean(false);

        @Override
        protected void hookOnNext(String value) {
            values.add(value);
        }

        @Override
        protected void hookOnComplete() {
            completed.set(true);
        }
    }

確認コード。

    @Test
    public void withTestPublisher() {
        TestPublisher<String> publisher = TestPublisher.create();

        SimpleSubscriber subscriber = new SimpleSubscriber();

        publisher.subscribe(subscriber);

        publisher.next("Hello");
        publisher.next("Reactor");
        publisher.complete();

        assertThat(subscriber.values).containsExactly("Hello", "Reactor");
        assertThat(subscriber.completed.get()).isEqualTo(true);
    }

TestPublisherは、createメソッドで作成します。

        TestPublisher<String> publisher = TestPublisher.create();

とりあえず、これにSubscriberを登録して

        SimpleSubscriber subscriber = new SimpleSubscriber();

        publisher.subscribe(subscriber);

nextでデータを送ったり、completeで完了させて

        publisher.next("Hello");
        publisher.next("Reactor");
        publisher.complete();

Subscriber側の確認をしてみます。

        assertThat(subscriber.values).containsExactly("Hello", "Reactor");
        assertThat(subscriber.completed.get()).isEqualTo(true);

ドキュメントにも書かれていますが、StepVerifierに比べるとちょっとAdvancedな感…。

PublisherProbe

最後は、PublisherProbe。

Checking the Execution Path with PublisherProbe

PublisherProbeを使うと、PublisherProbeが提供するMonoやFluxを各Publisherの操作の中に入れ込み、期待通りの実行パスが動作しているかを
確認することができます。

ちなみに先ほど出てきたTestPublisherは、このPublisherProbeの実装のひとつだったりします。

ドキュメントには2つのMonoを合成する例が書かれていますが、ここでは単純に。

    @Test
    public void withPublisherProbe() {
        PublisherProbe<String> probe = PublisherProbe.empty();

        StepVerifier
                .create(Mono.just("Hello").concatWith(probe.mono()))
                .expectNextCount(1)
                .verifyComplete();

        probe.assertWasSubscribed();
        probe.assertWasRequested();
        probe.assertWasNotCancelled();

        assertThatThrownBy(() -> probe.assertWasNotRequested())
                .isInstanceOf(AssertionError.class);
    }

PublisherProbeは、PublisherProbe#emptyで作成します。

        PublisherProbe<String> probe = PublisherProbe.empty();

PublisherProbe#monoでMonoを、PublisherProbe#fluxでFluxを作成することができます。

これを、今回はMonoとconcatWithしてStepVerifierに放り込んでみます。

        StepVerifier
                .create(Mono.just("Hello").concatWith(probe.mono()))
                .expectNextCount(1)
                .verifyComplete();

あとは、PublisherProbeにより提供されたMonoがsubscribeされたか?requestされたか?キャンセルされていないか?というのを確認してみます。

        probe.assertWasSubscribed();
        probe.assertWasRequested();
        probe.assertWasNotCancelled();

実際の状態と反対のことを確認すると、AssertionErrorが飛んできます。

        assertThatThrownBy(() -> probe.assertWasNotRequested())
                .isInstanceOf(AssertionError.class);

まとめ

簡単にですが、ざっとReactor Testを扱いつつ、Hello World的な感じで遊んでみました。

テストまわりがだいぶ充実して、良い感じになっていますね。これから少しずつキャッチアップしていきましょう。