Spring Cloud Streamの設定のうち、以下の部分ですが
spring.cloud.stream.bindings.input.group= spring.cloud.stream.bindings.<channelName>.group=
指定した方がいいよーという話は見るものの、効果のほどを自分で確認したことがなかったので、
気になって見てみることにしました。
Consumer Groupって?
Spring Cloud Stream上でのドキュメントは、こちら。
Apache KafkaのConsumer Groupsにインスパイアされたもののようで、同じgroup idを持ったConsumer群に対して、
その中のひとつのConsumerにメッセージを配信してくれる仕組みのようです。
Apache Kafkaでのドキュメントだと、こちら。
試してみる
それでは、簡単なアプリケーションを書いて試してみるとしましょう。
以下の条件で、アプリケーションを作ります。
- Sourceは簡単なSpring MVCのREST API
- SinkにConsumer Groupを2つ作り、それぞれ2本にする(2 × 2で計4インスタンス)
- Message Brokerは、Apache Kafka
- 2つのSinkは、Apache Kafkaの同じTopicを使う
今回は簡単に、Mavenのマルチプロジェクト内でSourceとSinkを作成します。
親pomには、こんなことを書いておきます。
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-dependencies</artifactId> <version>Chelsea.SR1</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <pluginManagement> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>1.5.2.RELEASE</version> </plugin> </plugins> </pluginManagement> </build>
サブモジュール側は、全部次の設定にしておきます。最低限、「spring-cloud-starter-stream-kafka」が使えれば
充分です。
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
Sourceを作る
では、最初にSourceを作成します。「api-source」というモジュールにしましょう。
こんな感じで、簡単に。
api-source/src/main/java/org/littlewings/spring/stream/ApiSourceApp.java
package org.littlewings.spring.stream; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.http.HttpStatus; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @EnableBinding(Source.class) @RestController public class ApiSourceApp { public static void main(String... args) { SpringApplication.run(ApiSourceApp.class, args); } Source source; public ApiSourceApp(Source source) { this.source = source; } @PostMapping("register") @ResponseStatus(HttpStatus.CREATED) public Map<String, Object> register(@RequestBody Map<String, Object> message) { source.output().send(MessageBuilder.withPayload(message).build()); return message; } }
Source側の設定は、特にConsumer Groupを意識しません。
api-source/src/main/resources/application.properties
spring.cloud.stream.bindings.output.destination=message spring.cloud.stream.bindings.output.contentType=application/json spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181
Sinkを作る
続いて、Sinkを作成します。Sinkは、受け取ったメッセージを簡単に装飾してコンソール出力するものにしましょう。
こういうJSONを受け取ったら
{"message":"Hello World"}
それぞれ次のように出力するSinkを作成します。
received message = ★Hello World★ received message = [Hello World]
ただ、最初はConsumer Groupは指定せずにいってみたいと思います。モジュールは、「star-group-console-sink」と「brackets-group-console-sink」とします。
star-group側のソースコード。
star-group-console-sink/src/main/java/org/littlewings/spring/stream/StarGroupSinkApp.java
package org.littlewings.spring.stream; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; @SpringBootApplication @EnableBinding(Sink.class) public class StarGroupSinkApp { public static void main(String... args) { SpringApplication.run(StarGroupSinkApp.class, args); } @StreamListener(Sink.INPUT) public void println(Map<String, Object> message) { System.out.printf("received message = ★%s★%n", message.get("message")); } }
設定。
star-group-console-sink/src/main/resources/application.properties
spring.cloud.stream.bindings.input.destination=message spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181
brackets-group側も、ほとんど同じ実装です。まあ、メッセージ出力の装飾文字が違うくらいですね。
brackets-group-console-sink/src/main/java/org/littlewings/spring/stream/BracketsGroupSinkApp.java
package org.littlewings.spring.stream; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; @SpringBootApplication @EnableBinding(Sink.class) public class BracketsGroupSinkApp { public static void main(String... args) { SpringApplication.run(BracketsGroupSinkApp.class, args); } @StreamListener(Sink.INPUT) public void println(Map<String, Object> message) { System.out.printf("received message = [%s]%n", message.get("message")); } }
設定。
brackets-group-console-sink/src/main/resources/application.properties
spring.cloud.stream.bindings.input.destination=message spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181
ここまでで、とりあえずアプリケーションの準備ができました。
Consumer Groupなしで実行
それでは、まずはConsumer Groupを指定しないまま確認してみます。
パッケージング。
$ mvn package
Sourceをひとつ、Sinkを2つずつ起動します。なお、Apache Kafkaは起動済みとします。
## Source $ java -jar api-source/target/api-source-0.0.1-SNAPSHOT.jar ## Sink star-group # instance-1 $ java -Dserver.port=28080 -jar star-group-console-sink/target/star-group-console-sink-0.0.1-SNAPSHOT.jar # instance-2 $ java -Dserver.port=28081 -jar star-group-console-sink/target/star-group-console-sink-0.0.1-SNAPSHOT.jar ## Sink brackets-group # instance-1 $ java -Dserver.port=38080 -jar brackets-group-console-sink/target/brackets-group-console-sink-0.0.1-SNAPSHOT.jar # instance-2 $ java -Dserver.port=38081 -jar brackets-group-console-sink/target/brackets-group-console-sink-0.0.1-SNAPSHOT.jar
で、よくログを見ているとこんなのが出力されています。
※2つにはしょっています
2017-04-22 18:01:24,660 INFO -kafka-consumer-1 o.a.k.c.c.i.ConsumerCoordinator:225 - Setting newly assigned partitions [message-0] for group anonymous.0e1dbc50-7800-4989-9235-ec0926d533d9 2017-04-22 18:04:21,166 INFO -kafka-consumer-1 o.a.k.c.c.i.ConsumerCoordinator:225 - Setting newly assigned partitions [message-0] for group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0
Consumer Groupを指定していないので、anonymousなグループに属しているって感じになるのでしょうね。
なお、Apache Kafka側のログにもこんな感じのログが出力されています。
[2017-04-22 09:04:21,151] INFO [GroupCoordinator 0]: Preparing to restabilize group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 with old generation 0 (kafka.coordinator.GroupCoordinator) [2017-04-22 09:04:21,152] INFO [GroupCoordinator 0]: Stabilized group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 generation 1 (kafka.coordinator.GroupCoordinator) [2017-04-22 09:04:21,160] INFO [GroupCoordinator 0]: Assignment received from leader for group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 for generation 1 (kafka.coordinator.GroupCoordinator) [2017-04-22 09:04:21,151] INFO [GroupCoordinator 0]: Preparing to restabilize group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 with old generation 0 (kafka.coordinator.GroupCoordinator) [2017-04-22 09:04:21,152] INFO [GroupCoordinator 0]: Stabilized group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 generation 1 (kafka.coordinator.GroupCoordinator) [2017-04-22 09:04:21,160] INFO [GroupCoordinator 0]: Assignment received from leader for group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 for generation 1 (kafka.coordinator.GroupCoordinator)
では、メッセージをapi-sourceに送ってみます。わかりやすいようにメッセージに時間を入れて、6回送ってみましょう。
※以下を6回繰り返す
$ D=`date '+%Y-%m-%d %H:%M:%S'` && curl -i -XPOST -H 'Content-Type: application/json' http://localhost:8080/register -d '{"message": "Hello World - '"$D"'"}'
結果。
## Sink star-group # instance-1 received message = ★Hello World - 2017-04-22 18:06:38★ received message = ★Hello World - 2017-04-22 18:06:42★ received message = ★Hello World - 2017-04-22 18:06:44★ received message = ★Hello World - 2017-04-22 18:06:46★ received message = ★Hello World - 2017-04-22 18:06:48★ received message = ★Hello World - 2017-04-22 18:06:50★ # instance-2 received message = ★Hello World - 2017-04-22 18:06:38★ received message = ★Hello World - 2017-04-22 18:06:42★ received message = ★Hello World - 2017-04-22 18:06:44★ received message = ★Hello World - 2017-04-22 18:06:46★ received message = ★Hello World - 2017-04-22 18:06:48★ received message = ★Hello World - 2017-04-22 18:06:50★ ## Sink brackets-group # instance-1 received message = [Hello World - 2017-04-22 18:06:38] received message = [Hello World - 2017-04-22 18:06:42] received message = [Hello World - 2017-04-22 18:06:44] received message = [Hello World - 2017-04-22 18:06:46] received message = [Hello World - 2017-04-22 18:06:48] received message = [Hello World - 2017-04-22 18:06:50] # instance-2 received message = [Hello World - 2017-04-22 18:06:38] received message = [Hello World - 2017-04-22 18:06:42] received message = [Hello World - 2017-04-22 18:06:44] received message = [Hello World - 2017-04-22 18:06:46] received message = [Hello World - 2017-04-22 18:06:48] received message = [Hello World - 2017-04-22 18:06:50]
確かに、同じメッセージが全インスタンスに行きますね。現時点だと、挙動は違うもののグループの指定はしていませんからね。
Consumer Groupを指定する
では、Consumer Groupを指定してみましょう。
「star-group」のapplication.propertiesには、こちらを
spring.cloud.stream.bindings.input.group=star-group
「brackets-group」のapplication.propertiesには、こちらをそれぞれ追加します。
spring.cloud.stream.bindings.input.group=brackets-group
で、パッケージングして再度確認。Apache Kafkaもデータを削除してやり直します。で、起動。
今度は、起動時にこんな感じにGroupに関する情報が出力されます。
2017-04-22 18:15:43,995 INFO -kafka-consumer-1 o.a.k.c.c.i.ConsumerCoordinator:225 - Setting newly assigned partitions [message-0] for group star-group 2017-04-22 18:15:48,042 INFO -kafka-consumer-1 o.a.k.c.c.i.ConsumerCoordinator:225 - Setting newly assigned partitions [message-0] for group brackets-group
Apache Kafka側でも、同様に。
[2017-04-22 09:15:43,765] INFO [GroupCoordinator 0]: Preparing to restabilize group star-group with old generation 1 (kafka.coordinator.GroupCoordinator) [2017-04-22 09:15:43,814] INFO [GroupCoordinator 0]: Stabilized group star-group generation 2 (kafka.coordinator.GroupCoordinator) [2017-04-22 09:15:43,842] INFO [GroupCoordinator 0]: Assignment received from leader for group star-group for generation 2 (kafka.coordinator.GroupCoordinator) [2017-04-22 09:15:46,255] INFO [GroupCoordinator 0]: Preparing to restabilize group brackets-group with old generation 1 (kafka.coordinator.GroupCoordinator) [2017-04-22 09:15:48,024] INFO [GroupCoordinator 0]: Stabilized group brackets-group generation 2 (kafka.coordinator.GroupCoordinator) [2017-04-22 09:15:48,032] INFO [GroupCoordinator 0]: Assignment received from leader for group brackets-group for generation 2 (kafka.coordinator.GroupCoordinator)
Apache KafkaでのConsumer Groupになってそうですね。
で、また6回データを投げ込んでみた時の結果は、こちら。
結果。
## Sink star-group # instance-1 received message = ★Hello World - 2017-04-22 18:18:35★ received message = ★Hello World - 2017-04-22 18:18:37★ received message = ★Hello World - 2017-04-22 18:18:39★ received message = ★Hello World - 2017-04-22 18:18:42★ received message = ★Hello World - 2017-04-22 18:18:44★ received message = ★Hello World - 2017-04-22 18:18:46★ # instance-2 (なし) ## Sink brackets-group # instance-1 なし # instance-2 received message = [Hello World - 2017-04-22 18:18:35] received message = [Hello World - 2017-04-22 18:18:37] received message = [Hello World - 2017-04-22 18:18:39] received message = [Hello World - 2017-04-22 18:18:42] received message = [Hello World - 2017-04-22 18:18:44] received message = [Hello World - 2017-04-22 18:18:46]
Consumer Group内の、ひとつのインスタンスだけが反応するようになりました。やりましたね。
SubscribeするConsumerが偏ってしまう
なんですが、よくよく見るとメッセージを全部同じインスタンスが見てしまっています。こういう挙動をされてしまうと、
もうちょっと振り分けたくなりますね?
で、このあたりとドキュメントを参考にして…。
Spring Cloud StreamでKafkaのPartitionをインスタンス別に読み込む
現在の設定だと、Apache KafkaのTopicsはSpring Cloud Streamが自動的に作成します。「spring.cloud.stream.kafka.binder.autoCreateTopics」が
デフォルトでtrueなので。
で、パーティション数は1になっています。
Consumerをもうちょっと振り分けたいと思ったら、パーティションを調整することになりそうです。
Spring Cloud Stream側にTopicsを作成してもらうというのを変えないままで、絡んでくる要素はこのあたりでしょうか。
Sink側の設定で、このあたり。
- spring.cloud.stream.instanceCount
- spring.cloud.stream.instanceIndex
- spring.cloud.stream.kafka.binder.autoAddPartitions
どうも、instanceCountとinstanceIndexでConsumerがどう受け取るのかを制御するみたいです。
※Spring Cloud Data Flowを使うと自動で面倒を見てくれるっぽいですが、今回はSpring Cloud Streamのみに話題を絞って突っ走ります
Instance Index and Instance Count
とりあえず、今回はinstanceCountを2に、instanceIndexは実行時にシステムプロパティを指定する方針でいきます。instanceIndexは、
パーティションの位置を指定する感じみたいです(0起点)。
また、instanceCountを増やした時にパーティションがないとNGのようなので、今回はSpring Cloud Streamに自動で追加してもらうように
「spring.cloud.stream.kafka.binder.autoAddPartitions」をtrueにします。
結果、設定としては以下を2つのSink、それぞれに追加しました。
spring.cloud.stream.instanceCount=2 spring.cloud.stream.kafka.binder.autoAddPartitions=true
最後に、Source側には「spring.cloud.stream.bindings.output.producer.partitionCount」でパーティション数を書いておきます。とりあえず、
現時点では振り分け方については考えないでおきます。
spring.cloud.stream.bindings.output.producer.partitionCount=2
Configuring Output Bindings for Partitioning
結局、instanceIndexでどのパーティションからSubscribeするか指定するようなものなので、Source側がパーティション別に
登録しないと機能しないみたいなんですよね。partitionCountを設定しなかった場合は、同じパーティションにデータを
登録し続けてしまいます…。
では、パッケージングしてアプリケーションを起動しましょう。Sink側は、起動コマンドも変わります。
それぞれ、instanceIndexをシステムプロパティで指定するように調整。
## Sink star-group # instance-1 $ java -Dserver.port=28080 -Dspring.cloud.stream.instanceIndex=0 -jar star-group-console-sink/target/star-group-console-sink-0.0.1-SNAPSHOT.jar # instance-2 $ java -Dserver.port=28081 -Dspring.cloud.stream.instanceIndex=1 -jar star-group-console-sink/target/star-group-console-sink-0.0.1-SNAPSHOT.jar ## Sink brackets-group # instance-1 $ java -Dserver.port=38080 -Dspring.cloud.stream.instanceIndex=0 -jar brackets-group-console-sink/target/brackets-group-console-sink-0.0.1-SNAPSHOT.jar # instance-2 $ java -Dserver.port=38081 -Dspring.cloud.stream.instanceIndex=1 -jar brackets-group-console-sink/target/brackets-group-console-sink-0.0.1-SNAPSHOT.jar
これで、確認してみましょう。データを登録してみた結果は、こちら。
## Sink star-group # instance-1 received message = ★Hello World - 2017-04-22 18:54:19★ received message = ★Hello World - 2017-04-22 18:54:23★ received message = ★Hello World - 2017-04-22 18:54:26★ # instance-2 received message = ★Hello World - 2017-04-22 18:54:21★ received message = ★Hello World - 2017-04-22 18:54:24★ received message = ★Hello World - 2017-04-22 18:54:28★ ## Sink brackets-group # instance-1 received message = [Hello World - 2017-04-22 18:54:19] received message = [Hello World - 2017-04-22 18:54:23] received message = [Hello World - 2017-04-22 18:54:26] # instance-2 received message = [Hello World - 2017-04-22 18:54:21] received message = [Hello World - 2017-04-22 18:54:24] received message = [Hello World - 2017-04-22 18:54:28]
ラウンドロビン的なバラけ方をしましたね。
とりあえず、こんなところかと…。
まとめ
Spring Cloud StreamとApache Kafkaで、Consumer Groupの設定と確認、それから少しパーティショニングまわりの内容を
見てみました。
とりあえずConsumer Groupはいいですが、パーティショニングについての制御だったりはちょっとまだよくわからない感じですね。
あと、instanceCountとinstanceIndexは確かにSpring Cloud Stream単体だと面倒かも…。
パーティショニングまわりの話題は、また今度追ってみましょう。