前に、Spring Cloud Stream+Apache Kafkaで、Consumer Groupを設定した時の動作を見てみました。
Spring Cloud Stream+Apache Kafkaで、Consumer Groupを設定した時の挙動を確認する - CLOVER
この時にも少しパーティションは使っていたのですが、今回はよりパーティションのみにフォーカスして
見ていきたいと思います。
Spring Cloud StreamのPartitioning Supportについて
Spring Cloud Streamの、Partitioning Supportについてのドキュメントはこちら。
メッセージブローカー(今回はApache Kafka)に対して1つ以上のProducerが(パーティションを含むブローカーに対して)メッセージを送信し、
かつ複数のComsumerがある場合に、あるルールに沿ったデータがいつも同じConsumerインスタンスに対して処理されるように設定することができます。
Spring Cloud Streamでは、パーティショニングの実装に対する処理を抽象化することができます(パーティショニングを
サポートしているApache Kafkaや、そうでないRabbitMQに関わらず)。
パーティショニングは、関連する一連のデータが確実に一緒に処理されるようにするために重要な、ステートフルな処理の概念です
(パフォーマンス、整合性などの理由で)。
参考)
Event Driven Microservices with Spring Cloud Stream #jjug_ccc #ccc_ab3
とまあ、パーティショニングについての話はこんな感じのようで。
Producerからどうパーティションに振り分けるかのは、キーを指定、もしくはキーを抽出するためのクラスを指定することで
決定するみたいです。
準備
それでは、試していってみましょう。
今回は、以下の条件で試してみたいと思います。
今回のアプリケーションは、Mavenのマルチプロジェクト構成とします。
親pomの設定は、こんな感じに。
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-dependencies</artifactId> <version>Chelsea.SR2</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <pluginManagement> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>1.5.3.RELEASE</version> </plugin> </plugins> </pluginManagement> </build>
ProducerおよびConsumerのpomは、これくらいの設定にします。
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
Producer
それでは、最初にProducerを作成します。REST APIで、メッセージを送るだけの簡単なものにします。
api-source/src/main/java/org/littlewings/spring/cloud/ApiSourceApp.java
package org.littlewings.spring.cloud; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.http.HttpStatus; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @EnableBinding(Source.class) @RestController public class ApiSourceApp { public static void main(String... args) { SpringApplication.run(ApiSourceApp.class, args); } Source source; public ApiSourceApp(Source source) { this.source = source; } @PostMapping("register") @ResponseStatus(HttpStatus.CREATED) public Message register(@RequestBody Message message) { source.output().send(MessageBuilder.withPayload(message).build()); return message; } }
Messageというクラスは、自作のJavaBeansでこんな感じに。プロパティとして、「category」と「value」を持つものとします。
api-source/src/main/java/org/littlewings/spring/cloud/Message.java
package org.littlewings.spring.cloud; public class Message { private String category; private String value; // getter/setterは省略 }
設定は、こんな感じ。Apache KafkaおよびApache ZooKeeperは、「172.17.0.2」で動作しているものとします。
api-source/src/main/resources/application.properties
spring.cloud.stream.bindings.output.destination=message spring.cloud.stream.bindings.output.contentType=application/json spring.cloud.stream.bindings.output.producer.partitionCount=2 spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181
「spring.cloud.stream.bindings.output.producer.partitionCount」で、パーティション数は2と指定しました。
Consumer
続いては、Consumerです。
Consumerは2つ用意することにしたので、わかりやすく別々のものを用意しましょう。メッセージを受け取って標準出力に書き出すアプリケーション
として、それぞれ「★」と「[]」で囲って出力するConsumerを用意します。
まずは、「★」で囲う方。
star-console-sink/src/main/java/org/littlewings/spring/stream/StarSinkApp.java
package org.littlewings.spring.stream; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; @SpringBootApplication @EnableBinding(Sink.class) public class StarSinkApp { public static void main(String... args) { SpringApplication.run(StarSinkApp.class, args); } @StreamListener(Sink.INPUT) public void println(Map<String, Object> message) { System.out.printf("[%s] received message = ★%s★%n", message.get("category"), message.get("value")); } }
「category」はそのまま出力しますが、「value」については「★」で囲って出力します。
設定については、こんな感じです。
star-console-sink/src/main/resources/application.properties
spring.cloud.stream.instanceCount=2 spring.cloud.stream.instanceIndex=0 spring.cloud.stream.bindings.input.destination=message spring.cloud.stream.bindings.input.group=message-group spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181 server.port=18080
インスタンス数は2とし、パーティション0番目からデータを読み込むことにしました。
spring.cloud.stream.instanceCount=2 spring.cloud.stream.instanceIndex=0
Consumer Groupも設定しておきます。指定しないと、それぞれのパーティションからデータを読んだりするような挙動になるので、
同じTopicからデータを読み込むのであればやはり設定しておいた方が無難です。
spring.cloud.stream.bindings.input.group=message-group
あとは、使用するポートをデフォルトから変えたくらいですね。
続いて、もうひとつの[]」で囲うConsumer。先ほどとソースコードはほとんど一緒になります。
brackets-console-sink/src/main/java/org/littlewings/spring/stream/BracketsSinkApp.java
package org.littlewings.spring.stream; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; @SpringBootApplication @EnableBinding(Sink.class) public class BracketsSinkApp { public static void main(String... args) { SpringApplication.run(BracketsSinkApp.class, args); } @StreamListener(Sink.INPUT) public void println(Map<String, Object> message) { System.out.printf("[%s] received message = [%s]%n", message.get("category"), message.get("value")); } }
設定もほとんど同じですが
brackets-console-sink/src/main/resources/application.properties
spring.cloud.stream.instanceCount=2 spring.cloud.stream.instanceIndex=1 spring.cloud.stream.bindings.input.destination=message spring.cloud.stream.bindings.input.group=message-group spring.cloud.stream.kafka.binder.brokers=172.17.0.2 spring.cloud.stream.kafka.binder.defaultBrokerPort=9092 spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2 spring.cloud.stream.kafka.binder.defaultZkPort=2181 server.port=28080
ポートをずらしているのと、「spring.cloud.stream.instanceIndex」は1として「★」で囲う方とは利用するパーティションを分けています。
動かしてみる
それでは、動作確認してみましょう。
パッケージングして、実行。
$ mvn package ## Producer $ java -jar api-source/target/api-source-0.0.1-SNAPSHOT.jar ## Consumer 「★」 $ java -jar star-console-sink/target/star-console-sink-0.0.1-SNAPSHOT.jar ## Consumer 「[]」 $ java -jar brackets-console-sink/target/brackets-console-sink-0.0.1-SNAPSHOT.jar
Producerに対して、以下のようなリクエストを何回か投げてみます。
D=`date '+%Y-%m-%d %H:%M:%S'` && curl -i -XPOST -H 'Content-Type: application/json' http://localhost:8080/register -d '{"category": "foo", "value": "Hello World - '"$D"'"}' D=`date '+%Y-%m-%d %H:%M:%S'` && curl -i -XPOST -H 'Content-Type: application/json' http://localhost:8080/register -d '{"category": "foo", "value": "Hello World - '"$D"'"}'
違う値は、「category」だけですね。
それぞれ、5回ずつリクエストを投げてみます。
## Consumer 「★」 [foo] received message = ★Hello World - 2017-06-03 01:41:03★ [foo] received message = ★Hello World - 2017-06-03 01:41:07★ [foo] received message = ★Hello World - 2017-06-03 01:41:11★ [bar] received message = ★Hello World - 2017-06-03 01:41:16★ [bar] received message = ★Hello World - 2017-06-03 01:41:21★ ## Consumer 「[]」 [foo] received message = [Hello World - 2017-06-03 01:41:05] [foo] received message = [Hello World - 2017-06-03 01:41:09] [bar] received message = [Hello World - 2017-06-03 01:41:14] [bar] received message = [Hello World - 2017-06-03 01:41:18] [bar] received message = [Hello World - 2017-06-03 01:41:22]
ラウンドロビンですね。
これなのですが、パーティションを指定するためのキーを設定していないためです。
Apache KafkaのデフォルトのPartitionerは、キー指定がないとこのような振り分けを行います。
https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L57-L62
まあ、適当に振り分けます。
実装を見ると、実質ラウンドロビンですね。
キーを指定する
これでは、パーティションごとに関心のあるデータをルーティングできないので、パーティション用のキーを設定します。
Producerの設定ファイルに、次の1行を追加。
spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.category
「category」プロパティで、ロードバランシング、同じ「category」のデータは、同じNodeが処理できるように設定します。
パーティショニングの自動追加設定がないとエラーになる場合は、設定を追加してください。今回はConsumerに設定を追加しました。
spring.cloud.stream.kafka.binder.autoAddPartitions=true
Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: The number of expected partitions was: 2, but 1 has been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`
見てみましょう。先ほどと同様、foo、barなリクエストを送信してみます。
## Consumer 「★」 [bar] received message = ★Hello World - 2017-06-03 01:54:52★ [bar] received message = ★Hello World - 2017-06-03 01:54:54★ [bar] received message = ★Hello World - 2017-06-03 01:54:56★ [bar] received message = ★Hello World - 2017-06-03 01:54:58★ [bar] received message = ★Hello World - 2017-06-03 01:55:00★ ## Consumer 「[]」 [foo] received message = [Hello World - 2017-06-03 01:54:35] [foo] received message = [Hello World - 2017-06-03 01:54:37] [foo] received message = [Hello World - 2017-06-03 01:54:39] [foo] received message = [Hello World - 2017-06-03 01:54:41] [foo] received message = [Hello World - 2017-06-03 01:54:44]
今度は、キレイに分かれましたね!OKそうです。
なお、partitionKeyExpressionを指定した場合は、以下の部分で割り当て先のパーティションのIDが決定するようです。
https://github.com/spring-projects/spring-integration-kafka/blob/v2.1.0.RELEASE/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java#L121-L123
※指定しなかった場合はnull