最近興味があるプロダクトのひとつに、Spring Cloud Streamがあります。眺めていてだいぶ時間が経っているのですが、
そろそろ試してみようかなということで。
Spring Cloud Streamとは?
マイクロサービス間のメッセージングを実現するプロダクトです。
Spring Cloud Streamでマイクロサービス間メッセージング
Event Driven Microservices with Spring Cloud Stream #jjug_ccc #ccc_ab3
ドキュメントは、こちら。
Spring Cloud Stream Reference Guide
あるアプリケーション(Source)からメッセージをBinder(キュー)を介して、別のアプリケーション(Sink)側で
受け取る仕組みをSpringを使ったアプリケーションとして組み上げます。
Spring Cloud Streamの用語でメッセージの送り元をSource、受け取り側をSinkと呼ぶようですが、間に変換処理などを
行うようなProcessor(Source+Sink)も導入可能なようです。
Binderには、Apache KafkaとRabbitMQを選択できます。
また、Spring側でいくつかすでに実装済みのSource、Sinkもあります。
Spring Cloud Stream App Starters
今回は、
- Source … twitterstream(Spring Cloud Stream App Starters)
- Binder … Apache Kafka
- Sink … 自作(標準出力)
という形で組んでみます。twitterstreamを使っていることから、SourceでTwitterからStreamingで受け取った
ツイートを、Sink側で標準出力で出力するという内容でいってみます。
準備
今回は、Mavenのマルチプロジェクトとして作ってみることにします。
親pomの依存関係とプラグインの設定。
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <spring.boot.version>1.4.2.RELEASE</spring.boot.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-dependencies</artifactId> <version>Brooklyn.SR3</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.cloud.stream.app</groupId> <artifactId>twitter-app-dependencies</artifactId> <version>1.1.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <pluginManagement> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>${spring.boot.version}</version> </plugin> </plugins> </pluginManagement> </build>
Spring Cloud Streamと、Spring Cloud Stream App Startersのtwitterstreamを追加。Spring Cloud Streamは
Spring Bootの1.4系を使っているみたいなので、Spring Bootはこちらに合わせました。
Apache Kafka
Apache Kafkaについては、単に起動しておけばOKですが、今回はリモート接続を行うようにしているので、
listenersだけリモート接続可能にしておきました。Apache ZooKeeperも、同じホストで動かすことにします。
今回、Apache Kafkaが動作しているホストのIPアドレスは、172.17.0.2とします。
Topicの作成は不要です。以後、Apache Kafka(およびApache ZooKeeper)は起動済みとします。
Source(twitterstream)を作る
それでは、Sourceを作成してみましょう。
親pomを継承している前提で、アプリケーションの作成に必要な依存関係とプラグインの設定は以下のとおり。
<dependencies> <dependency> <groupId>org.springframework.cloud.stream.app</groupId> <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
Spring Cloud Streamが提供するtwitterstreamと、BinderにはApache Kafkaを使うので以下が必要になります。
<dependency> <groupId>org.springframework.cloud.stream.app</groupId> <artifactId>spring-cloud-starter-stream-source-twitterstream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
Spring Bootについては、明示的に依存関係に含める必要がありました。
で、アプリケーションの実装はこんな感じに。
src/main/java/org/littlewings/spring/stream/TweetSourceApp.java
package org.littlewings.spring.stream; import java.net.URI; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.app.twitterstream.source.TwitterstreamSourceConfiguration; import org.springframework.context.annotation.Import; @SpringBootApplication @Import(TwitterstreamSourceConfiguration.class) public class TweetSourceApp { public static void main(String... args) { SpringApplication.run(TweetSourceApp.class, args); } }
これだけです。tweetstreamを使うには、TwitterstreamSourceConfigurationを@ImportしておけばOKな模様。
@Import(TwitterstreamSourceConfiguration.class)
あとは、設定です。
今回は、このような感じに。
src/main/resources/application.properties
spring.cloud.stream.bindings.output.destination=tweet spring.cloud.stream.bindings.output.contentType=application/json spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181 twitter.credentials.access-token=<your-access-token> twitter.credentials.access-token-secret=<your-access-token-secret> twitter.credentials.consumer-key=<your-consumer-key> twitter.credentials.consumer-secret=<your-consumer-secret>
以下の設定は、Apache Kafka上のTopicの名前(今回は「tweet」としました)と、Apache Kafkaに登録する時のデータフォーマットです。
今回は、JSONで保存するようにしました。指定しなかった場合は、今回の場合StringがSinkに飛んできました…。
spring.cloud.stream.bindings.output.destination=tweet spring.cloud.stream.bindings.output.contentType=application/json
次の部分は、Apache KafkaとApache ZooKeeperへの接続設定です。
spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181
今回はIPアドレスとデフォルトポートという形で記述しましたが、brokersやzkNodesに「host:port」の形式で書くのもOKのようです。
ドキュメント上の該当個所は、こちら。
Apache Kafka Binder / Configuration Options
また、TwitterのStreaming APIを使用するためのOAuth系の情報が必要です。
ドキュメントは、こちらを参考に。
Twitter Stream Source
あとはパッケージングして起動。組み込みTomcatが入ったSpring Bootアプリケーションが起動します。
※「tweet-source」が今回作成したアプリケーションの名前です
$ mvn package $ java -jar tweet-source/target/tweet-source-0.0.1-SNAPSHOT.jar
TwitterおよびApache ZooKeeperへ接続できれば、起動に成功します。起動してこういうログが出ればOKでしょう。
2017-04-02 22:03:02.038 INFO 64650 --- [itterSource-1-1] o.a.kafka.common.utils.AppInfoParser : Kafka version : 0.9.0.1 2017-04-02 22:03:02.038 INFO 64650 --- [itterSource-1-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : 23c69d62a0cabf06
注意)
このアプリケーションは、起動している間にツイートを取得してApache Kafkaに投入し続けます。不要な時は停止しておきましょう。
Sink(標準出力)を作る
続いて、Sinkを作成します。
Maven依存関係と、プラグインの設定はこんな感じで。
※親pomを継承している前提です
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
アプリケーションは、こんな感じで作成。
src/main/java/org/littlewings/spring/stream/ConsoleSinkApp.java
package org.littlewings.spring.stream; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; @SpringBootApplication @EnableBinding(Sink.class) public class ConsoleSinkApp { public static void main(String... args) { SpringApplication.run(ConsoleSinkApp.class, args); } @StreamListener(Sink.INPUT) public void println(Map<String, Object> tweet) { System.out.println("===================="); System.out.println(" id = " + tweet.get("id")); System.out.println(" created_at = " + tweet.get("created_at")); System.out.println(" screen_name = @" + ((Map<String, Object>) tweet.get("user")).get("screen_name")); System.out.println(" name = " + ((Map<String, Object>) tweet.get("user")).get("name")); System.out.println(" text = " + tweet.get("text")); } }
クラスの宣言には、@SpringBootApplication以外に@EnableBinding(Sink.class)を付与。
@SpringBootApplication @EnableBinding(Sink.class) public class ConsoleSinkApp {
Apache Kafka側にJSONで入っているメッセージは、@StreamListener(Sink.INPUT)を付与したメソッドでMapとして受け取れます。
JSONで受けているデータのマッピング先をMapにはしょっただけですが…。
@StreamListener(Sink.INPUT) public void println(Map<String, Object> tweet) {
出力内容としては、ツイートのidや日時、名前、ツイート内容を出力しておしまいです。
で、このアプリケーションに対する設定は、こちら。
src/main/resources/application.properties
spring.cloud.stream.bindings.input.destination=tweet spring.cloud.stream.bindings.input.group=group1 spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181 server.port=9080
「spring.cloud.stream.bindings.〜」の部分が「input」になります。Sourceの時は、「output」でした。
「spring.cloud.stream.bindings.input.destination」はApache KafkaのTopicを指すようになるので、Sourceと同じ値を
指定します。今回の場合は「tweet」になります。
「spring.cloud.stream.bindings.input.group」については、複数のSinkを使用する場合を考えると指定しておいた方が
無難なようです。
参考)
Spring Cloud Streamでマイクロサービス間メッセージング
Apache KafkaおよびApache ZooKeeperに関する設定は、Sourceの時と同じです。
あと、本筋とは直接関係はありませんが、今回SourceとSinkを同じホスト上で動かしているので、Tomcatのポートはずらしています。
server.port=9080
今回は、Sourceが8080、Sinkが9080を使用しています。
で、パッケージングして起動。今回のアプリケーションの名前は、「console-sink」にしています。
$ mvn package
$ java -jar console-sink/target/console-sink-0.0.1-SNAPSHOT.jar
起動に成功すれば、Sourceが取り込んだツイートがコンソールに出力されます。
少しオマケ
Spring Cloud Streamが提供するtweetsourceは、Streaming APIの結果をそのまま受け取るので、特に絞ったりすることができません。
filterを使えばいいのでしょうけれど
Consuming streaming data — Twitter Developers
実装としてはfirehoseかsampleを使うようになっています。
https://github.com/spring-cloud/spring-cloud-stream-app-starters/blob/v1.0.4.RELEASE/twitter/spring-cloud-starter-stream-source-twitterstream/src/main/java/org/springframework/cloud/stream/app/twitterstream/source/TwitterStreamMessageProducer.java
全パラメーターを指定できるわけでもないようですし(TODOになっています)。
というわけで、ちょっと自分で書いてみます。
src/main/java/org/littlewings/spring/stream/TweetSourceApp.java
package org.littlewings.spring.stream; import java.net.URI; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.app.twitterstream.source.AbstractTwitterInboundChannelAdapter; import org.springframework.cloud.stream.app.twitterstream.source.TwitterstreamSourceConfiguration; import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.support.MessageBuilder; import org.springframework.social.support.URIBuilder; import org.springframework.social.twitter.api.impl.TwitterTemplate; @SpringBootApplication @Import(TwitterstreamSourceConfiguration.class) public class TweetSourceApp { public static void main(String... args) { SpringApplication.run(TweetSourceApp.class, args); } @Autowired Source source; @Bean public MessageProducer twitterStream(TwitterTemplate twitterTemplate) { MyTwitterStreamMessageProducer messageProducer = new MyTwitterStreamMessageProducer(twitterTemplate); messageProducer.setOutputChannel(source.output()); return messageProducer; } static class MyTwitterStreamMessageProducer extends AbstractTwitterInboundChannelAdapter { private static final String API_URL_BASE = "https://stream.twitter.com/1.1/statuses/"; MyTwitterStreamMessageProducer(TwitterTemplate twitterTemplate) { super(twitterTemplate); } @Override protected URI buildUri() { String path = "filter.json"; URIBuilder b = URIBuilder.fromUri(API_URL_BASE + path); b.queryParam("track", "#tweettest"); return b.build(); } @Override protected void doSendLine(String line) { if (line.startsWith("{\"limit")) { logger.info("Twitter stream is being track limited."); } else if (line.startsWith("{\"delete")) { // discard } else if (line.startsWith("{\"warning")) { // discard } else { sendMessage(MessageBuilder.withPayload(line).build()); } } } }
今回は、「#tweettest」をフィルタの対象にしています。
@Override protected URI buildUri() { String path = "filter.json"; URIBuilder b = URIBuilder.fromUri(API_URL_BASE + path); b.queryParam("track", "#tweettest"); return b.build(); }
パッケージングして、確認。
$ java -jar tweet-source/target/tweet-source-0.0.1-SNAPSHOT.jar
結果(Sink側)。Source起動後に、「#tweettest」タグをつけて自分でツイートしました。それが拾われています。
==================== id = 848528590715068416 created_at = Sun Apr 02 13:32:35 +0000 2017 screen_name = @kazuhira_r name = かずひら text = てすと #tweettest
出力されました、と。
まとめ
簡単にですが、Spring Cloud Streamを試してみました。
取っ掛かりで最初はオロオロしましたが、ある程度書いてみたりドキュメント読んだり、GitHubリポジトリ見てるとなんとなく
慣れそうな感じがしてきました。
今後、もうちょっといろいろ試していきましょう。
参考)
GitHub - Pivotal-Japan/spring-cloud-stream-tutorial: Spring Cloud Stream Tutorial