CLOVER🍀

That was when it all began.

Apache Kafkaのkafka-console-producer.shでキーを指定する

Apache Kafkaには付属のツールとして、コンソールを使ったProducer/Consumerが付いています。

このうち、Producer(kafka-console-producer.sh)を使った時に、キーを指定してみようというお話。

まずは、Apache Kafkaのクラスタを用意します。Brokerは3つ、Apache ZooKeeperはひとつとします。

Topicを作成。最初はキーを指定しないパターンでやってみます。Partition数は3、レプリケーションは2とします。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic
Created topic "my-topic".

確認。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 3,5	Isr: 3,5
	Topic: my-topic	Partition: 1	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 2	Leader: 5	Replicas: 5,4	Isr: 5,4

Producer(kafka-console-producer.sh)でデータを登録してみます。

$ bin/kafka-console-producer.sh --broker-list 172.21.0.3:9092 --topic my-topic
>value1
>value2
>value3
>value4
>value5
>value6
>value7
>value8
>value9

Consumer(bin/kafka-console-consumer.sh)で、最初からデータを読み出してみます。

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic --from-beginning
value1
value4
value7
value2
value5
value8
value3
value6
value9

けっこうすごい順番になりました。パーティションごとにデータを抜いてきている感じがしますねぇ。

なお、キー指定がない場合のパーティション振り分けは、ラウンドロビンなのでした。
https://github.com/apache/kafka/blob/0.11.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L57-L66

今度は、キーを指定するようにしてみます。先ほどとは別のTopicを作ってみましょう。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic-with-key
Created topic "my-topic-with-key".

Partition数とレプリケーション数は同じです。

describe。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic-with-key
Topic:my-topic-with-key	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-topic-with-key	Partition: 0	Leader: 4	Replicas: 4,5	Isr: 4,5
	Topic: my-topic-with-key	Partition: 1	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic-with-key	Partition: 2	Leader: 3	Replicas: 3,4	Isr: 3,4

データ登録。キーは、データ3つにつき1回切り替えます。

$ bin/kafka-console-producer.sh --broker-list 172.21.0.3:9092 --topic my-topic-with-key --property "parse.key=true" --property "key.separator=:"
>key1:value1
>key1:value2
>key1:value3
>key2:value4
>key2:value5
>key2:value6
>key3:value7
>key3:value8
>key3:value9

プロパティ「parse.key」をtrueにすることでキーのパースを有効に、セパレーターはプロパティ「key.separator」で指定することができます。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/main/scala/kafka/tools/ConsoleProducer.scala#L299-L302

今回は、セパレーターを「:」としました。

データの取得。

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic-with-key --from-beginning
value1
value2
value3
value4
value5
value6
value7
value8
value9

今回は、順番どおりになりました、と。キーを指定された場合の振り分けの実装は、こちら。
https://github.com/apache/kafka/blob/0.11.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L68-L69

Trifectaで見ると、ちゃんと(?)振り分けられているのが確認できます。

キー指定なし。

キー指定あり。

ちょっと偏っていますが…ご愛嬌…。