
Checking Producer/Consumer behavior when taking down a Broker at runtime in Apache Kafka

Continuing from the entry below, I'd like to look at what happens when a Broker is taken down while Producers and Consumers are running against Apache Kafka.

Checking Producer/Consumer behavior when adding a Broker at runtime in Apache Kafka - CLOVER

Environment and goal

Basically, everything is the same as in the entry above.

The Apache Kafka version used is 1.1.0, with a single Apache ZooKeeper node and the Apache Kafka Brokers laid out as follows:

Apache ZooKeeper … 172.20.0.2
Apache Kafka … 172.20.0.3 and up

The IP address scheme has changed slightly from the previous entry…

There are three Apache Kafka Brokers to start with, reduced to two partway through.

At that point, I'd like to see things like whether the Consumers keep running.

The environment is also the same as last time.

$ java -version
openjdk version "1.8.0_162"
OpenJDK Runtime Environment (build 1.8.0_162-8u162-b12-0ubuntu0.16.04.2-b12)
OpenJDK 64-Bit Server VM (build 25.162-b12, mixed mode)

$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: /usr/local/maven3/current
Java version: 1.8.0_162, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-104-generic", arch: "amd64", family: "unix"

Sample programs

The sample programs are also the same ones used in the previous entry.

The Maven dependency:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

The Producer.
src/main/java/org/littlewings/kafka/SimpleLoader.java

package org.littlewings.kafka;

import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleLoader {
    public static void main(String... args) throws Exception {
        String bootstrapServers;
        String topicName;

        if (args.length > 1) {
            bootstrapServers = args[0];
            topicName = args[1];
        } else {
            bootstrapServers = "localhost:9092";
            topicName = "my-topic";
        }

        System.out.println("execute info = " + bootstrapServers + ", " + topicName);

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // have the client refresh its cluster metadata every 60 seconds
        properties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "60000");

        try (KafkaProducer<Integer, String> producer =
                     new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {

            for (int i = 1; i < Integer.MAX_VALUE; i++) {
                // send synchronously; get() blocks until the send completes
                producer.send(new ProducerRecord<>(topicName, null, "message-" + i)).get();

                // pause for 30 seconds after every 30 messages
                if (i % 30 == 0) {
                    System.out.println(LocalDateTime.now() + " sleeping...");
                    TimeUnit.SECONDS.sleep(30L);
                }
                }
            }
        }
    }
}

A Producer that drops 30 messages into the Topic, sleeps for 30 seconds, and repeats. It takes "bootstrap.servers" and the Topic name as startup arguments.
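
This Producer simply relies on the client defaults for durability (in Kafka 1.1.0, acks=1 and retries=0). As a minimal sketch, and purely as an illustration rather than something this entry actually uses, one might harden it against a Broker failure with settings like these:

        // illustrative settings only (not used in this entry):
        // wait for all in-sync replicas to acknowledge each send
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // retry sends that fail while a new partition Leader is being elected
        properties.put(ProducerConfig.RETRIES_CONFIG, "10");
        // keep retried messages in their original order
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");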

The Consumer side.
src/main/java/org/littlewings/kafka/SimpleConsumer.java

package org.littlewings.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String... args) throws Exception {
        String bootstrapServers;
        String topicName;
        String groupId;

        if (args.length > 2) {
            bootstrapServers = args[0];
            topicName = args[1];
            groupId = args[2];
        } else {
            bootstrapServers = "localhost:9092";
            topicName = "my-topic";
            groupId = "my-group";
        }

        System.out.println("execute info = " + bootstrapServers + ", " + topicName + ", " + groupId);

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // start from the earliest offset when no committed offset exists yet
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // have the client refresh its cluster metadata every 60 seconds
        properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "60000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<Integer, String> consumer =
             new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList(topicName));

            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(1000L);

                Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator();

                while (iterator.hasNext()) {
                    ConsumerRecord<Integer, String> record = iterator.next();
                    System.out.println(record.partition() + " / "  + record.value());
                }

                TimeUnit.SECONDS.sleep(1L);
            }
        }
    }
}

This one takes "bootstrap.servers", the Topic name, and a Consumer Group name as startup arguments. Once started, it simply keeps running.

What it prints is the ID of the partition each message was read from, along with the value.
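
Incidentally, if you wanted to watch the Consumer-side partition assignment change (for instance, when a Consumer joins or leaves, or the group's coordinator moves), a ConsumerRebalanceListener could be passed to subscribe(). A sketch only, not part of the program above (it additionally needs imports for ConsumerRebalanceListener, TopicPartition, and java.util.Collection):

        consumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // called before partitions are taken away during a rebalance
                System.out.println("revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // called once the new assignment has been handed out
                System.out.println("assigned: " + partitions);
            }
        });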

Exactly the same as last time.

Creating the Topic and running the programs

First, then, create the Topic.

$ bin/kafka-topics.sh --create --zookeeper 172.20.0.2:2181 --replication-factor 2 --partitions 5 --topic my-topic
Created topic "my-topic".

Check it. (Leader is the Broker currently serving a partition; Isr lists its in-sync replicas.)

$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 2	Leader: 4	Replicas: 4,5	Isr: 4,5
	Topic: my-topic	Partition: 3	Leader: 5	Replicas: 5,4	Isr: 5,4
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,5	Isr: 3,5

Now run the programs. The Consumer Group is "my-group".

## Producer
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args='172.20.0.3:9092 my-topic'

## Consumer-1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.20.0.3:9092 my-topic my-group'

## Consumer-2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.20.0.3:9092 my-topic my-group'

## Consumer-3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.20.0.3:9092 my-topic my-group'

The partition assignments for each Consumer. The five partitions end up split 2/2/1 across the three Consumers:

$ bin/kafka-consumer-groups.sh --bootstrap-server 172.20.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        2          7               12              5               consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7 /172.20.0.1     consumer-1
my-topic        3          6               12              6               consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7 /172.20.0.1     consumer-1
my-topic        0          6               12              6               consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b /172.20.0.1     consumer-1
my-topic        1          6               12              6               consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b /172.20.0.1     consumer-1
my-topic        4          6               12              6               consumer-1-b49ad28b-fcff-4986-bffa-b3683e0f0779 /172.20.0.1     consumer-1

The Producer side keeps putting messages into the Topic every 30 seconds…

2018-05-03T23:55:47.847 sleeping...
2018-05-03T23:56:18.048 sleeping...

…while on the Consumer side, messages keep being read and written to standard output.

## Consumer-1
0 / message-200
0 / message-205
0 / message-210
1 / message-183
1 / message-188
1 / message-193


## Consumer-2
4 / message-157
4 / message-162
4 / message-167
4 / message-172
4 / message-177


## Consumer-3
2 / message-181
2 / message-186
3 / message-184
2 / message-191
2 / message-196
2 / message-201
2 / message-206
3 / message-189

So far, everything is working.

Taking down a Broker

Now, let's take one of the Brokers down.

$ kill [PID]
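
(If you don't have the Broker's PID handy, jps -l, which ships with the JDK, lists the running Java processes; the Broker shows up under its main class, kafka.Kafka.)

$ jps -l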

Looking at the Topic state, the Broker with id 4 is now gone.

$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 2	Leader: 5	Replicas: 4,5	Isr: 5
	Topic: my-topic	Partition: 3	Leader: 5	Replicas: 5,4	Isr: 5
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,5	Isr: 3,5

For comparison, this was the state before the Broker went down.

$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 2	Leader: 4	Replicas: 4,5	Isr: 4,5
	Topic: my-topic	Partition: 3	Leader: 5	Replicas: 5,4	Isr: 5,4
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,5	Isr: 3,5

The Consumer assignments within the Consumer Group don't appear to have changed.

$ bin/kafka-consumer-groups.sh --bootstrap-server 172.20.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        2          78              78              0               consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7 /172.20.0.1     consumer-1
my-topic        3          78              78              0               consumer-1-8641226b-4da8-4036-9a96-b4f932ddb7f7 /172.20.0.1     consumer-1
my-topic        0          78              78              0               consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b /172.20.0.1     consumer-1
my-topic        1          78              78              0               consumer-1-3ced027b-fd9e-4eb8-bc6e-df276156401b /172.20.0.1     consumer-1
my-topic        4          78              78              0               consumer-1-b49ad28b-fcff-4986-bffa-b3683e0f0779 /172.20.0.1     consumer-1

Incidentally, the Producer and the Consumers just kept running normally the whole time.

## Consumer-1
0 / message-500
0 / message-505
0 / message-510
1 / message-483
1 / message-488
1 / message-493
1 / message-498


## Consumer-2
4 / message-427
4 / message-432
4 / message-437
4 / message-442
4 / message-447
4 / message-452


## Consumer-3
2 / message-491
2 / message-496
2 / message-501
2 / message-506
3 / message-484
3 / message-489
3 / message-494

Rebalancing

That said, looking at the Topic state, the partition assignment is still skewed across the Brokers.

$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3
	Topic: my-topic	Partition: 2	Leader: 5	Replicas: 4,5	Isr: 5
	Topic: my-topic	Partition: 3	Leader: 5	Replicas: 5,4	Isr: 5
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,5	Isr: 3,5

Broker 4 is gone from the Leaders, but it still lingers in the replica lists.

So, let's rebalance.

Create a JSON file with the following content.
topics-to-move.json

{
  "topics": [{ "topic": "my-topic" }],
  "version": 1
}

Using that JSON, run the following command. The Brokers given to "--broker-list" are the surviving 3 and 5.

$ bin/kafka-reassign-partitions.sh --zookeeper 172.20.0.2:2181 --topics-to-move-json-file topics-to-move.json --broker-list=3,5 --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[4,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,4],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,5],"log_dirs":["any","any"]}]}

Then, take the JSON shown under "Proposed partition reassignment configuration" and save it to a file:
expand-cluster-reassignment.json

{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,5],"log_dirs":["any","any"]}]}

And execute it.

$ bin/kafka-reassign-partitions.sh --zookeeper 172.20.0.2:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[4,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[5,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[5,4],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

The reassignment is done.

$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 5	Replicas: 3,5	Isr: 5,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 5,3	Isr: 3,5
	Topic: my-topic	Partition: 2	Leader: 5	Replicas: 3,5	Isr: 5,3
	Topic: my-topic	Partition: 3	Leader: 5	Replicas: 5,3	Isr: 5,3
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,5	Isr: 3,5
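
To double-check that the reassignment has finished, the same JSON file can also be passed to kafka-reassign-partitions.sh with --verify.

$ bin/kafka-reassign-partitions.sh --zookeeper 172.20.0.2:2181 --reassignment-json-file expand-cluster-reassignment.json --verify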

Summary

I took down a Broker while the Producer and Consumers were running, and this time everything kept working. That said, the Consumers would occasionally pick up nothing for a little while, which is a bit unnerving. Presumably that pause covers the election of new partition Leaders and the clients refreshing their metadata, though that is just my guess.

Also, since partitions are not rebalanced automatically when a Broker goes down, if yet another Broker were lost before rebalancing, data could be lost depending on how it is distributed. Indeed, with a replication factor of 2 and one Broker already gone, several partitions above were down to a single in-sync replica, so losing that last Broker would take those partitions with it.
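
A quick way to spot partitions in this state is to ask kafka-topics.sh for the under-replicated ones:

$ bin/kafka-topics.sh --describe --zookeeper 172.20.0.2:2181 --under-replicated-partitions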

The behavior when a Node goes down really is something you end up worrying about…