CLOVER🍀

That was when it all began.

Trying out cluster membership changes and Partition reassignment with Apache Kafka

I'd been wanting to try things like growing and shrinking a cluster's membership and reassigning Partitions with
Apache Kafka, so I decided to give it a go.

Environment

The Apache Kafka version is 0.11.0.1.

The IP addresses of Apache ZooKeeper and the Apache Kafka Brokers are as follows:

  • Apache ZooKeeper … 172.21.0.2
  • Apache Kafka … 172.21.0.3 and up

Each Apache Kafka Broker's ID is the last octet of its IP address (so 172.21.0.3 gets ID 3).

With this setup, let's grow and shrink the set of Apache Kafka Nodes and see what happens.
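Under this naming convention, each Broker's server.properties would look roughly like the sketch below; the listener and log directory values here are assumptions for illustration, not taken from the actual setup.

```properties
# Broker ID = last octet of the node's IP address (172.21.0.3 -> 3)
broker.id=3

# Assumed: listen on the node's own address
listeners=PLAINTEXT://172.21.0.3:9092

# All Brokers point at the same ZooKeeper instance
zookeeper.connect=172.21.0.2:2181

# Assumed log directory
log.dirs=/tmp/kafka-logs
```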

A program for loading messages

To load messages into the Apache Kafka Brokers, I'll use a client program written in Java.

The Maven dependency:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.1</version>
        </dependency>

This program pushes the number of messages given as a launch argument into Apache Kafka.
src/main/java/org/littlewings/kafka/SimpleLoader.java

package org.littlewings.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleLoader {
    public static void main(String... args) {
        int messageSize;
        if (args.length > 0) {
            messageSize = Integer.parseInt(args[0]);
        } else {
            messageSize = 10;
        }

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092");

        try (KafkaProducer<Integer, String> producer =
                     new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {

            IntStream
                    .rangeClosed(1, messageSize)
                    .forEach(i -> {
                        try {
                            // null key, so the partitioner spreads records across partitions;
                            // get() blocks until each send is acknowledged
                            producer.send(new ProducerRecord<>("my-topic", null, "message-" + i)).get();
                        } catch (InterruptedException | ExecutionException e) {
                            throw new RuntimeException(e);
                        }
                    });
        }
    }
}

That's the tooling sorted.

Forming the initial cluster

First, let's put together the base Apache Kafka cluster. Two Brokers is a slightly awkward number, but we'll go with that.

Create the Topic, with 9 Partitions and a Replication Factor of 2.

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 9 --topic my-topic
Created topic "my-topic".

Check it:

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:9	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 2	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 3	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 4	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 5	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 6	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 7	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 8	Leader: 4	Replicas: 4,3	Isr: 4,3

Here's what each field means:

"leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
"replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
"isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.

https://kafka.apache.org/quickstart#quickstart_multibroker

So Isr is the set of replicas that have caught up with the leader.

Now let's pump in some messages. First, 100 messages in total.

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args=100

Check:

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic --from-beginning | wc -l
101

The +1 is because this line gets appended at the end (I stopped the consumer with Ctrl+C):

Processed a total of 100 messages

Checking per Partition:

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic --from-beginning --partition 0 | wc -l
12

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic --from-beginning --partition 3 | wc -l
12

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic --from-beginning --partition 8 | wc -l
13

Removing a Node

Next, let's take one Broker Node down.

And describe the Topic.

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:9	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 2	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 3	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 5	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 6	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 7	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 8	Leader: 3	Replicas: 4,3	Isr: 3

The Broker with ID 4 is gone.

The messages themselves can still be retrieved without problems.
(This may require "--new-consumer"?)

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic --from-beginning --new-consumer | wc -l
101

Adding a Broker Node

Now let's add another Broker Node.

Adding one Node doesn't change the describe output at all,
even though a Broker with ID 5 has joined the cluster.

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:9	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 2	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 3	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 5	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 6	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 7	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 8	Leader: 3	Replicas: 4,3	Isr: 3

Even after loading additional messages,

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args=100

nothing in particular changes,

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:9	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 2	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 3	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 5	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 6	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 7	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 8	Leader: 3	Replicas: 4,3	Isr: 3

though the messages themselves are stored.

$ bin/kafka-console-consumer.sh --bootstrap-server 172.21.0.3:9092 --topic my-topic --from-beginning --new-consumer | wc -l
201

Adding Partitions

Let's add some Partitions.

Modifying topics

You can change the number of Partitions with kafka-topics.sh's alter.

$ bin/kafka-topics.sh --zookeeper 172.21.0.2:2181 --topic my-topic --alter --partitions 12
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
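The WARNING applies to records that do have a key: keyed records are assigned a partition by hashing the key modulo the partition count, so increasing the count remaps existing keys to different partitions and breaks per-key ordering. The sketch below illustrates the effect; note it substitutes String.hashCode for Kafka's actual murmur2 hash purely for illustration, and the keys are made up.

```java
public class KeyRemapSketch {
    public static void main(String... args) {
        int before = 9;   // partition count before the alter
        int after = 12;   // partition count after the alter
        int moved = 0;

        // Hypothetical keys; Kafka really hashes the serialized key bytes
        // with murmur2, String.hashCode is only a stand-in here
        for (int i = 0; i < 100; i++) {
            int hash = Math.abs(("key-" + i).hashCode());
            if (hash % before != hash % after) {
                moved++;
            }
        }

        System.out.println(moved + " of 100 keys now map to a different partition");
    }
}
```

SimpleLoader sends null keys, so this topic isn't affected by the remapping.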

Describing now shows a slightly different result.

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:12	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 2	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 3	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 5	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 6	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 7	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 8	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 9	Leader: 3	Replicas: 3,5	Isr: 3,5
	Topic: my-topic	Partition: 10	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 11	Leader: 3	Replicas: 3,5	Isr: 3,5

But the cluster still looks rather lopsided.

Rebalancing the cluster

Let's rebalance the cluster from this state.

Expanding your cluster

For good measure, load some more messages as well.

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args=100

Following the documentation, prepare a file like this.
topics-to-move.json

{
  "topics": [{ "topic": "my-topic" }],
  "version": 1
}

"topics" takes an array of objects, each naming a Topic to rebalance, so it looks like multiple Topics can be
handled at once. As for "version", I couldn't find any explanation of it…

Run kafka-reassign-partitions.sh with "--generate". "--broker-list" takes the target Broker IDs, comma-separated.

$ bin/kafka-reassign-partitions.sh --zookeeper 172.21.0.2:2181 --topics-to-move-json-file topics-to-move.json --broker-list "3,5" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":1,"replicas":[3,4]},{"topic":"my-topic","partition":10,"replicas":[5,3]},{"topic":"my-topic","partition":5,"replicas":[3,4]},{"topic":"my-topic","partition":4,"replicas":[4,3]},{"topic":"my-topic","partition":3,"replicas":[3,4]},{"topic":"my-topic","partition":7,"replicas":[3,4]},{"topic":"my-topic","partition":0,"replicas":[4,3]},{"topic":"my-topic","partition":6,"replicas":[4,3]},{"topic":"my-topic","partition":11,"replicas":[3,5]},{"topic":"my-topic","partition":8,"replicas":[4,3]},{"topic":"my-topic","partition":2,"replicas":[4,3]},{"topic":"my-topic","partition":9,"replicas":[3,5]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"my-topic","partition":1,"replicas":[5,3]},{"topic":"my-topic","partition":10,"replicas":[3,5]},{"topic":"my-topic","partition":5,"replicas":[5,3]},{"topic":"my-topic","partition":4,"replicas":[3,5]},{"topic":"my-topic","partition":3,"replicas":[5,3]},{"topic":"my-topic","partition":6,"replicas":[3,5]},{"topic":"my-topic","partition":7,"replicas":[5,3]},{"topic":"my-topic","partition":0,"replicas":[3,5]},{"topic":"my-topic","partition":11,"replicas":[5,3]},{"topic":"my-topic","partition":8,"replicas":[3,5]},{"topic":"my-topic","partition":2,"replicas":[3,5]},{"topic":"my-topic","partition":9,"replicas":[5,3]}]}

Of this output, save the part labeled "Proposed partition reassignment configuration" to a file.
expand-cluster-reassignment.json

{"version":1,"partitions":[{"topic":"my-topic","partition":1,"replicas":[5,3]},{"topic":"my-topic","partition":10,"replicas":[3,5]},{"topic":"my-topic","partition":5,"replicas":[5,3]},{"topic":"my-topic","partition":4,"replicas":[3,5]},{"topic":"my-topic","partition":3,"replicas":[5,3]},{"topic":"my-topic","partition":6,"replicas":[3,5]},{"topic":"my-topic","partition":7,"replicas":[5,3]},{"topic":"my-topic","partition":0,"replicas":[3,5]},{"topic":"my-topic","partition":11,"replicas":[5,3]},{"topic":"my-topic","partition":8,"replicas":[3,5]},{"topic":"my-topic","partition":2,"replicas":[3,5]},{"topic":"my-topic","partition":9,"replicas":[5,3]}]}
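As a quick sanity check on the proposal, you can count how often each Broker appears as the preferred leader (the first entry of each replica list). In the proposed assignment above, the 12 partitions split evenly between Brokers 3 and 5:

```java
public class AssignmentBalanceSketch {
    public static void main(String... args) {
        // First replica (preferred leader) for partitions 0..11,
        // transcribed from the proposed reassignment above
        int[] preferredLeaders = {3, 5, 3, 5, 3, 5, 3, 5, 3, 5, 3, 5};

        int broker3 = 0;
        int broker5 = 0;
        for (int leader : preferredLeaders) {
            if (leader == 3) {
                broker3++;
            } else {
                broker5++;
            }
        }

        // Both brokers end up as preferred leader for 6 partitions each
        System.out.println("broker 3 leads " + broker3 + " partitions, broker 5 leads " + broker5);
    }
}
```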

At this point, no rebalancing has happened yet.

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:12	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 2	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 3	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 5	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 6	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 7	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 8	Leader: 3	Replicas: 4,3	Isr: 3
	Topic: my-topic	Partition: 9	Leader: 3	Replicas: 3,5	Isr: 3,5
	Topic: my-topic	Partition: 10	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 11	Leader: 3	Replicas: 3,5	Isr: 3,5

Now, let's rebalance. This time, pass the file we saved from the "--generate" output to "--execute".

$ bin/kafka-reassign-partitions.sh --zookeeper 172.21.0.2:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"my-topic","partition":1,"replicas":[3,4]},{"topic":"my-topic","partition":10,"replicas":[5,3]},{"topic":"my-topic","partition":5,"replicas":[3,4]},{"topic":"my-topic","partition":4,"replicas":[4,3]},{"topic":"my-topic","partition":3,"replicas":[3,4]},{"topic":"my-topic","partition":7,"replicas":[3,4]},{"topic":"my-topic","partition":0,"replicas":[4,3]},{"topic":"my-topic","partition":6,"replicas":[4,3]},{"topic":"my-topic","partition":11,"replicas":[3,5]},{"topic":"my-topic","partition":8,"replicas":[4,3]},{"topic":"my-topic","partition":2,"replicas":[4,3]},{"topic":"my-topic","partition":9,"replicas":[3,5]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

The Partitions have been rebalanced.

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:12	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 3,5	Isr: 3,5
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 5,3	Isr: 3,5
	Topic: my-topic	Partition: 2	Leader: 3	Replicas: 3,5	Isr: 3,5
	Topic: my-topic	Partition: 3	Leader: 3	Replicas: 5,3	Isr: 3,5
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,5	Isr: 3,5
	Topic: my-topic	Partition: 5	Leader: 3	Replicas: 5,3	Isr: 3,5
	Topic: my-topic	Partition: 6	Leader: 3	Replicas: 3,5	Isr: 3,5
	Topic: my-topic	Partition: 7	Leader: 3	Replicas: 5,3	Isr: 3,5
	Topic: my-topic	Partition: 8	Leader: 3	Replicas: 3,5	Isr: 3,5
	Topic: my-topic	Partition: 9	Leader: 3	Replicas: 5,3	Isr: 3,5
	Topic: my-topic	Partition: 10	Leader: 5	Replicas: 3,5	Isr: 5,3
	Topic: my-topic	Partition: 11	Leader: 3	Replicas: 5,3	Isr: 3,5

Finally, let's use "--verify" to confirm the rebalancing really completed.

$ bin/kafka-reassign-partitions.sh --zookeeper 172.21.0.2:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment: 
Reassignment of partition [my-topic,5] completed successfully
Reassignment of partition [my-topic,0] completed successfully
Reassignment of partition [my-topic,7] completed successfully
Reassignment of partition [my-topic,4] completed successfully
Reassignment of partition [my-topic,6] completed successfully
Reassignment of partition [my-topic,11] completed successfully
Reassignment of partition [my-topic,10] completed successfully
Reassignment of partition [my-topic,9] completed successfully
Reassignment of partition [my-topic,2] completed successfully
Reassignment of partition [my-topic,3] completed successfully
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [my-topic,8] completed successfully

Looks good.

Summary

This was my first try at taking Apache Kafka cluster members down, adding new ones, and manipulating Partitions.

Beyond this, it seems you can also change the Replication Factor

Increasing replication factor

and limit bandwidth usage during data migration.

Limiting Bandwidth Usage during Data Migration

I'm sure there's a lot more to pay attention to when doing this for real, but as a first step, this will do.

Reference:
How to Rebalance Topics in a Kafka Cluster – experience@imaginea