CLOVER🍀

That was when it all began.

Spring Cloud Stream(+Apache Kafka)で、Partition Supportを試す

前に、Spring Cloud Stream+Apache Kafkaで、Consumer Groupを設定した時の動作を見てみました。

Spring Cloud Stream+Apache Kafkaで、Consumer Groupを設定した時の挙動を確認する - CLOVER

この時にも少しパーティションは使っていたのですが、今回はよりパーティションのみにフォーカスして
見ていきたいと思います。

Spring Cloud StreamのPartitioning Supportについて

Spring Cloud Streamの、Partitioning Supportについてのドキュメントはこちら。

Partitioning Support

メッセージブローカー(今回はApache Kafka)に対して1つ以上のProducerが(パーティションを含むブローカーに対して)メッセージを送信し、
かつ複数のComsumerがある場合に、あるルールに沿ったデータがいつも同じConsumerインスタンスに対して処理されるように設定することができます。

Spring Cloud Streamでは、パーティショニングの実装に対する処理を抽象化することができます(パーティショニングを
サポートしているApache Kafkaや、そうでないRabbitMQに関わらず)。

パーティショニングは、関連する一連のデータが確実に一緒に処理されるようにするために重要な、ステートフルな処理の概念です
(パフォーマンス、整合性などの理由で)。

参考)
Event Driven Microservices with Spring Cloud Stream #jjug_ccc #ccc_ab3

とまあ、パーティショニングについての話はこんな感じのようで。

Producerからどうパーティションに振り分けるかのは、キーを指定、もしくはキーを抽出するためのクラスを指定することで
決定するみたいです。

準備

それでは、試していってみましょう。

今回は、以下の条件で試してみたいと思います。

今回のアプリケーションは、Mavenのマルチプロジェクト構成とします。

親pomの設定は、こんな感じに。

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Chelsea.SR2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>1.5.3.RELEASE</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

ProducerおよびConsumerのpomは、これくらいの設定にします。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Apache Kafkaは起動済みとし、バージョンは0.10.2.1とします。

Producer

それでは、最初にProducerを作成します。REST APIで、メッセージを送るだけの簡単なものにします。
api-source/src/main/java/org/littlewings/spring/cloud/ApiSourceApp.java

package org.littlewings.spring.cloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class ApiSourceApp {
    public static void main(String... args) {
        SpringApplication.run(ApiSourceApp.class, args);
    }

    Source source;

    public ApiSourceApp(Source source) {
        this.source = source;
    }

    @PostMapping("register")
    @ResponseStatus(HttpStatus.CREATED)
    public Message register(@RequestBody Message message) {
        source.output().send(MessageBuilder.withPayload(message).build());
        return message;
    }
}

Messageというクラスは、自作のJavaBeansでこんな感じに。プロパティとして、「category」と「value」を持つものとします。
api-source/src/main/java/org/littlewings/spring/cloud/Message.java

package org.littlewings.spring.cloud;

public class Message {
    private String category;
    private String value;

    // getter/setterは省略
}

設定は、こんな感じ。Apache KafkaおよびApache ZooKeeperは、「172.17.0.2」で動作しているものとします。
api-source/src/main/resources/application.properties

spring.cloud.stream.bindings.output.destination=message
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.bindings.output.producer.partitionCount=2

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

「spring.cloud.stream.bindings.output.producer.partitionCount」で、パーティション数は2と指定しました。

Consumer

続いては、Consumerです。

Consumerは2つ用意することにしたので、わかりやすく別々のものを用意しましょう。メッセージを受け取って標準出力に書き出すアプリケーション
として、それぞれ「★」と「[]」で囲って出力するConsumerを用意します。

まずは、「★」で囲う方。
star-console-sink/src/main/java/org/littlewings/spring/stream/StarSinkApp.java

package org.littlewings.spring.stream;

import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class StarSinkApp {
    public static void main(String... args) {
        SpringApplication.run(StarSinkApp.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void println(Map<String, Object> message) {
        System.out.printf("[%s] received message = ★%s★%n", message.get("category"), message.get("value"));
    }
}

「category」はそのまま出力しますが、「value」については「★」で囲って出力します。

設定については、こんな感じです。
star-console-sink/src/main/resources/application.properties

spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0

spring.cloud.stream.bindings.input.destination=message
spring.cloud.stream.bindings.input.group=message-group

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

server.port=18080

インスタンス数は2とし、パーティション0番目からデータを読み込むことにしました。

spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0

Consumer Groupも設定しておきます。指定しないと、それぞれのパーティションからデータを読んだりするような挙動になるので、
同じTopicからデータを読み込むのであればやはり設定しておいた方が無難です。

spring.cloud.stream.bindings.input.group=message-group

あとは、使用するポートをデフォルトから変えたくらいですね。

続いて、もうひとつの[]」で囲うConsumer。先ほどとソースコードはほとんど一緒になります。
brackets-console-sink/src/main/java/org/littlewings/spring/stream/BracketsSinkApp.java

package org.littlewings.spring.stream;

import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class BracketsSinkApp {
    public static void main(String... args) {
        SpringApplication.run(BracketsSinkApp.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void println(Map<String, Object> message) {
        System.out.printf("[%s] received message = [%s]%n", message.get("category"), message.get("value"));
    }
}

設定もほとんど同じですが
brackets-console-sink/src/main/resources/application.properties

spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=1

spring.cloud.stream.bindings.input.destination=message
spring.cloud.stream.bindings.input.group=message-group

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

server.port=28080

ポートをずらしているのと、「spring.cloud.stream.instanceIndex」は1として「★」で囲う方とは利用するパーティションを分けています。

動かしてみる

それでは、動作確認してみましょう。

パッケージングして、実行。

$ mvn package

## Producer
$ java -jar api-source/target/api-source-0.0.1-SNAPSHOT.jar

## Consumer 「★」
$ java -jar star-console-sink/target/star-console-sink-0.0.1-SNAPSHOT.jar

## Consumer 「[]」
$ java -jar brackets-console-sink/target/brackets-console-sink-0.0.1-SNAPSHOT.jar

Producerに対して、以下のようなリクエストを何回か投げてみます。

D=`date '+%Y-%m-%d %H:%M:%S'` && curl -i -XPOST -H 'Content-Type: application/json' http://localhost:8080/register -d '{"category": "foo", "value": "Hello World - '"$D"'"}'

D=`date '+%Y-%m-%d %H:%M:%S'` && curl -i -XPOST -H 'Content-Type: application/json' http://localhost:8080/register -d '{"category": "foo", "value": "Hello World - '"$D"'"}'

違う値は、「category」だけですね。

それぞれ、5回ずつリクエストを投げてみます。

## Consumer 「★」
[foo] received message = ★Hello World - 2017-06-03 01:41:03[foo] received message = ★Hello World - 2017-06-03 01:41:07[foo] received message = ★Hello World - 2017-06-03 01:41:11[bar] received message = ★Hello World - 2017-06-03 01:41:16[bar] received message = ★Hello World - 2017-06-03 01:41:21## Consumer 「[]」
[foo] received message = [Hello World - 2017-06-03 01:41:05]
[foo] received message = [Hello World - 2017-06-03 01:41:09]
[bar] received message = [Hello World - 2017-06-03 01:41:14]
[bar] received message = [Hello World - 2017-06-03 01:41:18]
[bar] received message = [Hello World - 2017-06-03 01:41:22]

ラウンドロビンですね。

これなのですが、パーティションを指定するためのキーを設定していないためです。

Apache KafkaのデフォルトのPartitionerは、キー指定がないとこのような振り分けを行います。
https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L57-L62

まあ、適当に振り分けます。

Load Balancing

実装を見ると、実質ラウンドロビンですね。

キーを指定する

これでは、パーティションごとに関心のあるデータをルーティングできないので、パーティション用のキーを設定します。

Producerの設定ファイルに、次の1行を追加。

spring.cloud.stream.bindings.output.producer.partitionKeyExpression=payload.category

「category」プロパティで、ロードバランシング、同じ「category」のデータは、同じNodeが処理できるように設定します。

パーティショニングの自動追加設定がないとエラーになる場合は、設定を追加してください。今回はConsumerに設定を追加しました。

spring.cloud.stream.kafka.binder.autoAddPartitions=true
Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: The number of expected partitions was: 2, but 1 has been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`

見てみましょう。先ほどと同様、foo、barなリクエストを送信してみます。

## Consumer 「★」
[bar] received message = ★Hello World - 2017-06-03 01:54:52[bar] received message = ★Hello World - 2017-06-03 01:54:54[bar] received message = ★Hello World - 2017-06-03 01:54:56[bar] received message = ★Hello World - 2017-06-03 01:54:58[bar] received message = ★Hello World - 2017-06-03 01:55:00## Consumer 「[]」
[foo] received message = [Hello World - 2017-06-03 01:54:35]
[foo] received message = [Hello World - 2017-06-03 01:54:37]
[foo] received message = [Hello World - 2017-06-03 01:54:39]
[foo] received message = [Hello World - 2017-06-03 01:54:41]
[foo] received message = [Hello World - 2017-06-03 01:54:44]

今度は、キレイに分かれましたね!OKそうです。

なお、partitionKeyExpressionを指定した場合は、以下の部分で割り当て先のパーティションのIDが決定するようです。
https://github.com/spring-projects/spring-integration-kafka/blob/v2.1.0.RELEASE/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java#L121-L123

※指定しなかった場合はnull

まとめ

Spring Cloud Stream+Apache Kafkaで、Partitioning Supportを試してみました。

先にConsumer Groupの設定で少しパーティショニングを使っていましたが、パーティショニングのみに着目して
見てみることで、Producerからのパーティションの振り分けなどがより理解ができた気がします。