始めたばかりのSpring Cloud Streamで、前回はとりあえずSourceとSinkを作ってみたものの、
今回はProcessorを書いてみたいと思います。
Processorとは?
Binderにメッセージを送り込むのがSource、メッセージを受け取るのがSinkですが、この両方を併せ持ったのが
Processorみたいです。
実際、Spring Cloud Streamのドキュメントを見ると、ProcessorはSourceとSinkを継承していることになっています。
public interface Processor extends Source, Sink {
http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR3/reference/htmlsingle/#__literal_source_literal_literal_sink_literal_and_literal_processor_literal
}
SourceとSinkの間に挟み込むことで、メッセージを変換したり、フィルタリングしたり、集約などをする、といった
目的で使われるみたいですね。
と、前置きはこのあたりにして、書いていってみましょう。
お題と準備
今回はProcessorが焦点ですが、一応SourceとSinkも作成します。お題はこんな感じ。
- Source … 単純なREST APIとして作成し、受け取ったJSONメッセージをBinderに送信する
- Sink … 受け取ったメッセージを、コンソールに出力する
- Binder … Apache Kafkaを使用し、Source、Processor、Sinkで同じBinderを使用する
で、ProcessorはSourceが受け取ったメッセージの一部項目を、ちょっと装飾する感じにしてみましょう。
構成は、Mavenのマルチプロジェクトとします。
親pom。
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <spring.boot.version>1.4.2.RELEASE</spring.boot.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-dependencies</artifactId> <version>Brooklyn.SR3</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <pluginManagement> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>${spring.boot.version}</version> </plugin> </plugins> </pluginManagement> </build>
以降のSource、Sink、そしてProcessorのpomは、この親pomを継承して作成するものとします。
SourceとSinkの作成
簡単にSoruceとSinkを作成します。
Sourceのpom。
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
Sourceのソースコードは、こちら。
src/main/java/org/littlewings/spring/stream/api/ApiSourceApp.java
package org.littlewings.spring.stream.api; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.http.HttpStatus; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @EnableBinding(Source.class) @RestController public class ApiSourceApp { public static void main(String... args) { SpringApplication.run(ApiSourceApp.class, args); } Source source; public ApiSourceApp(Source source) { this.source = source; } @PostMapping("api/register") @ResponseStatus(HttpStatus.CREATED) public Map<String, Object> register(@RequestBody Map<String, Object> message) { source.output().send(MessageBuilder.withPayload(message).build()); return message; } }
HTTPボディで受け取った内容を、そのままBinderに送る単純な内容。
設定は、こんな感じです。
api-source/src/main/resources/application.properties
spring.cloud.stream.bindings.output.destination=message-source spring.cloud.stream.bindings.output.contentType=application/json spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181
Binderへ向けたTopicの名前は、「message-source」としました。また、Apache KafkaおよびApache ZooKeeperが稼働している
ホストのIPアドレスは、「172.17.0.2」とします。
続いて、Sink側。
pomの設定は、Sourceと同じです。
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
Sinkのソースコード。
src/main/java/org/littlewings/spring/stream/consolesink/ConsoleSinkApp.java
package org.littlewings.spring.stream.consolesink; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; @SpringBootApplication @EnableBinding(Sink.class) public class ConsoleSinkApp { public static void main(String... args) { SpringApplication.run(ConsoleSinkApp.class, args); } @StreamListener(Sink.INPUT) public void println(Map<String, Object> message) { System.out.println("received message = " + message); } }
受け取ったメッセージを、コンソール出力しているだけです。
設定は、こちら。
src/main/resources/application.properties
spring.cloud.stream.bindings.input.destination=message-decorated spring.cloud.stream.bindings.input.group=group-decorated spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181 server.port=10080
こちらでは、Binderからメッセージを受け取る際のTopicの名前を「message-decorated」としました。
Sourceでは、「message-source」でした。今回は、この間を埋めるのがProcessorということになります。
同じTopicの名前を使ってしまうと、今回Apache Kafkaはひとつなのでメッセージがぐるぐると回るので…。
Sink側のリッスンポートは、「10080」としました。Sourceは8080を使うことにします。
ここまでで、SourceとSinkの作成はおしまい。
Processorを作る
では、Processorを作ってみましょう。今回は、以下のようなJSONを
{"id": 12345, "name": "磯野カツオ" }
こう変換するようにしてみましょう。
※「name」の値に「★」を付ける
{"id": 12345, "name": "★磯野カツオ★" }
pomは、Source、Sinkと同じです。
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
作成したProcessorのソースコードは、こちら。
src/main/java/org/littlewings/spring/stream/processor/DecorateProcessorApp.java
package org.littlewings.spring.stream.processor; import java.io.IOException; import java.util.LinkedHashMap; import java.util.Map; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.integration.annotation.Transformer; import org.springframework.messaging.handler.annotation.Payload; @SpringBootApplication @EnableBinding(Processor.class) public class DecorateProcessorApp { public static void main(String... args) { SpringApplication.run(DecorateProcessorApp.class, args); } ObjectMapper objectMapper; public DecorateProcessorApp(ObjectMapper objectMapper) { this.objectMapper = objectMapper; } @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object decorate(@Payload String payload) throws IOException { Map<String, Object> parsed = objectMapper.readValue(payload, Map.class); String name = (String) parsed.get("name"); String decoratedName = "★" + name + "★"; Map<String, Object> transformed = new LinkedHashMap<>(parsed); transformed.put("name", decoratedName); return transformed; } }
まず、@EnableBindingに、Processorクラスを指定します。
@SpringBootApplication @EnableBinding(Processor.class) public class DecorateProcessorApp {
実装する処理ですが、今回はメッセージの変換を行うので@Transformerを使用しました。メッセージはStringで来るみたい(?)なので、
Jacksonでパースしました…。
※この回避方法については、後述
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object decorate(String payload) throws IOException { Map<String, Object> parsed = objectMapper.readValue(payload, Map.class); String name = (String) parsed.get("name"); String decoratedName = "★" + name + "★"; Map<String, Object> transformed = new LinkedHashMap<>(parsed); transformed.put("name", decoratedName); return transformed; }
で、この@Transformerアノテーションですが、Spring Integrationのアノテーションのようです。@Transformerは、メッセージを
変換できることを意味します。
org.springframework.integration.annotation (Spring Integration 5.0.7.RELEASE API)
@Transformerの他に、@Filterや@Aggregatorなどがあります。
Spring Cloud Streamが提供している、@Transformer(というかProcessor)のサンプルはこちら。
https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transform
追記)
メッセージがStringで来てしまう件については、Spring Integrationのアノテーションではなく、Spring Cloud Streamの
SourceやSinkと同じような方法で実装すればいいみたいです。
たとえば、このように。こうすれば、今回のアプリケーションではMap形式でメッセージを受け取れるようになります。
また、キューへの送信は@SendToアノテーションを使用して戻り値としています。
@StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public Map<String, Object> transform(Map<String, Object> payload) { String name = (String) payload.get("name"); String decoratedName = "★" + name + "★"; Map<String, Object> transformed = new LinkedHashMap<>(payload); transformed.put("name", decoratedName); return transformed; }
追記はここまで。
設定は、このように。
decorate-processor/src/main/resources/application.properties
spring.cloud.stream.bindings.input.destination=message-source spring.cloud.stream.bindings.input.group=group-source spring.cloud.stream.bindings.output.destination=message-decorated spring.cloud.stream.bindings.output.contentType=application/json spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181 server.port=9080
Topicは、inputとoutput両方にあります。ここで、先ほどのSourceとSinkをつないでいることになります。
ポートは9080としました。
確認
ここまでできたら、アプリケーションを起動して確認してみます。
$ mvn package
起動。
## Source $ java -jar target/api-source-0.0.1-SNAPSHOT.jar ## Processor $ java -jar target/decorate-processor-0.0.1-SNAPSHOT.jar ## Sink $ java -jar target/console-sink-0.0.1-SNAPSHOT.jar
curlで確認。
$ curl -XPOST -H 'Content-Type: application/json' http://localhost:8080/api/register -d '{"id": 12345, "name": "磯野カツオ" }'
Sink側に、こんな内容が出力されればOKです。
received message = {id=12345, name=★磯野カツオ★}
Spring Cloud Stream側で用意されているProcessorを使ってみる
今回、Processorを自作したわけですが、Spring Cloud Stream(Spring Cloud Stream App Starters)に用意されているProcessorで
同じことをしてみましょう。
Spring Cloud Stream App Starters
@Transformerな使い方をするので、今回は「groovy-transform」を選んでみましょう。
親pomのdependencyManagementに、「groovy-transform」を追加。
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-dependencies</artifactId> <version>Brooklyn.SR3</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud.stream.app</groupId> <artifactId>groovy-transform-app-dependencies</artifactId> <version>1.1.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
こちらを使った、新しいモジュールを作成します。
pomの設定は、こちら。
<dependencies> <dependency> <groupId>org.springframework.cloud.stream.app</groupId> <artifactId>spring-cloud-starter-stream-processor-groovy-transform</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
「spring-cloud-starter-stream-processor-groovy-transform」が増えました。
Processorのソースコードは、このくらいでおしまいです。
src/main/java/org/littlewings/spring/stream/processor/GroovyTransformProcessorApp.java
package org.littlewings.spring.stream.processor; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.app.groovy.transform.processor.GroovyTransformProcessorConfiguration; import org.springframework.context.annotation.Import; @SpringBootApplication @Import(GroovyTransformProcessorConfiguration.class) public class GroovyTransformProcessorApp { public static void main(String... args) { SpringApplication.run(GroovyTransformProcessorApp.class, args); } }
GroovyTransformProcessorConfigurationを@Importしているだけです。
設定は、こちら。
src/main/resources/application.properties
spring.cloud.stream.bindings.input.destination=message-source spring.cloud.stream.bindings.input.group=group-source spring.cloud.stream.bindings.output.destination=message-decorated spring.cloud.stream.bindings.output.contentType=application/json spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181 server.port=9080 groovy-transformer.script=transform.groovy
先ほどの自作のProcessorとほぼ同じ設定なのですが、最後に@Transformerで呼び出されるGroovyスクリプトのパスを書いています。
Starterで用意されている各種モジュールで、どのような設定ができるかはドキュメントに書かれているので、こちらを参照するとよいでしょう。
Spring Cloud Stream App Starters Reference Guide
Groovyスクリプトは、クラスパス上に配置しました。
src/main/resources/transform.groovy
def message = new groovy.json.JsonSlurper().parseText(payload) def decoratedName = "★${message.get('name')}★" def transformed = new LinkedHashMap(message) transformed.put('name', decoratedName) groovy.json.JsonOutput.toJson(transformed)
先ほどと同じくStringで受けているのですが、こちらではStringで返しています…。
結果は、先ほどと同様なので割愛。