CLOVER🍀

That was when it all began.

Spring Cloud Stream+Apache Kafkaで、Consumer Groupを設定した時の挙動を確認する

Spring Cloud Streamの設定のうち、以下の部分ですが

spring.cloud.stream.bindings.input.group=
spring.cloud.stream.bindings.<channelName>.group=

指定した方がいいよーという話は見るものの、効果のほどを自分で確認したことがなかったので、
気になって見てみることにしました。

Consumer Groupって?

Spring Cloud Stream上でのドキュメントは、こちら。

Consumer Groups

Apache KafkaのConsumer Groupsにインスパイアされたもののようで、同じgroup idを持ったConsumer群に対して、
その中のひとつのConsumerにメッセージを配信してくれる仕組みのようです。

Apache Kafkaでのドキュメントだと、こちら。

Consumers and Consumer Groups

試してみる

それでは、簡単なアプリケーションを書いて試してみるとしましょう。

以下の条件で、アプリケーションを作ります。

  • Sourceは簡単なSpring MVCREST API
  • SinkにConsumer Groupを2つ作り、それぞれ2本にする(2 × 2で計4インスタンス
  • Message Brokerは、Apache Kafka
  • 2つのSinkは、Apache Kafkaの同じTopicを使う

今回は簡単に、Mavenのマルチプロジェクト内でSourceとSinkを作成します。

親pomには、こんなことを書いておきます。

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-dependencies</artifactId>
                <version>Chelsea.SR1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>1.5.2.RELEASE</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

サブモジュール側は、全部次の設定にしておきます。最低限、「spring-cloud-starter-stream-kafka」が使えれば
充分です。

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Sourceを作る

では、最初にSourceを作成します。「api-source」というモジュールにしましょう。

こんな感じで、簡単に。
api-source/src/main/java/org/littlewings/spring/stream/ApiSourceApp.java

package org.littlewings.spring.stream;

import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.http.HttpStatus;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@EnableBinding(Source.class)
@RestController
public class ApiSourceApp {
    public static void main(String... args) {
        SpringApplication.run(ApiSourceApp.class, args);
    }

    Source source;

    public ApiSourceApp(Source source) {
        this.source = source;
    }

    @PostMapping("register")
    @ResponseStatus(HttpStatus.CREATED)
    public Map<String, Object> register(@RequestBody Map<String, Object> message) {
        source.output().send(MessageBuilder.withPayload(message).build());
        return message;
    }
}

Source側の設定は、特にConsumer Groupを意識しません。
api-source/src/main/resources/application.properties

spring.cloud.stream.bindings.output.destination=message
spring.cloud.stream.bindings.output.contentType=application/json

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

Sinkを作る

続いて、Sinkを作成します。Sinkは、受け取ったメッセージを簡単に装飾してコンソール出力するものにしましょう。

こういうJSONを受け取ったら

{"message":"Hello World"}

それぞれ次のように出力するSinkを作成します。

received message = ★Hello World★

received message = [Hello World]

ただ、最初はConsumer Groupは指定せずにいってみたいと思います。モジュールは、「star-group-console-sink」と「brackets-group-console-sink」とします。

star-group側のソースコード
star-group-console-sink/src/main/java/org/littlewings/spring/stream/StarGroupSinkApp.java

package org.littlewings.spring.stream;

import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class StarGroupSinkApp {
    public static void main(String... args) {
        SpringApplication.run(StarGroupSinkApp.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void println(Map<String, Object> message) {
        System.out.printf("received message = ★%s★%n", message.get("message"));
    }
}

設定。
star-group-console-sink/src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=message

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

brackets-group側も、ほとんど同じ実装です。まあ、メッセージ出力の装飾文字が違うくらいですね。
brackets-group-console-sink/src/main/java/org/littlewings/spring/stream/BracketsGroupSinkApp.java

package org.littlewings.spring.stream;

import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@SpringBootApplication
@EnableBinding(Sink.class)
public class BracketsGroupSinkApp {
    public static void main(String... args) {
        SpringApplication.run(BracketsGroupSinkApp.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void println(Map<String, Object> message) {
        System.out.printf("received message = [%s]%n", message.get("message"));
    }
}

設定。
brackets-group-console-sink/src/main/resources/application.properties

spring.cloud.stream.bindings.input.destination=message

spring.cloud.stream.kafka.binder.brokers=172.17.0.2
spring.cloud.stream.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.kafka.binder.zkNodes=172.17.0.2
spring.cloud.stream.kafka.binder.defaultZkPort=2181

ここまでで、とりあえずアプリケーションの準備ができました。

Consumer Groupなしで実行

それでは、まずはConsumer Groupを指定しないまま確認してみます。

パッケージング。

$ mvn package

Sourceをひとつ、Sinkを2つずつ起動します。なお、Apache Kafkaは起動済みとします。

## Source
$ java -jar api-source/target/api-source-0.0.1-SNAPSHOT.jar

## Sink star-group
# instance-1
$ java -Dserver.port=28080 -jar star-group-console-sink/target/star-group-console-sink-0.0.1-SNAPSHOT.jar
# instance-2
$ java -Dserver.port=28081 -jar star-group-console-sink/target/star-group-console-sink-0.0.1-SNAPSHOT.jar

## Sink brackets-group
# instance-1
$ java -Dserver.port=38080 -jar brackets-group-console-sink/target/brackets-group-console-sink-0.0.1-SNAPSHOT.jar
# instance-2
$ java -Dserver.port=38081 -jar brackets-group-console-sink/target/brackets-group-console-sink-0.0.1-SNAPSHOT.jar

で、よくログを見ているとこんなのが出力されています。
※2つにはしょっています

2017-04-22 18:01:24,660  INFO -kafka-consumer-1 o.a.k.c.c.i.ConsumerCoordinator:225 - Setting newly assigned partitions [message-0] for group anonymous.0e1dbc50-7800-4989-9235-ec0926d533d9

2017-04-22 18:04:21,166  INFO -kafka-consumer-1 o.a.k.c.c.i.ConsumerCoordinator:225 - Setting newly assigned partitions [message-0] for group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0

Consumer Groupを指定していないので、anonymousなグループに属しているって感じになるのでしょうね。

なお、Apache Kafka側のログにもこんな感じのログが出力されています。

[2017-04-22 09:04:21,151] INFO [GroupCoordinator 0]: Preparing to restabilize group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 with old generation 0 (kafka.coordinator.GroupCoordinator)
[2017-04-22 09:04:21,152] INFO [GroupCoordinator 0]: Stabilized group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 generation 1 (kafka.coordinator.GroupCoordinator)
[2017-04-22 09:04:21,160] INFO [GroupCoordinator 0]: Assignment received from leader for group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 for generation 1 (kafka.coordinator.GroupCoordinator)

[2017-04-22 09:04:21,151] INFO [GroupCoordinator 0]: Preparing to restabilize group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 with old generation 0 (kafka.coordinator.GroupCoordinator)
[2017-04-22 09:04:21,152] INFO [GroupCoordinator 0]: Stabilized group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 generation 1 (kafka.coordinator.GroupCoordinator)
[2017-04-22 09:04:21,160] INFO [GroupCoordinator 0]: Assignment received from leader for group anonymous.bd16403f-559e-4e3d-93a1-6990cc4b7db0 for generation 1 (kafka.coordinator.GroupCoordinator)

では、メッセージをapi-sourceに送ってみます。わかりやすいようにメッセージに時間を入れて、6回送ってみましょう。
※以下を6回繰り返す

$ D=`date '+%Y-%m-%d %H:%M:%S'` && curl -i -XPOST -H 'Content-Type: application/json' http://localhost:8080/register -d '{"message": "Hello World - '"$D"'"}'

結果。

## Sink star-group
# instance-1
received message = ★Hello World - 2017-04-22 18:06:38★
received message = ★Hello World - 2017-04-22 18:06:42★
received message = ★Hello World - 2017-04-22 18:06:44★
received message = ★Hello World - 2017-04-22 18:06:46★
received message = ★Hello World - 2017-04-22 18:06:48★
received message = ★Hello World - 2017-04-22 18:06:50★

# instance-2
received message = ★Hello World - 2017-04-22 18:06:38★
received message = ★Hello World - 2017-04-22 18:06:42★
received message = ★Hello World - 2017-04-22 18:06:44★
received message = ★Hello World - 2017-04-22 18:06:46★
received message = ★Hello World - 2017-04-22 18:06:48★
received message = ★Hello World - 2017-04-22 18:06:50★


## Sink brackets-group
# instance-1
received message = [Hello World - 2017-04-22 18:06:38]
received message = [Hello World - 2017-04-22 18:06:42]
received message = [Hello World - 2017-04-22 18:06:44]
received message = [Hello World - 2017-04-22 18:06:46]
received message = [Hello World - 2017-04-22 18:06:48]
received message = [Hello World - 2017-04-22 18:06:50]

# instance-2
received message = [Hello World - 2017-04-22 18:06:38]
received message = [Hello World - 2017-04-22 18:06:42]
received message = [Hello World - 2017-04-22 18:06:44]
received message = [Hello World - 2017-04-22 18:06:46]
received message = [Hello World - 2017-04-22 18:06:48]
received message = [Hello World - 2017-04-22 18:06:50]

確かに、同じメッセージが全インスタンスに行きますね。現時点だと、挙動は違うもののグループの指定はしていませんからね。

Consumer Groupを指定する

では、Consumer Groupを指定してみましょう。

「star-group」のapplication.propertiesには、こちらを

spring.cloud.stream.bindings.input.group=star-group

brackets-group」のapplication.propertiesには、こちらをそれぞれ追加します。

spring.cloud.stream.bindings.input.group=brackets-group

で、パッケージングして再度確認。Apache Kafkaもデータを削除してやり直します。で、起動。

今度は、起動時にこんな感じにGroupに関する情報が出力されます。

2017-04-22 18:15:43,995  INFO -kafka-consumer-1 o.a.k.c.c.i.ConsumerCoordinator:225 - Setting newly assigned partitions [message-0] for group star-group

2017-04-22 18:15:48,042  INFO -kafka-consumer-1 o.a.k.c.c.i.ConsumerCoordinator:225 - Setting newly assigned partitions [message-0] for group brackets-group

Apache Kafka側でも、同様に。

[2017-04-22 09:15:43,765] INFO [GroupCoordinator 0]: Preparing to restabilize group star-group with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-04-22 09:15:43,814] INFO [GroupCoordinator 0]: Stabilized group star-group generation 2 (kafka.coordinator.GroupCoordinator)
[2017-04-22 09:15:43,842] INFO [GroupCoordinator 0]: Assignment received from leader for group star-group for generation 2 (kafka.coordinator.GroupCoordinator)

[2017-04-22 09:15:46,255] INFO [GroupCoordinator 0]: Preparing to restabilize group brackets-group with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-04-22 09:15:48,024] INFO [GroupCoordinator 0]: Stabilized group brackets-group generation 2 (kafka.coordinator.GroupCoordinator)
[2017-04-22 09:15:48,032] INFO [GroupCoordinator 0]: Assignment received from leader for group brackets-group for generation 2 (kafka.coordinator.GroupCoordinator)

Apache KafkaでのConsumer Groupになってそうですね。

で、また6回データを投げ込んでみた時の結果は、こちら。
結果。

## Sink star-group
# instance-1
received message = ★Hello World - 2017-04-22 18:18:35★
received message = ★Hello World - 2017-04-22 18:18:37★
received message = ★Hello World - 2017-04-22 18:18:39★
received message = ★Hello World - 2017-04-22 18:18:42★
received message = ★Hello World - 2017-04-22 18:18:44★
received message = ★Hello World - 2017-04-22 18:18:46★

# instance-2
(なし)


## Sink brackets-group
# instance-1
なし

# instance-2
received message = [Hello World - 2017-04-22 18:18:35]
received message = [Hello World - 2017-04-22 18:18:37]
received message = [Hello World - 2017-04-22 18:18:39]
received message = [Hello World - 2017-04-22 18:18:42]
received message = [Hello World - 2017-04-22 18:18:44]
received message = [Hello World - 2017-04-22 18:18:46]

Consumer Group内の、ひとつのインスタンスだけが反応するようになりました。やりましたね。

SubscribeするConsumerが偏ってしまう

なんですが、よくよく見るとメッセージを全部同じインスタンスが見てしまっています。こういう挙動をされてしまうと、
もうちょっと振り分けたくなりますね?

で、このあたりとドキュメントを参考にして…。

Spring Cloud StreamでKafkaのPartitionをインスタンス別に読み込む

現在の設定だと、Apache KafkaのTopicsはSpring Cloud Streamが自動的に作成します。「spring.cloud.stream.kafka.binder.autoCreateTopics」が
デフォルトでtrueなので。

Kafka Binder Properties

で、パーティション数は1になっています。

Consumerをもうちょっと振り分けたいと思ったら、パーティションを調整することになりそうです。

Spring Cloud Stream側にTopicsを作成してもらうというのを変えないままで、絡んでくる要素はこのあたりでしょうか。

Sink側の設定で、このあたり。

  • spring.cloud.stream.instanceCount
  • spring.cloud.stream.instanceIndex
  • spring.cloud.stream.kafka.binder.autoAddPartitions

Consumer properties

どうも、instanceCountとinstanceIndexでConsumerがどう受け取るのかを制御するみたいです。
※Spring Cloud Data Flowを使うと自動で面倒を見てくれるっぽいですが、今回はSpring Cloud Streamのみに話題を絞って突っ走ります

Instance Index and Instance Count

とりあえず、今回はinstanceCountを2に、instanceIndexは実行時にシステムプロパティを指定する方針でいきます。instanceIndexは、
パーティションの位置を指定する感じみたいです(0起点)。

また、instanceCountを増やした時にパーティションがないとNGのようなので、今回はSpring Cloud Streamに自動で追加してもらうように
「spring.cloud.stream.kafka.binder.autoAddPartitions」をtrueにします。

Kafka Binder Properties

結果、設定としては以下を2つのSink、それぞれに追加しました。

spring.cloud.stream.instanceCount=2
spring.cloud.stream.kafka.binder.autoAddPartitions=true

最後に、Source側には「spring.cloud.stream.bindings.output.producer.partitionCount」でパーティション数を書いておきます。とりあえず、
現時点では振り分け方については考えないでおきます。

spring.cloud.stream.bindings.output.producer.partitionCount=2

Configuring Output Bindings for Partitioning

結局、instanceIndexでどのパーティションからSubscribeするか指定するようなものなので、Source側がパーティション別に
登録しないと機能しないみたいなんですよね。partitionCountを設定しなかった場合は、同じパーティションにデータを
登録し続けてしまいます…。

では、パッケージングしてアプリケーションを起動しましょう。Sink側は、起動コマンドも変わります。
それぞれ、instanceIndexをシステムプロパティで指定するように調整。

## Sink star-group
# instance-1
$ java -Dserver.port=28080 -Dspring.cloud.stream.instanceIndex=0 -jar star-group-console-sink/target/star-group-console-sink-0.0.1-SNAPSHOT.jar
# instance-2
$ java -Dserver.port=28081 -Dspring.cloud.stream.instanceIndex=1 -jar star-group-console-sink/target/star-group-console-sink-0.0.1-SNAPSHOT.jar 

## Sink brackets-group
# instance-1
$ java -Dserver.port=38080 -Dspring.cloud.stream.instanceIndex=0 -jar brackets-group-console-sink/target/brackets-group-console-sink-0.0.1-SNAPSHOT.jar
# instance-2
$ java -Dserver.port=38081 -Dspring.cloud.stream.instanceIndex=1 -jar brackets-group-console-sink/target/brackets-group-console-sink-0.0.1-SNAPSHOT.jar

これで、確認してみましょう。データを登録してみた結果は、こちら。

## Sink star-group
# instance-1
received message = ★Hello World - 2017-04-22 18:54:19★
received message = ★Hello World - 2017-04-22 18:54:23★
received message = ★Hello World - 2017-04-22 18:54:26★

# instance-2
received message = ★Hello World - 2017-04-22 18:54:21★
received message = ★Hello World - 2017-04-22 18:54:24★
received message = ★Hello World - 2017-04-22 18:54:28★


## Sink brackets-group
# instance-1
received message = [Hello World - 2017-04-22 18:54:19]
received message = [Hello World - 2017-04-22 18:54:23]
received message = [Hello World - 2017-04-22 18:54:26]

# instance-2
received message = [Hello World - 2017-04-22 18:54:21]
received message = [Hello World - 2017-04-22 18:54:24]
received message = [Hello World - 2017-04-22 18:54:28]

ラウンドロビン的なバラけ方をしましたね。

とりあえず、こんなところかと…。

まとめ

Spring Cloud StreamとApache Kafkaで、Consumer Groupの設定と確認、それから少しパーティショニングまわりの内容を
見てみました。

とりあえずConsumer Groupはいいですが、パーティショニングについての制御だったりはちょっとまだよくわからない感じですね。

あと、instanceCountとinstanceIndexは確かにSpring Cloud Stream単体だと面倒かも…。

パーティショニングまわりの話題は、また今度追ってみましょう。

Partitioning