CLOVER🍀

That was when it all began.

Apache Kafkaで、実行中にBrokerを追加した時のProducer/Consumerの挙動を確認する

Apache Kafkaで、ProducerやConsumerを実行中にBrokerを追加した時にどうなるか、その挙動を見てみようと思います。

環境とお題

利用する、Apache Kafkaのバージョンは1.1.0とします。

Apache ZooKeeperをひとつ、Apache KafkaのBrokerを以下のような体系で用意します。

  • Apache ZooKepper … 172.21.0.2
  • Apache Kafka … 172.21.0.3〜

Apache KafkaのBrokerは最初は2つで、あとで3つに増やします。この時のバリエーションとして、

という感じでやってみようかと。

その他、環境的なところはこんな感じです。

$ java -version
openjdk version "1.8.0_162"
OpenJDK Runtime Environment (build 1.8.0_162-8u162-b12-0ubuntu0.16.04.2-b12)
OpenJDK 64-Bit Server VM (build 25.162-b12, mixed mode

$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: /usr/local/maven3/current
Java version: 1.8.0_162, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-104-generic", arch: "amd64", family: "unix"

サンプルプログラム

今回は、簡単なProducer、Consumerのプログラムを用意します。

Maven依存関係は、こちら。

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>

Producer。
src/main/java/org/littlewings/kafka/SimpleLoader.java

package org.littlewings.kafka;

import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleLoader {
    public static void main(String... args) throws Exception {
        String bootstrapServers;
        String topicName;

        if (args.length > 1) {
            bootstrapServers = args[0];
            topicName = args[1];
        } else {
            bootstrapServers = "localhost:9092";
            topicName = "my-topic";
        }

        System.out.println("execute info = " + bootstrapServers + ", " + topicName);

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "60000");

        try (KafkaProducer<Integer, String> producer =
                     new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {

            for (int i = 1; i < Integer.MAX_VALUE; i++) {
                producer.send(new ProducerRecord<>(topicName, null, "message-" + i)).get();

                if (i % 30 == 0) {
                    System.out.println(LocalDateTime.now() + " sleeping...");
                    TimeUnit.SECONDS.sleep(30L);
                }
            }
        }
    }
}

30秒おきに、適当にメッセージを30個放り込むProducerです。

起動引数では、「bootstrap.servers」とTopic名をもらうようにしています。
ProducerConfig.METADATA_MAX_AGE_CONFIG(metadata.max.age.ms)については、後述。

Consumer側。
src/main/java/org/littlewings/kafka/SimpleConsumer.java

package org.littlewings.kafka;

import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String... args) throws Exception {
        String bootstrapServers;
        String topicName;
        String groupId;

        if (args.length > 2) {
            bootstrapServers = args[0];
            topicName = args[1];
            groupId = args[2];
        } else {
            bootstrapServers = "localhost:9092";
            topicName = "my-topic";
            groupId = "my-group";
        }

        System.out.println("execute info = " + bootstrapServers + ", " + topicName+ ", " + groupId);

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "60000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<Integer, String> consumer =
             new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList(topicName));

            while (true) {
                ConsumerRecords<Integer, String> records = consumer.poll(1000L);

                Iterator<ConsumerRecord<Integer, String>> iterator = records.iterator();

                while (iterator.hasNext()) {
                    ConsumerRecord<Integer, String> record = iterator.next();
                    System.out.println(record.partition() + " / "  + record.value());
                }

                TimeUnit.SECONDS.sleep(1L);
            }
        }
    }
}

こちらも、起動引数に「bootstrap.servers」とTopic名、Consumer Group名を受け取るようにしています。起動後は、ひたすら実行を
継続します。

こんな感じのプログラムでスタート。

Brokerを追加してパーティションを追加してみる

では、最初にBrokerを追加してパーティションを追加してみるパターンから。

とりあえず、Apache KafkaのBrokerを2つ起動しておきます。

Topic作成。名前は「my-topic」、パーティション数は3、Replication Factorは2としておきました。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 3 --topic my-topic
Created topic "my-topic".

describe。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 2	Leader: 4	Replicas: 4,3	Isr: 4,3

各BrokerのIDが、3と4になっている(1からとかじゃない)のは気にしないでください…。

ここで、Consumerを3つ起動してみます。Consumer Group名は「my-group」で。

## 1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

## 2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

## 3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

「bootstrap.servers」には、全部のBrokerを並べなくても良いようなので、今回はこちらを少なく指定してみました。

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

https://kafka.apache.org/11/documentation.html#newconsumerconfigs

これは、Producerにも同じことが書いています。
Producer Configs

Consumerの割り当てを確認。

$ bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          0               0               0               consumer-1-2ad94e90-0616-46ee-b6df-423efeea97d3 /172.21.0.1     consumer-1
my-topic        1          0               0               0               consumer-1-5e101ced-d713-475b-91ff-bd4b9480ac0b /172.21.0.1     consumer-1
my-topic        2          0               0               0               consumer-1-66d3320c-b3dd-4017-9c27-6c8dd88b265e /172.21.0.1     consumer-1

はい。

それでは、Producerを起動してみます。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args='172.21.0.3:9092 my-topic

30秒ごとにメッセージが30通ずつ送られ

2018-04-15T22:07:29.386 sleeping...

各Consumerがメッセージを受け取ります。

## 1
2 / message-3
2 / message-6
2 / message-9
2 / message-12
2 / message-15
2 / message-18
2 / message-21
2 / message-24
2 / message-27
2 / message-30


## 2
0 / message-2
0 / message-5
0 / message-8
0 / message-11
0 / message-14
0 / message-17
0 / message-20
0 / message-23
0 / message-26
0 / message-29

## 3
1 / message-1
1 / message-4
1 / message-7
1 / message-10
1 / message-13
1 / message-16
1 / message-19
1 / message-22
1 / message-25
1 / message-28

同じConsumer Groupなので、重複なく受け取っていますね。

それでは、ここでBrokerを追加してみます。

$ bin/kafka-server-start.sh -daemon config/server.properties

Brokerを追加しただけではなにも起こらないので、

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 2	Leader: 4	Replicas: 4,3	Isr: 4,3

$ bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          50              50              0               consumer-1-2ad94e90-0616-46ee-b6df-423efeea97d3 /172.21.0.1     consumer-1
my-topic        1          50              50              0               consumer-1-5e101ced-d713-475b-91ff-bd4b9480ac0b /172.21.0.1     consumer-1
my-topic        2          50              50              0               consumer-1-66d3320c-b3dd-4017-9c27-6c8dd88b265e /172.21.0.1     consumer-1

パーティションを追加してみましょう。5個まで増やしてみます。
Modifying topics

$ bin/kafka-topics.sh --zookeeper 172.21.0.2:2181 --topic my-topic --alter --partitions 5
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

LeaderやConsumerなどの割り当ては、このように。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 1	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 2	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 3	Leader: 4	Replicas: 4,5	Isr: 4,5
	Topic: my-topic	Partition: 4	Leader: 5	Replicas: 5,3	Isr: 5,3


$ bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          80              80              0               consumer-1-2ad94e90-0616-46ee-b6df-423efeea97d3 /172.21.0.1     consumer-1
my-topic        1          80              80              0               consumer-1-2ad94e90-0616-46ee-b6df-423efeea97d3 /172.21.0.1     consumer-1
my-topic        2          80              80              0               consumer-1-5e101ced-d713-475b-91ff-bd4b9480ac0b /172.21.0.1     consumer-1
my-topic        3          -               0               -               consumer-1-5e101ced-d713-475b-91ff-bd4b9480ac0b /172.21.0.1     consumer-1
my-topic        4          -               0               -               consumer-1-66d3320c-b3dd-4017-9c27-6c8dd88b265e /172.21.0.1     consumer-1

Consumer Group内での際割り当ては、割とすぐに行われます。

ただ、実際のProducerやConsumerが追加されたパーティションに反応するのには、少し時間がかかります。

最終的には、Consumerがちゃんとメッセージを読め、Producerも新しいパーティションにメッセージを送るようになります。

## 1
4 / message-244
4 / message-249
4 / message-254
4 / message-259
4 / message-264
4 / message-269
4 / message-274
4 / message-279
4 / message-284
4 / message-289
4 / message-294
4 / message-299


## 2
0 / message-252
0 / message-257
0 / message-262
0 / message-267
1 / message-250
1 / message-255
1 / message-260
1 / message-265
1 / message-270
1 / message-275
1 / message-280
1 / message-285
1 / message-290
1 / message-295
1 / message-300
0 / message-272
0 / message-277
0 / message-282
0 / message-287
0 / message-292
0 / message-297


## 3
2 / message-248
2 / message-253
2 / message-258
2 / message-263
2 / message-268
3 / message-246
3 / message-251
3 / message-256
3 / message-261
3 / message-266
2 / message-273
2 / message-278
2 / message-283
2 / message-288
2 / message-293
2 / message-298
3 / message-271
3 / message-276
3 / message-281
3 / message-286
3 / message-291
3 / message-296
3 / message-301

この「少し時間がかかる」と言った部分が「metadata.max.age.ms」です。

Producer Configs

New Consumer Configs

Producer、Consumerそれぞれに設定し、ここで設定された時間(ms)だけメタデータを保持するため、指定時間経過後に新しいLeaderやBrokerを検出します。

The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.

https://kafka.apache.org/11/documentation.html#producerconfigs

デフォルト値は「300000」(5分)なので、けっこう待ちます…。

なので、最初はパーティションを追加したりしても、Produerが新しいパーティションにメッセージを追加してくれなかったり、Consumerが読んでくれなかったり
するように見えてしまいます。

今回は、1分に縮めておきました。

        properties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "60000");

これで、「bootstrap.servers」に記載していないBrokerについてもProducerやConsumerが認識してくれること(あくまで起動時のシードであること)、
後からBrokerを追加(パーティションも追加)しても一定時間後に処理対象として認識してくれることがわかりました。

Brokerを追加して、パーティション配置を再割り当て

では、続いてBrokerを追加して、パーティション配置を再割り当てというパターンを。

ここまでの環境はいったん忘れて、再度Brokerを2つ起動した後から始めます。

Topicの作成。今回は、パーティション数を5とします。Brokerを追加して再割り当てするのみなので。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 5 --topic my-topic
Created topic "my-topic".

Leaderなどのマッピング

bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 1	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 2	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 3	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,4	Isr: 3,4

先ほどのパターンと同じく、Consumerを3つ起動。Consumer Groupの名前は、「my-group」のままで。

## 1
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

## 2
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

## 3
$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleConsumer -Dexec.args='172.21.0.3:9092 my-topic my-group'

Consumer Group内での、パーティションとConsumerの割り当てを確認。

$ bash -c 'bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          0               0               0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        1          0               0               0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        2          0               0               0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        3          0               0               0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        4          0               0               0               consumer-1-bb3b4321-a7b4-43fa-b982-2175328b46da /172.21.0.1     consumer-1

Producerも起動します。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args='172.21.0.3:9092 my-topic

Producerによるメッセージの登録と、Consumerのメッセージの読み取りが始まります。

## 1
0 / message-5
0 / message-10
0 / message-15
0 / message-20
0 / message-25
0 / message-30
1 / message-3
1 / message-8
1 / message-13
1 / message-18
1 / message-23
1 / message-28


## 2
4 / message-2
4 / message-7
4 / message-12
4 / message-17
4 / message-22
4 / message-27


## 3
2 / message-1
2 / message-6
2 / message-11
2 / message-16
2 / message-21
2 / message-26
3 / message-4
3 / message-9
3 / message-14
3 / message-19
3 / message-24
3 / message-29

ここでBrokerを追加してみます。

$ bin/kafka-server-start.sh -daemon config/server.properties

この時点では、特に変化はありません。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 1	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 2	Leader: 3	Replicas: 3,4	Isr: 3,4
	Topic: my-topic	Partition: 3	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 4	Leader: 3	Replicas: 3,4	Isr: 3,4


$bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          24              24              0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        1          24              24              0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        2          24              24              0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        3          24              24              0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        4          24              24              0               consumer-1-bb3b4321-a7b4-43fa-b982-2175328b46da /172.21.0.1     consumer-1

では、Brokerに対するパーティションの再割り当てを行ってみます。
Expanding your cluster

JSONファイルを作成。
topics-to-move.json

{
  "topics": [{ "topic": "my-topic" }],
  "version": 1
}

再分配のための、情報を作成。今回は、「--broker-list」ですべてのBrokerで再割り当てするようにしてみましょう。

$ bin/kafka-reassign-partitions.sh --zookeeper 172.21.0.2:2181 --topics-to-move-json-file topics-to-move.json --broker-list=3,4,5 --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[4,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[4,3],"log_dirs":["any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[5,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[4,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[4,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,5],"log_dirs":["any","any"]}]}

「Proposed partition reassignment configuration」以下に表示された内容をJSONファイルに保存し
expand-cluster-reassignment.json

{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[5,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[4,5],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[4,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,5],"log_dirs":["any","any"]}]}

実行。

$ bin/kafka-reassign-partitions.sh --zookeeper 172.21.0.2:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"my-topic","partition":2,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":1,"replicas":[4,3],"log_dirs":["any","any"]},{"topic":"my-topic","partition":0,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":4,"replicas":[3,4],"log_dirs":["any","any"]},{"topic":"my-topic","partition":3,"replicas":[4,3],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.

再分配が完了。

$ bin/kafka-topics.sh --describe --zookeeper 172.21.0.2:2181 --topic my-topic
Topic:my-topic	PartitionCount:5	ReplicationFactor:2	Configs:
	Topic: my-topic	Partition: 0	Leader: 3	Replicas: 3,5	Isr: 3,5
	Topic: my-topic	Partition: 1	Leader: 4	Replicas: 4,3	Isr: 4,3
	Topic: my-topic	Partition: 2	Leader: 5	Replicas: 5,4	Isr: 4,5
	Topic: my-topic	Partition: 3	Leader: 4	Replicas: 3,4	Isr: 4,3
	Topic: my-topic	Partition: 4	Leader: 4	Replicas: 4,5	Isr: 4,5

Consumer Group内での割り当ては、変わらない感じです。

$ bin/kafka-consumer-groups.sh --bootstrap-server 172.21.0.3:9092 --describe --group my-group'
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
my-topic        0          90              90              0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        1          90              90              0               consumer-1-abe7445f-253d-487c-980d-8fc43436b5f6 /172.21.0.1     consumer-1
my-topic        2          90              90              0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        3          90              90              0               consumer-1-b764a1de-193c-4276-9bee-679c385a7e35 /172.21.0.1     consumer-1
my-topic        4          90              90              0               consumer-1-bb3b4321-a7b4-43fa-b982-2175328b46da /172.21.0.1     consumer-1

ところがですね、先ほどのProducerのコードだと

        try (KafkaProducer<Integer, String> producer =
                     new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer())) {

            for (int i = 1; i < Integer.MAX_VALUE; i++) {
                producer.send(new ProducerRecord<>(topicName, null, "message-" + i)).get();

                if (i % 30 == 0) {
                    System.out.println(LocalDateTime.now() + " sleeping...");
                    TimeUnit.SECONDS.sleep(30L);
                }
            }
        }

このオペレーションを行うと次の実行でエラーになります。

java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError (FutureRecordMetadata.java:94)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get (FutureRecordMetadata.java:64)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get (FutureRecordMetadata.java:29)
    at org.littlewings.kafka.SimpleLoader.main (SimpleLoader.java:57)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:294)
    at java.lang.Thread.run (Thread.java:748)

この場合、例えばこういう感じとかでメッセージを送ったら接続させ直す方がいいんでしょうか…。
*つなぎ直しているので、これでエラーにはならなくなります、が…

        KafkaProducer<Integer, String> producer =
            new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer());

        for (int i = 1; i < Integer.MAX_VALUE; i++) {
            producer.send(new ProducerRecord<>(topicName, null, "message-" + i)).get();

            if (i % 30 == 0) {
                System.out.println(LocalDateTime.now() + " sleeping...");
                TimeUnit.MINUTES.sleep(1L);

                producer.close();
                producer = new KafkaProducer<>(properties, new IntegerSerializer(), new StringSerializer());
            }
        }

        producer.close();

今回は、単純にProducerを再起動します。

$ mvn compile exec:java -Dexec.mainClass=org.littlewings.kafka.SimpleLoader -Dexec.args='172.21.0.3:9092 my-topic'

Consumer側はBrokerの追加および、パーティションの再割り当てを行っても、そのままでメッセージが読めます。

## 1
0 / message-2
0 / message-7
0 / message-12
0 / message-17
0 / message-22
0 / message-27
1 / message-5
1 / message-10
1 / message-15
1 / message-20
1 / message-25
1 / message-30


## 2
4 / message-4
4 / message-9
4 / message-14
4 / message-19
4 / message-24
4 / message-29


## 3
2 / message-3
2 / message-8
2 / message-13
2 / message-18
2 / message-23
2 / message-28
3 / message-1
3 / message-6
3 / message-11
3 / message-16
3 / message-21
3 / message-26

Producer側だけは、ちょっと注意が必要な感じが…。まあ、気楽にパーティションの再割り当てなんて行わない方がよい、という気はしますけどね。

まとめ

Apache Kafkaを使って、ProducerやConsumerの実行途中にBrokerやパーティションの構成を変更してみました。

ProducerやConsumer側に反映されるまでラグがあったり、パーティション再割り当てをした時の挙動などいろいろ知らないことや遭遇したことがあったので、
いい確認にはなったかなと。