CLOVER🍀

That was when it all began.

Spring Cloud StreamのTest Supportを使う

Spring Cloud Streamを少しずつ試しているわけですが、毎回SinkからSourceまで作って動かしていくのは
ちょっと面倒なので、テストコードで動かす方法を学んでみようと思います。

Spring Cloud Stream自体に、テストのサポートがあるようですし(P.113)。

Event Driven Microservices with Spring Cloud Stream #jjug_ccc #ccc_ab3

オフィシャルのドキュメントでは、こちら。

Testing

「spring-cloud-stream-test-support」というモジュールを使うようです。こちらを使用すると、Apache Kafkaや
RabbitMQといったMessageQueueがなくてもSinkやSource、Processorを動作させることができるようです。

というわけで、試してみましょう。

準備

Maven依存関係。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Brooklyn.SR3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>1.4.2.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

最低限、こんな感じでしょうか。今回はApache Kafkaは使わず、あくまでテストコードで動かすのみとします。

テストコードを含めて、動かすのに最低限@SpringBootApplicationの付与されたクラスが必要になるので、
共通的にこちらを作成しておきます。
src/main/java/org/littlewings/spring/stream/App.java

package org.littlewings.spring.stream;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class App {
}

テストコードを動かすだけなら特に設定は不要なのですが、Content-Typeの設定でSourceやProcessorの戻り値が
変わるようなので、実際に使いそうな「application/json」を指定しておくことにします。
src/test/resources/application.properties

spring.cloud.stream.bindings.output.content-type=application/json

他の設定は、とりあえず不要です。

ここから先は、Source、Sink、Processorの順で本体のコードとテストコードを書いていきます。

Source

では、最初にSourceを作成します。

作成したSourceは、こんな感じ。受け取ったStringをちょっと装飾してMapに入れて送信します。
src/main/java/org/littlewings/spring/stream/MySimpleSource.java

package org.littlewings.spring.stream;

import java.util.LinkedHashMap;
import java.util.Map;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@EnableBinding(Source.class)
@Component
public class MySimpleSource {
    Source source;

    public MySimpleSource(Source source) {
        this.source = source;
    }

    public void send(String message) {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("message", "[" + message + "]");
        source.output().send(MessageBuilder.withPayload(payload).build());
    }
}

@Componentは@EnableBindingをつけているので今回は別になくてもよいのですが、@EnableBindingを別のクラスに付与すると
必要になるので(そりゃそうだ)覚えておく意味でつけておきました。

対するSourceのテストは、こちら。
src/test/java/org/littlewings/spring/stream/MySimpleSourceTest.java

package org.littlewings.spring.stream;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.test.context.junit4.SpringRunner;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class MySimpleSourceTest {
    @Autowired
    MySimpleSource mySimpleSource;

    @Autowired
    Source source;

    @Autowired
    MessageCollector messageCollector;

    @Test
    public void send() {
        mySimpleSource.send("Hello Source!!");
        Message<String> sendMessage =
                (Message<String>) messageCollector.forChannel(source.output()).poll();
        assertThat(sendMessage.getPayload())
                .isEqualTo("{\"message\":\"[Hello Source!!]\"}");
    }
}

組み込みのTomcatは起動する必要がないので、SpringBootTest.WebEnvironment.NONEを指定します。

ここでは、作成した自作のSource自身とSource、それからMessageCollectorというものを使用します。MessageCollectorは、
Spring Cloud Stream Test Supportモジュールが提供するものになります。

それぞれAutowiredでインジェクションして取得。

あとは、Sourceにメッセージを送信するようにふつうに呼び出して

        mySimpleSource.send("Hello Source!!");

その結果を受け取るように、MessageCollectorにポーリングしてもらいます。

        Message<String> sendMessage =
                (Message<String>) messageCollector.forChannel(source.output()).poll();

で、結果を確認する、と。Content-Typeに「application/json」を指定したので、ここではJSON文字列が
返ってきます。

        assertThat(sendMessage.getPayload())
                .isEqualTo("{\"message\":\"[Hello Source!!]\"}");

Content-Typeを指定しなかった場合は、Mapのインスタンスが返ってくることになります。

これで、Sourceについてはおしまいです。

Sink

続いて、Sinkを作成。

今回は、受け取ったメッセージをそのままコンソールに出力する実装にします。
src/main/java/org/littlewings/spring/stream/MySimpleSink.java

package org.littlewings.spring.stream;

import java.util.Map;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@EnableBinding(Sink.class)
@Component
public class MySimpleSink {
    @StreamListener(Sink.INPUT)
    public void receive(Map<String, Object> payload) {
        System.out.println("received = " + payload);
    }
}

で、テスト。
src/test/java/org/littlewings/spring/stream/MySimpleSinkTest.java

package org.littlewings.spring.stream;

import java.util.LinkedHashMap;
import java.util.Map;

import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.rule.OutputCapture;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit4.SpringRunner;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class MySimpleSinkTest {
    @Autowired
    Sink sink;

    @Rule
    public OutputCapture capture = new OutputCapture();

    @Test
    public void receive() {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("message", "Hello Sink!!");
        Message<Map<String, Object>> message = new GenericMessage<>(payload);
        sink.input().send(message);
        assertThat(capture.toString())
                .isEqualTo("received = {message=Hello Sink!!}" + System.lineSeparator());
    }
}

Sinkのテストでは、自作のSinkは登場しません。Spring Cloud StreamのSinkがあればOKです。

あと、OutputCaptureというのは標準出力をキャプチャするSpring Bootが提供するクラスで、Spring Cloud Stream自体とは
直接関係がありませんが、自作のSinkが標準出力を使用している関係上使うことになります。

Sinkへのメッセージの送信は、送りたいメッセージを作成してGenericMessageで包んだうえで、Sink#input#sendで送信します。

        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("message", "Hello Sink!!");
        Message<Map<String, Object>> message = new GenericMessage<>(payload);
        sink.input().send(message);

Sourceの時とは異なり、sendの呼び出しでそのまま自作のSinkが呼び出されることになるので、Sourceの時のようにポーリング
するようなコードは登場しません。

単に標準出力の内容を検証すればOKです。

        assertThat(capture.toString())
                .isEqualTo("received = {message=Hello Sink!!}" + System.lineSeparator());

これで、Sinkもおしまい。

Processor

最後はProcessorです。

作成したProcessorは、こんな感じで。「★」をつけるだけです。
src/main/java/org/littlewings/spring/stream/MySimpleProcessor.java

package org.littlewings.spring.stream;

import java.util.LinkedHashMap;
import java.util.Map;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@EnableBinding(Processor.class)
@Component
public class MySimpleProcessor {
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Map<String, Object> transform(Map<String, Object> payload) {
        String message = (String) payload.get("message");

        Map<String, Object> map = new LinkedHashMap<>();
        map.put("message", "★" + message + "★");
        return map;
    }
}

ところで、今回Source、Sinkと並べて作っていますが、これをそのまま正直に書いてしまうと「同じSourceから読み込む@StreamListenerが
重複してるよ」と怒られてしまいます。

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'mySimpleSink' defined in file [/path/to/target/classes/org/littlewings/spring/stream/MySimpleSink.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.BeanInitializationException: Duplicate @StreamListener mapping for 'input' on org.littlewings.spring.stream.MySimpleSink#receive[1 args] already existing for org.littlewings.spring.stream.MySimpleProcessor#transform[1 args]

複数のchannelを用意してもいいのですが

Connecting to Multiple Systems

今回は簡単にデフォルトのchannel名でいくことにします。というわけで、先ほど作ったSinkはコメントアウトするなりしておきます。
Sourceも重複してしまうので、今回はコメントアウトするなりしておきます。

テストはこちら。SourceとSinkを合わせたような感じになります。
src/test/java/org/littlewings/spring/stream/MySimpleProcessorTest.java

package org.littlewings.spring.stream;

import java.util.LinkedHashMap;
import java.util.Map;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit4.SpringRunner;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class MySimpleProcessorTest {
    @Autowired
    Processor processor;

    @Autowired
    MessageCollector messageCollector;

    @Test
    public void process() {
        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("message", "Hello Sink!!");
        Message<Map<String, Object>> message = new GenericMessage<>(payload);

        processor.input().send(message);

        Message<String> proccessedMessage =
                (Message<String>) messageCollector.forChannel(processor.output()).poll();
        assertThat(proccessedMessage.getPayload())
                .isEqualTo("{\"message\":\"★Hello Sink!!★\"}");
    }
}

Processorを@Autowiredで取得し、Processor#input#sendでメッセージを送信します。

        Map<String, Object> payload = new LinkedHashMap<>();
        payload.put("message", "Hello Sink!!");
        Message<Map<String, Object>> message = new GenericMessage<>(payload);

        processor.input().send(message);

この結果は、MessageCollectorから取得して確認します。

        Message<String> proccessedMessage =
                (Message<String>) messageCollector.forChannel(processor.output()).poll();
        assertThat(proccessedMessage.getPayload())
                .isEqualTo("{\"message\":\"★Hello Sink!!★\"}");

こんな感じで。

まとめ

Spring Cloud Streamでの、Source、Sink、そしてProcessorのテストを書いてみました。

これで、今後はテストコードを使った確認ができるようになるはず…です。