CLOVER🍀

That was when it all began.

Spring Cloud StreamのProcessorを書いてみる

始めたばかりのSpring Cloud Streamで、前回はとりあえずSourceとSinkを作ってみたものの、
今回はProcessorを書いてみたいと思います。

Processorとは?

Binderにメッセージを送り込むのがSource、メッセージを受け取るのがSinkですが、この両方を併せ持ったのが
Processorみたいです。

実際、Spring Cloud Streamのドキュメントを見ると、ProcessorはSourceとSinkを継承していることになっています。

public interface Processor extends Source, Sink {
}

http://docs.spring.io/spring-cloud-stream/docs/Brooklyn.SR3/reference/htmlsingle/#__literal_source_literal_literal_sink_literal_and_literal_processor_literal

SourceとSinkの間に挟み込むことで、メッセージを変換したり、フィルタリングしたり、集約などをする、といった
目的で使われるみたいですね。

と、前置きはこのあたりにして、書いていってみましょう。

お題と準備

今回はProcessorが焦点ですが、一応SourceとSinkも作成します。お題はこんな感じ。

  • Source … 単純なREST APIとして作成し、受け取ったJSONメッセージをBinderに送信する
  • Sink … 受け取ったメッセージを、コンソールに出力する
  • Binder … Apache Kafkaを使用し、Source、Processor、Sinkで同じBinderを使用する

で、ProcessorはSourceが受け取ったメッセージの一部項目を、ちょっと装飾する感じにしてみましょう。

構成は、Mavenのマルチプロジェクトとします。

親pom。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>

        <spring.boot.version>1.4.2.RELEASE</spring.boot.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Brooklyn.SR3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

以降のSource、Sink、そしてProcessorのpomは、この親pomを継承して作成するものとします。

SourceとSinkの作成

簡単にSoruceとSinkを作成します。

Sourceのpom。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Sourceのソースコードは、こちら。
src/main/java/org/littlewings/spring/stream/api/ApiSourceApp.java

package org.littlewings.spring.stream.api;

import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class ApiSourceApp {
    public static void main(String... args) {
        SpringApplication.run(ApiSourceApp.class, args);
    }

    Source source;

    public ApiSourceApp(Source source) {
        this.source = source;
    }

    @PostMapping("api/register")
    @ResponseStatus(HttpStatus.CREATED)
    public Map<String, Object> register(@RequestBody Map<String, Object> message) {
        source.output().send(MessageBuilder.withPayload(message).build());
        return message;
    }
}

HTTPボディで受け取った内容を、そのままBinderに送る単純な内容。

設定は、こんな感じです。
api-source/src/main/resources/application.properties

spring.cloud.stream.bindings.output.destination=message-source
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

Binderへ向けたTopicの名前は、「message-source」としました。また、Apache KafkaおよびApache ZooKeeperが稼働している
ホストのIPアドレスは、「172.17.0.2」とします。

続いて、Sink側。

pomの設定は、Sourceと同じです。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Sinkのソースコード
src/main/java/org/littlewings/spring/stream/consolesink/ConsoleSinkApp.java

package org.littlewings.spring.stream.consolesink;

import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsoleSinkApp {
    public static void main(String... args) {
        SpringApplication.run(ConsoleSinkApp.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void println(Map<String, Object> message) {
        System.out.println("received message = " + message);
    }
}

受け取ったメッセージを、コンソール出力しているだけです。

設定は、こちら。
src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=message-decorated
spring.cloud.stream.bindings.input.group=group-decorated

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

server.port=10080

こちらでは、Binderからメッセージを受け取る際のTopicの名前を「message-decorated」としました。
Sourceでは、「message-source」でした。今回は、この間を埋めるのがProcessorということになります。
同じTopicの名前を使ってしまうと、今回Apache Kafkaはひとつなのでメッセージがぐるぐると回るので…。

Sink側のリッスンポートは、「10080」としました。Sourceは8080を使うことにします。

ここまでで、SourceとSinkの作成はおしまい。

Processorを作る

では、Processorを作ってみましょう。今回は、以下のようなJSON

{"id": 12345, "name": "磯野カツオ" }

こう変換するようにしてみましょう。
※「name」の値に「★」を付ける

{"id": 12345, "name": "★磯野カツオ★" }

pomは、Source、Sinkと同じです。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

作成したProcessorソースコードは、こちら。
src/main/java/org/littlewings/spring/stream/processor/DecorateProcessorApp.java

package org.littlewings.spring.stream.processor;

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.handler.annotation.Payload;

@SpringBootApplication
@EnableBinding(Processor.class)
public class DecorateProcessorApp {
    public static void main(String... args) {
        SpringApplication.run(DecorateProcessorApp.class, args);
    }

    ObjectMapper objectMapper;

    public DecorateProcessorApp(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object decorate(@Payload String payload) throws IOException {
        Map<String, Object> parsed = objectMapper.readValue(payload, Map.class);

        String name = (String) parsed.get("name");
        String decoratedName = "★" + name + "★";

        Map<String, Object> transformed = new LinkedHashMap<>(parsed);
        transformed.put("name", decoratedName);
        return transformed;
    }
}

まず、@EnableBindingに、Processorクラスを指定します。

@SpringBootApplication
@EnableBinding(Processor.class)
public class DecorateProcessorApp {

実装する処理ですが、今回はメッセージの変換を行うので@Transformerを使用しました。メッセージはStringで来るみたい(?)なので、
Jacksonでパースしました…。
※この回避方法については、後述

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object decorate(String payload) throws IOException {
        Map<String, Object> parsed = objectMapper.readValue(payload, Map.class);

        String name = (String) parsed.get("name");
        String decoratedName = "★" + name + "★";

        Map<String, Object> transformed = new LinkedHashMap<>(parsed);
        transformed.put("name", decoratedName);
        return transformed;
    }

で、この@Transformerアノテーションですが、Spring Integrationのアノテーションのようです。@Transformerは、メッセージを
変換できることを意味します。

org.springframework.integration.annotation (Spring Integration 5.0.7.RELEASE API)

@Transformerの他に、@Filterや@Aggregatorなどがあります。

Spring Cloud Streamが提供している、@Transformer(というかProcessor)のサンプルはこちら。

https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/transform

追記
メッセージがStringで来てしまう件については、Spring Integrationのアノテーションではなく、Spring Cloud Streamの
SourceやSinkと同じような方法で実装すればいいみたいです。

たとえば、このように。こうすれば、今回のアプリケーションではMap形式でメッセージを受け取れるようになります。
また、キューへの送信は@SendToアノテーションを使用して戻り値としています。

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Map<String, Object> transform(Map<String, Object> payload) {
        String name = (String) payload.get("name");
        String decoratedName = "★" + name + "★";

        Map<String, Object> transformed = new LinkedHashMap<>(payload);
        transformed.put("name", decoratedName);
        return transformed;
    }

追記はここまで。

設定は、このように。
decorate-processor/src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=message-source
spring.cloud.stream.bindings.input.group=group-source

spring.cloud.stream.bindings.output.destination=message-decorated
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

server.port=9080

Topicは、inputとoutput両方にあります。ここで、先ほどのSourceとSinkをつないでいることになります。

ポートは9080としました。

確認

ここまでできたら、アプリケーションを起動して確認してみます。

$ mvn package

起動。

## Source
$ java -jar target/api-source-0.0.1-SNAPSHOT.jar

## Processor
$ java -jar target/decorate-processor-0.0.1-SNAPSHOT.jar

## Sink
$ java -jar target/console-sink-0.0.1-SNAPSHOT.jar

curlで確認。

$ curl -XPOST -H 'Content-Type: application/json' http://localhost:8080/api/register -d '{"id": 12345, "name": "磯野カツオ" }'

Sink側に、こんな内容が出力されればOKです。

received message = {id=12345, name=★磯野カツオ★}

Spring Cloud Stream側で用意されているProcessorを使ってみる

今回、Processorを自作したわけですが、Spring Cloud Stream(Spring Cloud Stream App Starters)に用意されているProcessor
同じことをしてみましょう。

Spring Cloud Stream App Starters

@Transformerな使い方をするので、今回は「groovy-transform」を選んでみましょう。

親pomのdependencyManagementに、「groovy-transform」を追加。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Brooklyn.SR3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud.stream.app</groupId>
                <artifactId>groovy-transform-app-dependencies</artifactId>
                <version>1.1.1.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

こちらを使った、新しいモジュールを作成します。

pomの設定は、こちら。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud.stream.app</groupId>
            <artifactId>spring-cloud-starter-stream-processor-groovy-transform</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

「spring-cloud-starter-stream-processor-groovy-transform」が増えました。

Processorソースコードは、このくらいでおしまいです。
src/main/java/org/littlewings/spring/stream/processor/GroovyTransformProcessorApp.java

package org.littlewings.spring.stream.processor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.app.groovy.transform.processor.GroovyTransformProcessorConfiguration;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(GroovyTransformProcessorConfiguration.class)
public class GroovyTransformProcessorApp {
    public static void main(String... args) {
        SpringApplication.run(GroovyTransformProcessorApp.class, args);
    }
}

GroovyTransformProcessorConfigurationを@Importしているだけです。

設定は、こちら。
src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=message-source
spring.cloud.stream.bindings.input.group=group-source

spring.cloud.stream.bindings.output.destination=message-decorated
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

server.port=9080

groovy-transformer.script=transform.groovy

先ほどの自作のProcessorとほぼ同じ設定なのですが、最後に@Transformerで呼び出されるGroovyスクリプトのパスを書いています。

Starterで用意されている各種モジュールで、どのような設定ができるかはドキュメントに書かれているので、こちらを参照するとよいでしょう。

Spring Cloud Stream App Starters Reference Guide

Groovyスクリプトは、クラスパス上に配置しました。
src/main/resources/transform.groovy

def message = new groovy.json.JsonSlurper().parseText(payload)
def decoratedName = "★${message.get('name')}★"

def transformed = new LinkedHashMap(message)
transformed.put('name', decoratedName)

groovy.json.JsonOutput.toJson(transformed)

先ほどと同じくStringで受けているのですが、こちらではStringで返しています…。

結果は、先ほどと同様なので割愛。

まとめ

Spring Cloud StreamのProcessorを自分でも作ってみました。

とりあえず動かすことはできましたが、どうなんでしょ。ふつうにSourceとSinkっぽいコードを書いた時よりも、どういったところが
良いのかなぁというのが、ちょっと今回はわかりませんでした…。

もう少し理解を深めたり、APIに習熟したりするとわかるのかな?