Following on from the entry below, this time I'd like to see what happens to running Producers and Consumers on Apache Kafka when a Broker goes down.
Checking the behavior of Producers/Consumers when adding a Broker to a running Apache Kafka cluster - CLOVER
Environment and subject
Basically the same as the entry above.
The Apache Kafka version used is 1.1.0, with a single Apache ZooKeeper instance and the Apache Kafka Brokers arranged as follows:

Apache ZooKeeper … 172.20.0.2
Apache Kafka … 172.20.0.3 and up
The IP address scheme has changed slightly from the previous entry…
We start with three Apache Kafka Brokers and reduce them to two along the way.
The idea is to check things like whether the Consumers keep running through this.
The environment is also the same as last time.
$ java -version
openjdk version "1.8.0_162"
OpenJDK Runtime Environment (build 1.8.0_162-8u162-b12-0ubuntu0.16.04.2-b12)
OpenJDK 64-Bit Server VM (build 25.162-b12, mixed mode)

$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: /usr/local/maven3/current
Java version: 1.8.0_162, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-104-generic", arch: "amd64", family: "unix"
Sample programs
The sample programs are also the same ones used in the previous entry.
The Maven dependency:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
The Producer.
src/main/java/org/littlewings/kafka/SimpleLoader.java
package org.littlewings.kafka;

import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleLoader {
    public static void main(String... args) throws Exception {
        String bootstrapServers;
        String topicName;

        if (args.length > 1) {
            bootstrapServers = args[0];
            topicName = args[1];
        } else {
            bootstrapServers = "localhost:9092";
            topicName = "my-topic";
        }

        System.out.println("execute info = " + bootstrapServers + ", " + topicName);

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "60000");

        try (KafkaProducer<Integer, String> producer =
                     new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {
            for (int i = 1; i < Integer.MAX_VALUE; i++) {
                producer.send(new ProducerRecord<>(topicName, null, "message-" + i)).get();

                if (i % 30 == 0) {
                    System.out.println(LocalDateTime.now() + " sleeping...");
                    TimeUnit.SECONDS.sleep(30L);
                }
            }
        }
    }
}
A Producer that throws 30 messages into the Topic, then sleeps for 30 seconds, repeatedly. It takes "bootstrap.servers" and the Topic name as startup arguments.
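Incidentally, how well sends tolerate a Broker failure depends a lot on the Producer's durability settings, which the sample above leaves at their defaults. Below is a minimal sketch of a more failure-tolerant configuration; the config keys are real kafka-clients settings, but the specific values are illustrative and not from this entry:

```java
import java.util.Properties;

public class DurableProducerConfig {
    public static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Wait for all in-sync replicas to acknowledge each write, so a single
        // Broker failure does not lose messages that were already acknowledged
        props.put("acks", "all");
        // Retry transient failures, e.g. the leader election that happens
        // while a Broker is going down (values here are illustrative)
        props.put("retries", "10");
        props.put("retry.backoff.ms", "1000");
        return props;
    }

    public static void main(String... args) {
        Properties props = build("172.20.0.3:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With acks=all plus retries, a send that hits a dying partition leader should be retried against the newly elected leader rather than failing immediately.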
The Consumer side.
src/main/java/org/littlewings/kafka/SimpleConsumer.java
package org.littlewings.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String... args) throws Exception {
        String bootstrapServers;
        String topicName;
        String groupId;

        if (args.length > 2) {
            bootstrapServers = args[0];
            topicName = args[1];
            groupId = args[2];
        } else {
            bootstrapServers = "localhost:9092";
            topicName = "my-topic";
            groupId = "my-group";
        }

        System.out.println("execute info = " + bootstrapServers + ", " + topicName + ", " + groupId);

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "60000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList(topicName));

            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(1000L);

                Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator();

                while (iterator.hasNext()) {
                    ConsumerRecord<Integer, String> record = iterator.next();
                    System.out.println(record.partition() + " / " + record.value());
                }

                TimeUnit.SECONDS.sleep(1L);
            }
        }
    }
}
This one also takes "bootstrap.servers", the Topic name, and a Consumer Group name as startup arguments. Once started, it simply keeps running.
What it prints is the ID of the partition each message was read from, together with the message's value.
Exactly the same as last time.
Creating the Topic and running the programs
First, create the Topic.
$ bin/kafka-topics.sh --create --zookeeper 172.20.0.2:2181 --replication-factor 2 --partitions 5 --topic my-topic
Created topic "my-topic".
Check it.
$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic  PartitionCount:5  ReplicationFactor:2  Configs:
        Topic: my-topic  Partition: 0  Leader: 5  Replicas: 5,3  Isr: 5,3
        Topic: my-topic  Partition: 1  Leader: 3  Replicas: 3,4  Isr: 3,4
        Topic: my-topic  Partition: 2  Leader: 4  Replicas: 4,5  Isr: 4,5
        Topic: my-topic  Partition: 3  Leader: 5  Replicas: 5,4  Isr: 5,4
        Topic: my-topic  Partition: 4  Leader: 3  Replicas: 3,5  Isr: 3,5
Run the programs. The Consumer Group is "my-group".
## Producer
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args='172.20.0.3:9092 my-topic'

## Consumer-1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.20.0.3:9092 my-topic my-group'

## Consumer-2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.20.0.3:9092 my-topic my-group'

## Consumer-3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.20.0.3:9092 my-topic my-group'
The partition assignment for each Consumer:
$ bin/kafka-consumer-groups.sh --bootstrap-server 172.20.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST         CLIENT-ID
my-topic  2          7               12              5    consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7  /172.20.0.1  consumer-1
my-topic  3          6               12              6    consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7  /172.20.0.1  consumer-1
my-topic  0          6               12              6    consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b  /172.20.0.1  consumer-1
my-topic  1          6               12              6    consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b  /172.20.0.1  consumer-1
my-topic  4          6               12              6    consumer-1-b49ad28b-fcff-4986-bffa-b3683e0f0779  /172.20.0.1  consumer-1
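The 2 / 2 / 1 split in this table is what you would expect from the default RangeAssignor with 5 partitions and 3 consumers: partitions are handed out in contiguous ranges, and the first consumers get one extra. A simplified plain-Java sketch of that logic (the real assignor also sorts members by member id and works per topic):

```java
import java.util.*;

public class RangeAssignSketch {
    // Hand out numPartitions contiguous partitions across consumers;
    // the first (numPartitions % consumers) consumers get one extra
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int perConsumer = numPartitions / consumers.size();
        int extra = numPartitions % consumers.size();
        int partition = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(partition++);
            }
            assignment.put(consumers.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String... args) {
        // 5 partitions over 3 consumers, as in the run above
        assign(Arrays.asList("consumer-1", "consumer-2", "consumer-3"), 5)
                .forEach((c, ps) -> System.out.println(c + " -> " + ps));
    }
}
```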
The Producer side keeps throwing messages into the Topic every 30 seconds,
2018-05-03T23:55:47.847 sleeping...
2018-05-03T23:56:18.048 sleeping...
while on the Consumer side, messages keep being pulled out and printed to standard output.
## Consumer-1
0 / message-200
0 / message-205
0 / message-210
1 / message-183
1 / message-188
1 / message-193

## Consumer-2
4 / message-157
4 / message-162
4 / message-167
4 / message-172
4 / message-177

## Consumer-3
2 / message-181
2 / message-186
3 / message-184
2 / message-191
2 / message-196
2 / message-201
2 / message-206
3 / message-189
Well, it's working.
Taking a Broker down
Now, let's take one of the Brokers down.
$ kill [PID]
Looking at the Topic's state, the Broker with id 4 is gone.
$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic  PartitionCount:5  ReplicationFactor:2  Configs:
        Topic: my-topic  Partition: 0  Leader: 5  Replicas: 5,3  Isr: 5,3
        Topic: my-topic  Partition: 1  Leader: 3  Replicas: 3,4  Isr: 3
        Topic: my-topic  Partition: 2  Leader: 5  Replicas: 4,5  Isr: 5
        Topic: my-topic  Partition: 3  Leader: 5  Replicas: 5,4  Isr: 5
        Topic: my-topic  Partition: 4  Leader: 3  Replicas: 3,5  Isr: 3,5
Before the failure, it looked like this.
$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic  PartitionCount:5  ReplicationFactor:2  Configs:
        Topic: my-topic  Partition: 0  Leader: 5  Replicas: 5,3  Isr: 5,3
        Topic: my-topic  Partition: 1  Leader: 3  Replicas: 3,4  Isr: 3,4
        Topic: my-topic  Partition: 2  Leader: 4  Replicas: 4,5  Isr: 4,5
        Topic: my-topic  Partition: 3  Leader: 5  Replicas: 5,4  Isr: 5,4
        Topic: my-topic  Partition: 4  Leader: 3  Replicas: 3,5  Isr: 3,5
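The leader change for Partition 2 (from Broker 4 to 5) is consistent with how Kafka picks a replacement: roughly speaking, the first replica that is still alive and in the ISR becomes the new leader, assuming unclean leader election is disabled. A toy sketch of that selection, using Partition 2's data from the output above:

```java
import java.util.*;

public class LeaderElectionSketch {
    // New leader: the first replica (in replica-list order) that is
    // both still alive and a member of the in-sync replica set
    static int electLeader(List<Integer> replicas, Set<Integer> isr, Set<Integer> liveBrokers) {
        for (int broker : replicas) {
            if (isr.contains(broker) && liveBrokers.contains(broker)) {
                return broker;
            }
        }
        return -1; // no eligible replica: partition goes offline (without unclean election)
    }

    public static void main(String... args) {
        // Partition 2 before the failure: Replicas 4,5 / Isr 4,5; then Broker 4 dies
        Set<Integer> live = new HashSet<>(Arrays.asList(3, 5));
        int leader = electLeader(Arrays.asList(4, 5),
                new HashSet<>(Arrays.asList(4, 5)), live);
        System.out.println("new leader for partition 2: " + leader);
    }
}
```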
The Consumer assignments within the Consumer Group didn't seem to change at all.
$ bin/kafka-consumer-groups.sh --bootstrap-server 172.20.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST         CLIENT-ID
my-topic  2          78              78              0    consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7  /172.20.0.1  consumer-1
my-topic  3          78              78              0    consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7  /172.20.0.1  consumer-1
my-topic  0          78              78              0    consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b  /172.20.0.1  consumer-1
my-topic  1          78              78              0    consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b  /172.20.0.1  consumer-1
my-topic  4          78              78              0    consumer-1-b49ad28b-fcff-4986-bffa-b3683e0f0779  /172.20.0.1  consumer-1
And the Producer and Consumers just kept running normally.
## Consumer-1
0 / message-500
0 / message-505
0 / message-510
1 / message-483
1 / message-488
1 / message-493
1 / message-498

## Consumer-2
4 / message-427
4 / message-432
4 / message-437
4 / message-442
4 / message-447
4 / message-452

## Consumer-3
2 / message-491
2 / message-496
2 / message-501
2 / message-506
3 / message-484
3 / message-489
3 / message-494
Rebalancing
By the way, looking at the Topic's state, the partition assignment is still skewed across the Brokers.
$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic  PartitionCount:5  ReplicationFactor:2  Configs:
        Topic: my-topic  Partition: 0  Leader: 5  Replicas: 5,3  Isr: 5,3
        Topic: my-topic  Partition: 1  Leader: 3  Replicas: 3,4  Isr: 3
        Topic: my-topic  Partition: 2  Leader: 5  Replicas: 4,5  Isr: 5
        Topic: my-topic  Partition: 3  Leader: 5  Replicas: 5,4  Isr: 5
        Topic: my-topic  Partition: 4  Leader: 3  Replicas: 3,5  Isr: 3,5
Broker 4 is gone from the Leaders, but it still appears in the replica lists.
So, let's rebalance this.
Create a JSON file with the following content.
topics-to-move.json
{
  "topics": [
    { "topic": "my-topic" }
  ],
  "version": 1
}
Using this JSON, run the following command. The Brokers passed to "--broker-list" are the surviving 3 and 5.
$ bin/kafka-reassign-partitions.sh --zookeeper 172.20.0.2:2181 --topics-to-move-json-file topics-to-move.json --broker-list=3,5 --generate
Current partition replica assignment

{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[4,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,4],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,5],"log_dirs":["any","any"]}]}
Then save the JSON shown under "Proposed partition reassignment configuration" to a file,
expand-cluster-reassignment.json
{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,5],"log_dirs":["any","any"]}]}
and execute.
$ bin/kafka-reassign-partitions.sh --zookeeper 172.20.0.2:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[4,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,4],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
Reassignment complete.
$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic  PartitionCount:5  ReplicationFactor:2  Configs:
        Topic: my-topic  Partition: 0  Leader: 5  Replicas: 3,5  Isr: 5,3
        Topic: my-topic  Partition: 1  Leader: 3  Replicas: 5,3  Isr: 3,5
        Topic: my-topic  Partition: 2  Leader: 5  Replicas: 3,5  Isr: 5,3
        Topic: my-topic  Partition: 3  Leader: 5  Replicas: 5,3  Isr: 5,3
        Topic: my-topic  Partition: 4  Leader: 3  Replicas: 3,5  Isr: 3,5
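Interestingly, the proposed assignment amounts to spreading each partition's replicas round-robin over the Brokers given to "--broker-list". This is not the actual algorithm of kafka-reassign-partitions.sh (which, among other things, randomizes its starting position), just a rough sketch of the idea, which happens to reproduce the replica lists seen in this run:

```java
import java.util.*;

public class ReassignmentSketch {
    // Spread replicationFactor replicas per partition round-robin over the given brokers
    static Map<Integer, List<Integer>> propose(int partitions, int replicationFactor,
                                               List<Integer> brokers) {
        Map<Integer, List<Integer>> plan = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokers.get((p + r) % brokers.size()));
            }
            plan.put(p, replicas);
        }
        return plan;
    }

    public static void main(String... args) {
        // 5 partitions, replication factor 2, over the surviving Brokers 3 and 5
        propose(5, 2, Arrays.asList(3, 5))
                .forEach((p, replicas) -> System.out.println("partition " + p + " -> " + replicas));
    }
}
```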
Summary
I tried taking a Broker down while Producers and Consumers were running, and this time everything kept working. That said, the Consumer side occasionally went quiet and picked up nothing for a while, which is a little unnerving.
Also, a rebalance is not performed automatically when a Broker goes down, so if yet another Broker failed before rebalancing, data could presumably be lost depending on how it is distributed.
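That data-loss concern can be made concrete: a partition's data remains available only while at least one of its replicas sits on a live Broker. A small sketch that checks which partitions would survive a second failure, using the un-rebalanced assignment from above (Broker 4 already down) and then losing Broker 5 as well:

```java
import java.util.*;

public class SurvivalCheckSketch {
    // A partition's data is still available if at least one replica is on a live broker
    static boolean survives(List<Integer> replicas, Set<Integer> liveBrokers) {
        return replicas.stream().anyMatch(liveBrokers::contains);
    }

    public static void main(String... args) {
        // Replica assignment before rebalancing (Broker 4 already down)
        Map<Integer, List<Integer>> replicas = new LinkedHashMap<>();
        replicas.put(0, Arrays.asList(5, 3));
        replicas.put(1, Arrays.asList(3, 4));
        replicas.put(2, Arrays.asList(4, 5));
        replicas.put(3, Arrays.asList(5, 4));
        replicas.put(4, Arrays.asList(3, 5));

        // Now suppose Broker 5 also goes down: only Broker 3 remains
        Set<Integer> live = new HashSet<>(Collections.singletonList(3));
        replicas.forEach((p, r) ->
                System.out.println("partition " + p + ": " + (survives(r, live) ? "survives" : "lost")));
    }
}
```

Partitions 2 and 3, whose replicas were 4,5 and 5,4, would be gone entirely, which is exactly why rebalancing before the next failure matters.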
Node-failure behavior really is something you want to check for yourself…