Spring Cloud Streamを少しずつ試しているわけですが、毎回SinkからSourceまで作って動かしていくのは
ちょっと面倒なので、テストコードで動かす方法を学んでみようと思います。
Spring Cloud Stream自体に、テストのサポートがあるようですし(P.113)。
Event Driven Microservices with Spring Cloud Stream #jjug_ccc #ccc_ab3
オフィシャルのドキュメントでは、こちら。
「spring-cloud-stream-test-support」というモジュールを使うようです。こちらを使用すると、Apache Kafkaや
RabbitMQといったMessageQueueがなくてもSinkやSource、Processorを動作させることができるようです。
というわけで、試してみましょう。
準備
Maven依存関係。
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-dependencies</artifactId> <version>Brooklyn.SR3</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>1.4.2.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency> </dependencies>
最低限、こんな感じでしょうか。今回はApache Kafkaは使わず、あくまでテストコードで動かすのみとします。
テストコードを含めて、動かすのに最低限@SpringBootApplicationの付与されたクラスが必要になるので、
共通的にこちらを作成しておきます。
src/main/java/org/littlewings/spring/stream/App.java
package org.littlewings.spring.stream; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class App { }
テストコードを動かすだけなら特に設定は不要なのですが、Content-Typeの設定でSourceやProcessorの戻り値が
変わるようなので、実際に使いそうな「application/json」を指定しておくことにします。
src/test/resources/application.properties
spring.cloud.stream.bindings.output.content-type=application/json
他の設定は、とりあえず不要です。
ここから先は、Source、Sink、Processorの順で本体のコードとテストコードを書いていきます。
Source
では、最初にSourceを作成します。
作成したSourceは、こんな感じ。受け取ったStringをちょっと装飾してMapに入れて送信します。
src/main/java/org/littlewings/spring/stream/MySimpleSource.java
package org.littlewings.spring.stream; import java.util.LinkedHashMap; import java.util.Map; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @EnableBinding(Source.class) @Component public class MySimpleSource { Source source; public MySimpleSource(Source source) { this.source = source; } public void send(String message) { Map<String, Object> payload = new LinkedHashMap<>(); payload.put("message", "[" + message + "]"); source.output().send(MessageBuilder.withPayload(payload).build()); } }
@Componentは@EnableBindingをつけているので今回は別になくてもよいのですが、@EnableBindingを別のクラスに付与すると
必要になるので(そりゃそうだ)覚えておく意味でつけておきました。
対するSourceのテストは、こちら。
src/test/java/org/littlewings/spring/stream/MySimpleSourceTest.java
package org.littlewings.spring.stream; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.stream.messaging.Source; import org.springframework.cloud.stream.test.binder.MessageCollector; import org.springframework.messaging.Message; import org.springframework.test.context.junit4.SpringRunner; import static org.assertj.core.api.Assertions.assertThat; @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) public class MySimpleSourceTest { @Autowired MySimpleSource mySimpleSource; @Autowired Source source; @Autowired MessageCollector messageCollector; @Test public void send() { mySimpleSource.send("Hello Source!!"); Message<String> sendMessage = (Message<String>) messageCollector.forChannel(source.output()).poll(); assertThat(sendMessage.getPayload()) .isEqualTo("{\"message\":\"[Hello Source!!]\"}"); } }
組み込みのTomcatは起動する必要がないので、SpringBootTest.WebEnvironment.NONEを指定します。
ここでは、作成した自作のSource自身とSource、それからMessageCollectorというものを使用します。MessageCollectorは、
Spring Cloud Stream Test Supportモジュールが提供するものになります。
それぞれAutowiredでインジェクションして取得。
あとは、Sourceにメッセージを送信するようにふつうに呼び出して
mySimpleSource.send("Hello Source!!");
その結果を受け取るように、MessageCollectorにポーリングしてもらいます。
Message<String> sendMessage = (Message<String>) messageCollector.forChannel(source.output()).poll();
で、結果を確認する、と。Content-Typeに「application/json」を指定したので、ここではJSON文字列が
返ってきます。
assertThat(sendMessage.getPayload()) .isEqualTo("{\"message\":\"[Hello Source!!]\"}");
Content-Typeを指定しなかった場合は、Mapのインスタンスが返ってくることになります。
これで、Sourceについてはおしまいです。
Sink
続いて、Sinkを作成。
今回は、受け取ったメッセージをそのままコンソールに出力する実装にします。
src/main/java/org/littlewings/spring/stream/MySimpleSink.java
package org.littlewings.spring.stream; import java.util.Map; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component; @EnableBinding(Sink.class) @Component public class MySimpleSink { @StreamListener(Sink.INPUT) public void receive(Map<String, Object> payload) { System.out.println("received = " + payload); } }
で、テスト。
src/test/java/org/littlewings/spring/stream/MySimpleSinkTest.java
package org.littlewings.spring.stream; import java.util.LinkedHashMap; import java.util.Map; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.rule.OutputCapture; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; import org.springframework.test.context.junit4.SpringRunner; import static org.assertj.core.api.Assertions.assertThat; @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) public class MySimpleSinkTest { @Autowired Sink sink; @Rule public OutputCapture capture = new OutputCapture(); @Test public void receive() { Map<String, Object> payload = new LinkedHashMap<>(); payload.put("message", "Hello Sink!!"); Message<Map<String, Object>> message = new GenericMessage<>(payload); sink.input().send(message); assertThat(capture.toString()) .isEqualTo("received = {message=Hello Sink!!}" + System.lineSeparator()); } }
Sinkのテストでは、自作のSinkは登場しません。Spring Cloud StreamのSinkがあればOKです。
あと、OutputCaptureというのは標準出力をキャプチャするSpring Bootが提供するクラスで、Spring Cloud Stream自体とは
直接関係がありませんが、自作のSinkが標準出力を使用している関係上使うことになります。
Sinkへのメッセージの送信は、送りたいメッセージを作成してGenericMessageで包んだうえで、Sink#input#sendで送信します。
Map<String, Object> payload = new LinkedHashMap<>(); payload.put("message", "Hello Sink!!"); Message<Map<String, Object>> message = new GenericMessage<>(payload); sink.input().send(message);
Sourceの時とは異なり、sendの呼び出しでそのまま自作のSinkが呼び出されることになるので、Sourceの時のようにポーリング
するようなコードは登場しません。
単に標準出力の内容を検証すればOKです。
assertThat(capture.toString())
.isEqualTo("received = {message=Hello Sink!!}" + System.lineSeparator());
これで、Sinkもおしまい。
Processor
最後はProcessorです。
作成したProcessorは、こんな感じで。「★」をつけるだけです。
src/main/java/org/littlewings/spring/stream/MySimpleProcessor.java
package org.littlewings.spring.stream; import java.util.LinkedHashMap; import java.util.Map; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; @EnableBinding(Processor.class) @Component public class MySimpleProcessor { @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public Map<String, Object> transform(Map<String, Object> payload) { String message = (String) payload.get("message"); Map<String, Object> map = new LinkedHashMap<>(); map.put("message", "★" + message + "★"); return map; } }
ところで、今回Source、Sinkと並べて作っていますが、これをそのまま正直に書いてしまうと「同じSourceから読み込む@StreamListenerが
重複してるよ」と怒られてしまいます。
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'mySimpleSink' defined in file [/path/to/target/classes/org/littlewings/spring/stream/MySimpleSink.class]: Initialization of bean failed; nested exception is org.springframework.beans.factory.BeanInitializationException: Duplicate @StreamListener mapping for 'input' on org.littlewings.spring.stream.MySimpleSink#receive[1 args] already existing for org.littlewings.spring.stream.MySimpleProcessor#transform[1 args]
複数のchannelを用意してもいいのですが
Connecting to Multiple Systems
今回は簡単にデフォルトのchannel名でいくことにします。というわけで、先ほど作ったSinkはコメントアウトするなりしておきます。
Sourceも重複してしまうので、今回はコメントアウトするなりしておきます。
テストはこちら。SourceとSinkを合わせたような感じになります。
src/test/java/org/littlewings/spring/stream/MySimpleProcessorTest.java
package org.littlewings.spring.stream; import java.util.LinkedHashMap; import java.util.Map; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.cloud.stream.test.binder.MessageCollector; import org.springframework.messaging.Message; import org.springframework.messaging.support.GenericMessage; import org.springframework.test.context.junit4.SpringRunner; import static org.assertj.core.api.Assertions.assertThat; @RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE) public class MySimpleProcessorTest { @Autowired Processor processor; @Autowired MessageCollector messageCollector; @Test public void process() { Map<String, Object> payload = new LinkedHashMap<>(); payload.put("message", "Hello Sink!!"); Message<Map<String, Object>> message = new GenericMessage<>(payload); processor.input().send(message); Message<String> proccessedMessage = (Message<String>) messageCollector.forChannel(processor.output()).poll(); assertThat(proccessedMessage.getPayload()) .isEqualTo("{\"message\":\"★Hello Sink!!★\"}"); } }
Processorを@Autowiredで取得し、Processor#input#sendでメッセージを送信します。
Map<String, Object> payload = new LinkedHashMap<>(); payload.put("message", "Hello Sink!!"); Message<Map<String, Object>> message = new GenericMessage<>(payload); processor.input().send(message);
この結果は、MessageCollectorから取得して確認します。
Message<String> proccessedMessage = (Message<String>) messageCollector.forChannel(processor.output()).poll(); assertThat(proccessedMessage.getPayload()) .isEqualTo("{\"message\":\"★Hello Sink!!★\"}");
こんな感じで。