Apache Kafkaには付属のツールとして、コンソールを使ったProducer/Consumerが付いています。
このうち、Producer(kafka-console-producer.sh)を使った時に、キーを指定してみようというお話。
まずは、Apache Kafkaのクラスタを用意します。Brokerは3つ、Apache ZooKeeperはひとつとします。
Topicを作成。最初はキーを指定しないパターンでやってみます。Partition数は3、レプリケーションは2とします。
$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic Created topic "my-topic".
確認。
$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic Topic:my-topic PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-topic Partition: 0 Leader: 3 Replicas: 3,5 Isr: 3,5 Topic: my-topic Partition: 1 Leader: 4 Replicas: 4,3 Isr: 4,3 Topic: my-topic Partition: 2 Leader: 5 Replicas: 5,4 Isr: 5,4
Producer(kafka-console-producer.sh)でデータを登録してみます。
$ bin/kafka-console-producer.sh --broker-list 172.21.0.3:9092 --topic my-topic >value1 >value2 >value3 >value4 >value5 >value6 >value7 >value8 >value9
Consumer(bin/kafka-console-consumer.sh)で、最初からデータを読み出してみます。
$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic --from-beginning value1 value4 value7 value2 value5 value8 value3 value6 value9
けっこうすごい順番になりました。パーティションごとにデータを抜いてきている感じがしますねぇ。
なお、キー指定がない場合のパーティション振り分けは、ラウンドロビンなのでした。
https://github.com/apache/kafka/blob/0.11.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L57-L66
今度は、キーを指定するようにしてみます。先ほどとは別のTopicを作ってみましょう。
$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic-with-key Created topic "my-topic-with-key".
Partition数とレプリケーション数は同じです。
describe。
$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic-with-key Topic:my-topic-with-key PartitionCount:3 ReplicationFactor:2 Configs: Topic: my-topic-with-key Partition: 0 Leader: 4 Replicas: 4,5 Isr: 4,5 Topic: my-topic-with-key Partition: 1 Leader: 5 Replicas: 5,3 Isr: 5,3 Topic: my-topic-with-key Partition: 2 Leader: 3 Replicas: 3,4 Isr: 3,4
データ登録。キーは、データ3つにつき1回切り替えます。
$ bin/kafka-console-producer.sh --broker-list 172.21.0.3:9092 --topic my-topic-with-key --property "parse.key=true" --property "key.separator=:" >key1:value1 >key1:value2 >key1:value3 >key2:value4 >key2:value5 >key2:value6 >key3:value7 >key3:value8 >key3:value9
プロパティ「parse.key」をtrueにすることでキーのパースを有効に、セパレーターはプロパティ「key.separator」で指定することができます。
https://github.com/apache/kafka/blob/0.11.0.0/core/src/main/scala/kafka/tools/ConsoleProducer.scala#L299-L302
今回は、セパレーターを「:」としました。
データの取得。
$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic-with-key --from-beginning value1 value2 value3 value4 value5 value6 value7 value8 value9
今回は、順番どおりになりました、と。キーを指定された場合の振り分けの実装は、こちら。
https://github.com/apache/kafka/blob/0.11.0.0/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java#L68-L69
Trifectaで見ると、ちゃんと(?)振り分けられているのが確認できます。
ちょっと偏っていますが…ご愛嬌…。