CLOVER🍀

That was when it all began.

はじめてのSpring Cloud Stream

最近興味があるプロダクトのひとつに、Spring Cloud Streamがあります。眺めていてだいぶ時間が経っているのですが、
そろそろ試してみようかなということで。

Spring Cloud Streamとは?

マイクロサービス間のメッセージングを実現するプロダクトです。

Spring Cloud Stream

Spring Cloud Streamでマイクロサービス間メッセージング

Event Driven Microservices with Spring Cloud Stream #jjug_ccc #ccc_ab3

ドキュメントは、こちら。
Spring Cloud Stream Reference Guide

あるアプリケーション(Source)からメッセージをBinder(キュー)を介して、別のアプリケーション(Sink)側で
受け取る仕組みをSpringを使ったアプリケーションとして組み上げます。

Spring Cloud Streamの用語でメッセージの送り元をSource、受け取り側をSinkと呼ぶようですが、間に変換処理などを
行うようなProcessor(Source+Sink)も導入可能なようです。

Binderには、Apache KafkaとRabbitMQを選択できます。

また、Spring側でいくつかすでに実装済みのSource、Sinkもあります。

Spring Cloud Stream App Starters

今回は、

  • Source … twitterstream(Spring Cloud Stream App Starters)
  • Binder … Apache Kafka
  • Sink … 自作(標準出力)

という形で組んでみます。twitterstreamを使っていることから、SourceでTwitterからStreamingで受け取った
ツイートを、Sink側で標準出力で出力するという内容でいってみます。

準備

今回は、Mavenのマルチプロジェクトとして作ってみることにします。

親pomの依存関係とプラグインの設定。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>

        <spring.boot.version>1.4.2.RELEASE</spring.boot.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Brooklyn.SR3</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.cloud.stream.app</groupId>
                <artifactId>twitter-app-dependencies</artifactId>
                <version>1.1.1.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

Spring Cloud Streamと、Spring Cloud Stream App Startersのtwitterstreamを追加。Spring Cloud Streamは
Spring Bootの1.4系を使っているみたいなので、Spring Bootはこちらに合わせました。

Apache Kafka

Apache Kafkaについては、単に起動しておけばOKですが、今回はリモート接続を行うようにしているので、
listenersだけリモート接続可能にしておきました。Apache ZooKeeperも、同じホストで動かすことにします。

今回、Apache Kafkaが動作しているホストのIPアドレスは、172.17.0.2とします。

Topicの作成は不要です。以後、Apache Kafka(およびApache ZooKeeper)は起動済みとします。

Source(twitterstream)を作る

それでは、Sourceを作成してみましょう。

親pomを継承している前提で、アプリケーションの作成に必要な依存関係とプラグインの設定は以下のとおり。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud.stream.app</groupId>
            <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Spring Cloud Streamが提供するtwitterstreamと、BinderにはApache Kafkaを使うので以下が必要になります。

        <dependency>
            <groupId>org.springframework.cloud.stream.app</groupId>
            <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>

Spring Bootについては、明示的に依存関係に含める必要がありました。

で、アプリケーションの実装はこんな感じに。
src/main/java/org/littlewings/spring/stream/TweetSourceApp.java

package org.littlewings.spring.stream;

import java.net.URI;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.app.twitterstream.source.TwitterstreamSourceConfiguration;
import org.springframework.context.annotation.Import;

@SpringBootApplication
@Import(TwitterstreamSourceConfiguration.class)
public class TweetSourceApp {
    public static void main(String... args) {
        SpringApplication.run(TweetSourceApp.class, args);
    }
}

これだけです。tweetstreamを使うには、TwitterstreamSourceConfigurationを@ImportしておけばOKな模様。

@Import(TwitterstreamSourceConfiguration.class)

あとは、設定です。

今回は、このような感じに。
src/main/resources/application.properties

spring.cloud.stream.bindings.output.destination=tweet
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

twitter.credentials.access-token=<your-access-token>
twitter.credentials.access-token-secret=<your-access-token-secret>
twitter.credentials.consumer-key=<your-consumer-key>
twitter.credentials.consumer-secret=<your-consumer-secret>

以下の設定は、Apache Kafka上のTopicの名前(今回は「tweet」としました)と、Apache Kafkaに登録する時のデータフォーマットです。
今回は、JSONで保存するようにしました。指定しなかった場合は、今回の場合StringがSinkに飛んできました…。

spring.cloud.stream.bindings.output.destination=tweet
spring.cloud.stream.bindings.output.contentType=application/json

次の部分は、Apache KafkaとApache ZooKeeperへの接続設定です。

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

今回はIPアドレスとデフォルトポートという形で記述しましたが、brokersやzkNodesに「host:port」の形式で書くのもOKのようです。

ドキュメント上の該当個所は、こちら。
Apache Kafka Binder / Configuration Options

また、TwitterのStreaming APIを使用するためのOAuth系の情報が必要です。

ドキュメントは、こちらを参考に。
Twitter Stream Source

あとはパッケージングして起動。組み込みTomcatが入ったSpring Bootアプリケーションが起動します。
※「tweet-source」が今回作成したアプリケーションの名前です

$ mvn package
$ java -jar tweet-source/target/tweet-source-0.0.1-SNAPSHOT.jar

TwitterおよびApache ZooKeeperへ接続できれば、起動に成功します。起動してこういうログが出ればOKでしょう。

2017-04-02 22:03:02.038  INFO 64650 --- [itterSource-1-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.9.0.1
2017-04-02 22:03:02.038  INFO 64650 --- [itterSource-1-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 23c69d62a0cabf06

注意)
このアプリケーションは、起動している間にツイートを取得してApache Kafkaに投入し続けます。不要な時は停止しておきましょう。

Sink(標準出力)を作る

続いて、Sinkを作成します。

Maven依存関係と、プラグインの設定はこんな感じで。
※親pomを継承している前提です

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

アプリケーションは、こんな感じで作成。
src/main/java/org/littlewings/spring/stream/ConsoleSinkApp.java

package org.littlewings.spring.stream;

import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsoleSinkApp {
    public static void main(String... args) {
        SpringApplication.run(ConsoleSinkApp.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void println(Map<String, Object> tweet) {
        System.out.println("====================");
        System.out.println("  id = " + tweet.get("id"));
        System.out.println("  created_at = " + tweet.get("created_at"));
        System.out.println("  screen_name = @" + ((Map<String, Object>) tweet.get("user")).get("screen_name"));
        System.out.println("  name = " + ((Map<String, Object>) tweet.get("user")).get("name"));
        System.out.println("  text = " + tweet.get("text"));
    }
}

クラスの宣言には、@SpringBootApplication以外に@EnableBinding(Sink.class)を付与。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ConsoleSinkApp {

Apache Kafka側にJSONで入っているメッセージは、@StreamListener(Sink.INPUT)を付与したメソッドでMapとして受け取れます。
JSONで受けているデータのマッピング先をMapにはしょっただけですが…。

    @StreamListener(Sink.INPUT)
    public void println(Map<String, Object> tweet) {

出力内容としては、ツイートのidや日時、名前、ツイート内容を出力しておしまいです。

で、このアプリケーションに対する設定は、こちら。
src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=tweet
spring.cloud.stream.bindings.input.group=group1

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

server.port=9080

「spring.cloud.stream.bindings.〜」の部分が「input」になります。Sourceの時は、「output」でした。
「spring.cloud.stream.bindings.input.destination」はApache KafkaのTopicを指すようになるので、Sourceと同じ値を
指定します。今回の場合は「tweet」になります。

「spring.cloud.stream.bindings.input.group」については、複数のSinkを使用する場合を考えると指定しておいた方が
無難なようです。

参考)
Spring Cloud Streamでマイクロサービス間メッセージング

Apache KafkaおよびApache ZooKeeperに関する設定は、Sourceの時と同じです。

あと、本筋とは直接関係はありませんが、今回SourceとSinkを同じホスト上で動かしているので、Tomcatのポートはずらしています。

server.port=9080

今回は、Sourceが8080、Sinkが9080を使用しています。

で、パッケージングして起動。今回のアプリケーションの名前は、「console-sink」にしています。

$ mvn package
$ java -jar console-sink/target/console-sink-0.0.1-SNAPSHOT.jar

起動に成功すれば、Sourceが取り込んだツイートがコンソールに出力されます。

少しオマケ

Spring Cloud Streamが提供するtweetsourceは、Streaming APIの結果をそのまま受け取るので、特に絞ったりすることができません。

filterを使えばいいのでしょうけれど
Consuming streaming data — Twitter Developers

実装としてはfirehoseかsampleを使うようになっています。
https://github.com/spring-cloud/spring-cloud-stream-app-starters/blob/v1.0.4.RELEASE/twitter/spring-cloud-starter-stream-source-twitterstream/src/main/java/org/springframework/cloud/stream/app/twitterstream/source/TwitterStreamMessageProducer.java

全パラメーターを指定できるわけでもないようですし(TODOになっています)。

というわけで、ちょっと自分で書いてみます。
src/main/java/org/littlewings/spring/stream/TweetSourceApp.java

package org.littlewings.spring.stream;

import java.net.URI;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.app.twitterstream.source.AbstractTwitterInboundChannelAdapter;
import org.springframework.cloud.stream.app.twitterstream.source.TwitterstreamSourceConfiguration;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.social.support.URIBuilder;
import org.springframework.social.twitter.api.impl.TwitterTemplate;

@SpringBootApplication
@Import(TwitterstreamSourceConfiguration.class)
public class TweetSourceApp {
    public static void main(String... args) {
        SpringApplication.run(TweetSourceApp.class, args);
    }

    @Autowired
    Source source;

    @Bean
    public MessageProducer twitterStream(TwitterTemplate twitterTemplate) {
        MyTwitterStreamMessageProducer messageProducer =
                new MyTwitterStreamMessageProducer(twitterTemplate);
        messageProducer.setOutputChannel(source.output());
        return messageProducer;
    }

    static class MyTwitterStreamMessageProducer extends AbstractTwitterInboundChannelAdapter {

        private static final String API_URL_BASE = "https://stream.twitter.com/1.1/statuses/";

        MyTwitterStreamMessageProducer(TwitterTemplate twitterTemplate) {
            super(twitterTemplate);
        }

        @Override
        protected URI buildUri() {
            String path = "filter.json";
            URIBuilder b = URIBuilder.fromUri(API_URL_BASE + path);
            b.queryParam("track", "#tweettest");

            return b.build();
        }

        @Override
        protected void doSendLine(String line) {
            if (line.startsWith("{\"limit")) {
                logger.info("Twitter stream is being track limited.");
            } else if (line.startsWith("{\"delete")) {
                // discard
            } else if (line.startsWith("{\"warning")) {
                // discard
            } else {
                sendMessage(MessageBuilder.withPayload(line).build());
            }
        }
    }
}

元ネタは、こちらですよ。
https://github.com/spring-cloud/spring-cloud-stream-app-starters/blob/v1.0.4.RELEASE/twitter/spring-cloud-starter-stream-source-twitterstream/src/main/java/org/springframework/cloud/stream/app/twitterstream/source/TwitterStreamMessageProducer.java

今回は、「#tweettest」をフィルタの対象にしています。

        @Override
        protected URI buildUri() {
            String path = "filter.json";
            URIBuilder b = URIBuilder.fromUri(API_URL_BASE + path);
            b.queryParam("track", "#tweettest");

            return b.build();
        }

パッケージングして、確認。

$ java -jar tweet-source/target/tweet-source-0.0.1-SNAPSHOT.jar

結果(Sink側)。Source起動後に、「#tweettest」タグをつけて自分でツイートしました。それが拾われています。

====================
  id = 848528590715068416
  created_at = Sun Apr 02 13:32:35 +0000 2017
  screen_name = @kazuhira_r
  name = かずひら
  text = てすと #tweettest

出力されました、と。

まとめ

簡単にですが、Spring Cloud Streamを試してみました。

取っ掛かりで最初はオロオロしましたが、ある程度書いてみたりドキュメント読んだり、GitHubリポジトリ見てるとなんとなく
慣れそうな感じがしてきました。

今後、もうちょっといろいろ試していきましょう。

参考)
GitHub - Pivotal-Japan/spring-cloud-stream-tutorial: Spring Cloud Stream Tutorial