CLOVER🍀

That was when it all began.

Apache KafkaのConsumerを、特定のパーティションに手動で割り当てる

前に、Apache KafkaのConsumer Groupを試して動きを確認したことがあるのですが、ちょっと趣向を変えて、Consumerに対してパーティション
指定して割り当ててみたいと思います。

きっかけは、ドキュメントのこの絵。

Consumers

この絵のように、Consumer Groupの中でもパーティションごとに割り当てが可能なようなのですが、このパターンを
自分で試したことがなかったので、やってみようかと。

環境

動作確認した環境は、こちら。

$ java -version
openjdk version "1.8.0_162"
OpenJDK Runtime Environment (build 1.8.0_162-8u162-b12-0ubuntu0.16.04.2-b12)
OpenJDK 64-Bit Server VM (build 25.162-b12, mixed mode)

$ mvn -version
Apache Maven 3.5.3 (3383c37e1f9e9b3bc3df5050c29c8aff9f295297; 2018-02-25T04:49:05+09:00)
Maven home: /usr/local/maven3/current
Java version: 1.8.0_162, vendor: Oracle Corporation
Java home: /usr/lib/jvm/java-8-openjdk-amd64/jre
Default locale: ja_JP, platform encoding: UTF-8
OS name: "linux", version: "4.4.0-104-generic", arch: "amd64", family: "unix"

使用するApache Kafkaのバージョンは、1.1.0とします。

構成と準備

確認する構成は、以下とします。

  • Apache ZooKeeper × … 172.21.0.2
  • Apache Kafka Broker × 3 … 172.21.0.3〜5
  • Producerアプリケーション × 1 … ローカル
  • Consumerアプリケーションその1 × 2 … ローカル
  • Consumerアプリケーションその2 × 2 … ローカル
  • Consumerアプリケーションその3 × 2 … ローカル

Apache ZooKeeperおよびApache Kafkaは、起動済み&クラスタ構成済みとします。

Topicは、以下のようにPartition数6、replication-factorは2で作成します。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 6 --topic tweet-topic

各Producer/Cosumerアプリケーションは、Mavenのマルチプロジェクト構成とします。アプリケーションは、以下の構成とします。

Producerは、Twitter4jを使って「#nowplaying」ハッシュタグのあるツイートを受信してApache Kafkaに放り込みます。
Consumerは、Producerが作成したレコードを標準出力に出力し、最後に受け取ったレコードの時刻、パーティションのID、
ツイートのidをまとめて出力します。

まずは、親pom.xmlはこんな感じで。

    <modules>
        <module>twitter-producer</module>
        <module>simple-console-consumer</module>
    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>2.0.1.RELEASE</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

Uber JARにするのに、Spring BootのMavenプラグインを使うことにしました。

ProducerのMaven依存関係。
twitter-producer/pom.xml

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-stream</artifactId>
            <version>4.0.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.5</version>
        </dependency>

Twitter4jを入れているのと、Apache KafkaにデータをJSONで入れようと思ったのでJacksonを加えています。

ConsumerのMaven依存関係。
simple-console-consumer/pom.xml

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.5</version>
        </dependency>

こちらは、Apache Kafkaのクライアント以外はJacksonが入っているだけです。

ここまでで、だいたい準備が終わったので、アプリケーションを作っていきます。

Producer

では、まずはTwitterから「#nowplaying」ハッシュタグでストリームを取得して、Apache Kafkaに放り込むアプリケーションを書きます。
twitter-producer/src/main/java/org/littlewings/kafka/TwitterStreamProducer.java

package org.littlewings.kafka;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

public class TwitterStreamProducer {
    public static void main(String... args) {
        TwitterStreamProducer streamProducer = new TwitterStreamProducer();
        streamProducer.streaming();
    }

    public void streaming() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class);

        Random random = new Random();
        random.nextInt(6);

        try (KafkaProducer<String, Map<String, Object>> producer = new KafkaProducer<>(properties)) {
            TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    Map<String, Object> tweet = new LinkedHashMap<>();
                    tweet.put("id", status.getId());
                    tweet.put("screenName", status.getUser().getScreenName());
                    tweet.put("name", status.getUser().getName());
                    tweet.put("text", status.getText());
                    tweet.put("createdAt", status.getCreatedAt());

                    String key = "key-" + Math.abs(random.nextInt());
                    Map<String, Object> value = tweet;

                    producer.send(new ProducerRecord<>("tweet-topic", key, value));
                }

                @Override
                public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                }

                @Override
                public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                }

                @Override
                public void onScrubGeo(long userId, long upToStatusId) {
                }

                @Override
                public void onStallWarning(StallWarning warning) {
                }

                @Override
                public void onException(Exception e) {
                    e.printStackTrace();
                }
            });
            twitterStream.filter("#nowplaying");

            try {
                TimeUnit.MINUTES.sleep(1L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            twitterStream.clearListeners();
            twitterStream.cleanUp();
        }
    }
}

3台のApache KafkaのBrokerと、Serializer/Deserializerの設定をしてKafkaProducerを作成します。

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class);

ツイートを受信したら、その内容を取得してApache Kafkaに登録するように実装。キーは指定しなくてもいいのですが、今回は適当にランダムで設定しておきました。

        try (KafkaProducer<String, Map<String, Object>> producer = new KafkaProducer<>(properties)) {
            TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    Map<String, Object> tweet = new LinkedHashMap<>();
                    tweet.put("id", status.getId());
                    tweet.put("screenName", status.getUser().getScreenName());
                    tweet.put("name", status.getUser().getName());
                    tweet.put("text", status.getText());
                    tweet.put("createdAt", status.getCreatedAt());

                    String key = "key-" + Math.abs(random.nextInt());
                    Map<String, Object> value = tweet;

                    producer.send(new ProducerRecord<>("tweet-topic", key, value));
                }

絞り込みのフィルタを設定して、1分間動かして、それから終了という構成にします。

            twitterStream.filter("#nowplaying");

            try {
                TimeUnit.MINUTES.sleep(1L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            twitterStream.clearListeners();
            twitterStream.cleanUp();

Twitte4jを使って、Twitterからツイートをストリーム取得するためのConsumer KeyやAccess Tokenなどは、今回は
twitter4j.propertiesに記述しました。
twitter-producer/src/main/resources/twitter4j.properties

JSONのSerializerについては、自作です。
twitter-producer/src/main/java/org/littlewings/kafka/JacksonJsonSerializer.java

package org.littlewings.kafka;

import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JacksonJsonSerializer<T> implements Serializer<T> {
    ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no-op
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }

        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // no-op
    }
}

Consumer

続いて、Consuemr側。ProducerがBrokerに登録したレコードを、若干整形してコンソール出力する感じの作りにしましょう。
simple-console-consumer/src/main/java/org/littlewings/kafka/SimpleConsoleConsumer.java

package org.littlewings.kafka;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsoleConsumer {
    public static void main(String... args) {
        SimpleConsoleConsumer consoleConsumer = new SimpleConsoleConsumer();

        int[] partitions = Arrays.stream(args).mapToInt(Integer::parseInt).toArray();

        consoleConsumer.consuming(partitions);
    }

    public void consuming(int... partitions) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-console-consumer");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonJsonDeserializer.class);
        properties.put("jackson.json.deserializer.type", LinkedHashMap.class);

        try (KafkaConsumer<String, Map<String, Object>> consumer =
                     new KafkaConsumer<>(properties)) {

            List<TopicPartition> topicPartitions =
                    Arrays
                            .stream(partitions)
                            .mapToObj(partitionId -> new TopicPartition("tweet-topic", partitionId))
                            .collect(Collectors.toList());

            consumer.assign(topicPartitions);

            Set<String> collectIds = new TreeSet<>();

            long startTime = System.nanoTime();

            while (true) {
                ConsumerRecords<String, Map<String, Object>> records = consumer.poll(1000L);

                records.forEach(record -> {
                    Map<String, Object> value = record.value();
                    List<String> values =
                            Arrays.asList(
                                    "partition-id=" + record.partition(),
                                    "key=" + record.key(),
                                    "id=" + value.get("id"),
                                    "createdAt=" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date((long) value.get("createdAt"))),
                                    // "screenName=" + value.get("screenName"),
                                    // "name=" + value.get("name"),
                                    "text=" + value.get("text")
                            );

                    System.out.println("=== received record: ");
                    System.out.println(
                            values
                                    .stream()
                                    .collect(Collectors.joining(
                                            System.lineSeparator() + "  ",
                                            "  ",
                                            "  ")));

                    collectIds.add(
                            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                    .format(new Date((long) value.get("createdAt")))
                                    + " / " + record.partition()
                                    + " / " + value.get("id")
                    );
                });

                long elapsedTime = System.nanoTime() - startTime;

                if (TimeUnit.MINUTES.convert(elapsedTime, TimeUnit.NANOSECONDS) >= 1L) {
                    break;
                }
            }

            System.out.println();
            System.out.println("===== collected time / partition-id / id");
            collectIds.forEach(System.out::println);
        }
    }
}

Consumerが対象とするパーティションのIDを、起動引数でもらうように実装します。

    public static void main(String... args) {
        SimpleConsoleConsumer consoleConsumer = new SimpleConsoleConsumer();

        int[] partitions = Arrays.stream(args).mapToInt(Integer::parseInt).toArray();

        consoleConsumer.consuming(partitions);
    }

    public void consuming(int... partitions) {

BrokerやSerializer/Deserializerを指定して、Consumerを作成。

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-console-consumer");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonJsonDeserializer.class);
        properties.put("jackson.json.deserializer.type", LinkedHashMap.class);

        try (KafkaConsumer<String, Map<String, Object>> consumer =
                     new KafkaConsumer<>(properties)) {

パーティションの割り当て指定は、このように行います。TopicPartitionのコンストラクタへの指定と、KafkaConsumer#assignで行います。

            List<TopicPartition> topicPartitions =
                    Arrays
                            .stream(partitions)
                            .mapToObj(partitionId -> new TopicPartition("tweet-topic", partitionId))
                            .collect(Collectors.toList());

            consumer.assign(topicPartitions);

これが、パーティションを指定しない場合だとこのようになります。

            consumer.subscribe(Arrays.asList("tweet-topic"));

KafkaConsumer#subscribeですね。

このやり方については、KafkaConsumerのJavadocに記載があります。
*「Manual Partition Assignment」を参照
KafkaConsumer (kafka 1.1.0 API)

なお、KafkaConsumer#subscribe(動的な割り当て)とKafkaConsumer#assign(手動での割り当て)を混在させることはできないようです。

Consumer Groupについては、今回は全部同じにしています。

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-console-consumer");

本体の動作自体は、1分間、受け取ったレコードをコンソールに出力しているだけです。

この時の、ツイート日時とパーティションID、ツイートのidを覚えておき、最後に出力します。

                    collectIds.add(
                            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                    .format(new Date((long) value.get("createdAt")))
                                    + " / " + record.partition()
                                    + " / " + value.get("id")

Consumer Group内で、同じパーティションIDを割り当てたConsumer間での挙動を確認することが目的です。もちろん、こういうのは1分しか動かさないことが
前提の作りになっています。

最後は、自作のJSON Deserializer。
simple-console-consumer/src/main/java/org/littlewings/kafka/JacksonJsonDeserializer.java

package org.littlewings.kafka;

import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JacksonJsonDeserializer<T> implements Deserializer<T> {
    ObjectMapper objectMapper = new ObjectMapper();
    Class<T> type;

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // always value deserializer
        type = (Class<T>) configs.get("jackson.json.deserializer.type");
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }

        try {
            return objectMapper.readValue(data, type);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // no-op
    }
}

動作確認

それでは、パッケージングして動作確認してみます。

$ mvn package

まずは、Consumerを起動させておきます。同じパーティションを担当するConsumerを2つずつ3組、計6インスタンス起動します。

## パーティション0、3を担当 - 1
$ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar 0 3

## パーティション0、3を担当 - 2
$ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar 0 3


## パーティション1、4を担当 - 1
$ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar 1 4

## パーティション1、4を担当 - 2
$ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar 1 4


## パーティション2、5を担当 - 1
$ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar 2 5

## パーティション2、5を担当 - 2
$ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar 2 5

パーティションの指定は、「0」から始まるようです。

Producerを開始。

$ java -jar twitter-producer/target/twitter-producer-0.0.1-SNAPSHOT.jar

ツイートを受信中は、Consumer側にはこんな感じでログが流れます。

=== received record: 
  partition-id=3
  key=key-1599040444
  id=984798192687632384
  createdAt=2018-04-13 23:19:20
  text=#NowPlaying Preashea Hilliard - No God Like Our God on Power of Worship Radio  https://t.co/yUD7dEcULg  
=== received record: 
  partition-id=3
  key=key-436836550
  id=984798196990955520
  createdAt=2018-04-13 23:19:21
  text=#NowPlaying 04 If Snakes Could Talk, My Words Would Slither - NilExistence  
=== received record: 
  partition-id=0
  key=key-1758941679
  id=984798197150253056
  createdAt=2018-04-13 23:19:21
  text=21 Savage & Metro Boomin - Bad Guy https://t.co/1DXgbquYGi #nowplaying #listenlive  
=== received record: 
  partition-id=3
  key=key-1364280988
  id=984798200606244869
  createdAt=2018-04-13 23:19:22
  text=#NowPlaying Kool The Gang - Jungle Boogie :: Tune In: https://t.co/9DatBtw16L  
=== received record: 
  partition-id=0
  key=key-1923096726
  id=984798201541636098
  createdAt=2018-04-13 23:19:22
  text=&#9836;Love Thing / Cymbals #nowplaying

最後に、まとめた結果。長くなるので、それぞれ5レコードずつ書いていきます。

パーティション、0と3を担当分。

## 1
===== collected time / partition-id / id
2018-04-13 23:19:20 / 3 / 984798192687632384
2018-04-13 23:19:21 / 0 / 984798197150253056
2018-04-13 23:19:21 / 3 / 984798196990955520
2018-04-13 23:19:22 / 0 / 984798201541636098
2018-04-13 23:19:22 / 3 / 984798200606244869


## 2
===== collected time / partition-id / id
2018-04-13 23:19:20 / 3 / 984798192687632384
2018-04-13 23:19:21 / 0 / 984798197150253056
2018-04-13 23:19:21 / 3 / 984798196990955520
2018-04-13 23:19:22 / 0 / 984798201541636098
2018-04-13 23:19:22 / 3 / 984798200606244869

パーティション、1と4を担当分。

## 1
===== collected time / partition-id / id
2018-04-13 23:19:20 / 1 / 984798194168188929
2018-04-13 23:19:20 / 4 / 984798194881257473
2018-04-13 23:19:21 / 1 / 984798198752411648
2018-04-13 23:19:21 / 4 / 984798197527793665
2018-04-13 23:19:23 / 1 / 984798205518008320

## 2
===== collected time / partition-id / id
2018-04-13 23:19:20 / 1 / 984798194168188929
2018-04-13 23:19:20 / 4 / 984798194881257473
2018-04-13 23:19:21 / 1 / 984798198752411648
2018-04-13 23:19:21 / 4 / 984798197527793665
2018-04-13 23:19:23 / 1 / 984798205518008320


パーティション、2と5を担当分。

## 1
===== collected time / partition-id / id
2018-04-13 23:19:20 / 5 / 984798194902151170
2018-04-13 23:19:21 / 2 / 984798196655308801
2018-04-13 23:19:23 / 2 / 984798204410474497
2018-04-13 23:19:23 / 5 / 984798207271026688
2018-04-13 23:19:25 / 5 / 984798212652457984

## 2
===== collected time / partition-id / id
2018-04-13 23:19:20 / 5 / 984798194902151170
2018-04-13 23:19:21 / 2 / 984798196655308801
2018-04-13 23:19:23 / 2 / 984798204410474497
2018-04-13 23:19:23 / 5 / 984798207271026688
2018-04-13 23:19:25 / 5 / 984798212652457984

それぞれ、割り当てられているパーティションに対するデータが取得できていることがわかります。

が、各パーティションに割り当てた2つのインスタンスが、同じデータを受け取っています…。さて、これはどういうことでしょうね…。

ここで、もう1度KafkaConsumerの「Manual Partition Assignment」の部分を読んでみます。

Once assigned, you can call poll in a loop, just as in the preceding examples to consume records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only change with another call to assign. Manual partition assignment does not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should usually ensure that the groupId is unique for each consumer instance.

https://kafka.apache.org/11/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

「Manual partition assignment does not use group coordination」、「Each consumer acts independently even if it shares a groupId with another consumer.」

なんか、Consumer Groupを設定してもそれぞれ独立して動くみたいですね。

あんまり深く考えていなかったので、驚きました。覚えておきましょう。

Producerに対するパーティションを手動で割り当てる場合は?

Producerに対してパーティションを指定する場合は、KafkaProducer#sendでレコードを呼び出す際に渡す、ProducerRecordのコンストラクタでパーティションのIDを
指定します。
KafkaProducer (kafka 1.1.0 API)
ProducerRecord (kafka 1.1.0 API)