Apache Kafkaには、Consumer Groupという概念があります。
Consumerにラベル付けをすることで、Consumerをグルーピングします。こうすることで、Brokerから配信されるレコードが、そのConsumer Group内の
ひとつのConsumerに配信されるようになります。
上記のドキュメントのように、2つのConsumer Groupがあった場合は、2つのConsumer Group内のどれかひとつずつのConsumerインスタンスが
レコードを受け取ります。
仮に全Consumerが同じConsumer Groupに属して入れば単純に負荷分散、全Consumerが異なるConsumer Groupであれば
ブロードキャストのような扱いになります。
参考)
Java Clientで入門する Apache Kafka #jjug_ccc #ccc_e2
Kafka のトピック・パーティションの決め方を考察する - uchan note
というわけで、ちょっと種類の違うConsumer Groupをつけたアプリケーションを用意して、どんなものか見てみましょう。
構成と準備
確認する構成は、以下とします。
- Apache ZooKeeper × … 172.21.0.2
- Apache Kafka Broker × 3 … 172.21.0.3〜5
- Producerアプリケーション × 1 … ローカル
- Consumerアプリケーションその1 × 2 … ローカル
- Consumerアプリケーションその2 × 2 … ローカル
Apache ZooKeeperおよびApache Kafkaは、起動済み&クラスタ構成済みとします。
Topicは、以下のようにPartition数6、replication-factorは2で作成します。
$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 6 --topic tweet-topic
各Producer/Cosumerアプリケーションは、Mavenのマルチプロジェクト構成とします。アプリケーションは、以下の構成とします。
Producerは、Twitter4jを使って「#nowplaying」ハッシュタグのあるツイートを受信してApache Kafkaに放り込みます。
Consumerは、片方はProducerが作成したレコードを単純に標準出力に、もう片方はApache Lucene Kuromojiで形態素解析して
ある条件に合致すれば標準出力に書き出す、そういったアプリケーションにします。
まず、親pom.xmlはこんな感じで。
<modules> <module>twitter-producer</module> <module>simple-console-consumer</module> <module>tokenize-console-consumer</module> </modules> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <build> <pluginManagement> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>1.5.4.RELEASE</version> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </pluginManagement> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
Uber JARにするのに、Spring BootのMavenプラグインを使うことにしました。
ProducerのMaven依存関係。
twitter-producer/pom.xml
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> <dependency> <groupId>org.twitter4j</groupId> <artifactId>twitter4j-stream</artifactId> <version>4.0.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.8.9</version> </dependency>
Twitter4jを入れているのと、Apache KafkaにデータをJSONで入れようと思ったのでJacksonを加えています。
Consumerその1のMaven依存関係。
simple-console-consumer/pom.xml
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.8.9</version> </dependency>
こちらは、Apache Kafkaのクライアント以外はJacksonが入っているだけです。
Consumerその2。
tokenize-console-consumer/pom.xml
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.8.9</version> </dependency> <dependency> <groupId>org.codelibs</groupId> <artifactId>lucene-analyzers-kuromoji-ipadic-neologd</artifactId> <version>6.6.0-20170703</version> </dependency> </dependencies> <repositories> <repository> <id>codelibs.org</id> <name>CodeLibs Repository</name> <url>http://maven.codelibs.org/</url> </repository> </repositories>
先ほどのConsumerに加えて、Apache Lucene Kuromoji+mecab-ipadic-NEologdを追加します。
Apache Lucene Kuromoji+mecab-ipadic-NEologdは、CodeLibsのものをお借りしました。
Producerが使用する、twitter4j.propertiesは省略します。
ここまでで、だいたい準備が終わったので、アプリケーションを作っていきます。
Producer
では、まずはTwitterから「#nowplaying」ハッシュタグでストリームを取得して、Apache Kafkaに放り込むアプリケーションを書きます。
twitter-producer/src/main/java/org/littlewings/kafka/TwitterStreamProducer.java
package org.littlewings.kafka; import java.util.LinkedHashMap; import java.util.Map; import java.util.Properties; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import twitter4j.StallWarning; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; public class TwitterStreamProducer { public static void main(String... args) { TwitterStreamProducer streamProducer = new TwitterStreamProducer(); streamProducer.streaming(); } public void streaming() { Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class); try (KafkaProducer<String, Map<String, Object>> producer = new KafkaProducer<>(properties)) { TwitterStream twitterStream = new TwitterStreamFactory().getInstance(); twitterStream.addListener(new StatusListener() { @Override public void onStatus(Status status) { Map<String, Object> tweet = new LinkedHashMap<>(); tweet.put("id", status.getId()); tweet.put("screenName", status.getUser().getScreenName()); tweet.put("name", status.getUser().getName()); tweet.put("text", status.getText()); tweet.put("createdAt", status.getCreatedAt()); String key = status.getUser().getScreenName(); Map<String, Object> value = tweet; producer.send(new ProducerRecord<>("tweet-topic", key, value)); } @Override public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { } @Override public void onTrackLimitationNotice(int numberOfLimitedStatuses) { } @Override public void onScrubGeo(long userId, long upToStatusId) { } @Override public void onStallWarning(StallWarning warning) { } @Override public void onException(Exception e) { e.printStackTrace(); } }); twitterStream.filter("#nowplaying"); try { TimeUnit.MINUTES.sleep(1L); } catch (InterruptedException e) { e.printStackTrace(); } twitterStream.clearListeners(); twitterStream.cleanUp(); } } }
3台のApache KafkaのBrokerと、Serializer/Deserializerの設定をしてKafkaProducerを作成します。
Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class); try (KafkaProducer<String, Map<String, Object>> producer = new KafkaProducer<>(properties)) {
ツイートを受信したら、その内容を取得してApache Kafkaに登録するように実装。
TwitterStream twitterStream = new TwitterStreamFactory().getInstance(); twitterStream.addListener(new StatusListener() { @Override public void onStatus(Status status) { Map<String, Object> tweet = new LinkedHashMap<>(); tweet.put("id", status.getId()); tweet.put("screenName", status.getUser().getScreenName()); tweet.put("name", status.getUser().getName()); tweet.put("text", status.getText()); tweet.put("createdAt", status.getCreatedAt()); String key = status.getUser().getScreenName(); Map<String, Object> value = tweet; producer.send(new ProducerRecord<>("tweet-topic", key, value)); }
絞り込みのフィルタを設定して、1分間動かして、それから終了という構成にします。
twitterStream.filter("#nowplaying"); try { TimeUnit.MINUTES.sleep(1L); } catch (InterruptedException e) { e.printStackTrace(); } twitterStream.clearListeners(); twitterStream.cleanUp();
ところで、このJSONのSerializerは自作です。
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class);
こんな感じで作成。
twitter-producer/src/main/java/org/littlewings/kafka/JacksonJsonSerializer.java
package org.littlewings.kafka; import java.util.Map; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; public class JacksonJsonSerializer<T> implements Serializer<T> { ObjectMapper objectMapper = new ObjectMapper(); @Override public void configure(Map<String, ?> configs, boolean isKey) { // no-op } @Override public byte[] serialize(String topic, T data) { if (data == null) { return null; } try { return objectMapper.writeValueAsBytes(data); } catch (JsonProcessingException e) { throw new SerializationException(e); } } @Override public void close() { // no-op } }
こちらのProducerで使うJSONのSerializer、また後でConsumerで使うJSONのDeserializerは、以下を参考にしました。
https://github.com/confluentinc/schema-registry/tree/v3.2.2/json-serializer/src/main/java/io/confluent/kafka/serializers
https://github.com/confluentinc/schema-registry/blob/v3.2.2/json-serializer/src/main/java/io/confluent/kafka/serializers/KafkaJsonSerializer.java
https://github.com/confluentinc/schema-registry/blob/v3.2.2/json-serializer/src/main/java/io/confluent/kafka/serializers/KafkaJsonDeserializer.java
Consumerその1
続いて、最初のConsumerを作成します。こちらは、ProducerがBrokerに登録したレコードを、若干整形してコンソール出力するものとしましょう。
simple-console-consumer/src/main/java/org/littlewings/kafka/SimpleConsoleConsumer.java
package org.littlewings.kafka; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Date; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; public class SimpleConsoleConsumer { public static void main(String... args) { SimpleConsoleConsumer consoleConsumer = new SimpleConsoleConsumer(); consoleConsumer.consuming(); } public void consuming() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-console-consumer"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonJsonDeserializer.class); properties.put("jackson.json.deserializer.type", LinkedHashMap.class); try (KafkaConsumer<String, Map<String, Object>> consumer = new KafkaConsumer<>(properties)) { consumer.subscribe(Arrays.asList("tweet-topic")); Set<String> collectIds = new TreeSet<>(); long startTime = System.nanoTime(); while (true) { ConsumerRecords<String, Map<String, Object>> records = consumer.poll(1000L); records.forEach(record -> { Map<String, Object> value = record.value(); List<String> values = Arrays.asList( "id=" + value.get("id"), "createdAt=" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date((long) value.get("createdAt"))), // "screenName=" + value.get("screenName"), // "name=" + value.get("name"), "text=" + value.get("text") ); System.out.println("=== received record: "); System.out.println( values .stream() .collect(Collectors.joining( System.lineSeparator() + " ", " ", " "))); collectIds.add( new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") .format(new Date((long) value.get("createdAt"))) + " / " + value.get("id") ); }); long elapsedTime = System.nanoTime() - startTime; if (TimeUnit.MINUTES.convert(elapsedTime, TimeUnit.NANOSECONDS) >= 1L) { break; } } System.out.println(); System.out.println("===== collected time / id"); collectIds.forEach(System.out::println); } } }
BrokerやSerializer/Deserializerを指定して、KafkaConsumerを作成します。
Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-console-consumer"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonJsonDeserializer.class); properties.put("jackson.json.deserializer.type", LinkedHashMap.class); try (KafkaConsumer<String, Map<String, Object>> consumer = new KafkaConsumer<>(properties)) { consumer.subscribe(Arrays.asList("tweet-topic"));
こちらのConsumer Groupは、「simple-console-consumer」としました。
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-console-consumer");
あと、以下の1文は、自作のDeserializer向けの設定です。
properties.put("jackson.json.deserializer.type", LinkedHashMap.class);
本体の動作自体は、1分間、受け取ったレコードをコンソールに出力しているだけです。
この時の、ツイート日時とidを覚えておき、最後に出力します。
System.out.println();
System.out.println("===== collected time / id");
collectIds.forEach(System.out::println);
Consumer Group内で、重複していないか見るためですが、もちろん1分しか動作しない前提でこういうのは置いています。
自作のJSON Deserializer。
simple-console-consumer/src/main/java/org/littlewings/kafka/JacksonJsonDeserializer.java
package org.littlewings.kafka; import java.io.IOException; import java.util.Map; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Deserializer; public class JacksonJsonDeserializer<T> implements Deserializer<T> { ObjectMapper objectMapper = new ObjectMapper(); Class<T> type; @SuppressWarnings("unchecked") @Override public void configure(Map<String, ?> configs, boolean isKey) { // always value deserializer type = (Class<T>) configs.get("jackson.json.deserializer.type"); } @Override public T deserialize(String topic, byte[] data) { if (data == null) { return null; } try { return objectMapper.readValue(data, type); } catch (IOException e) { throw new SerializationException(e); } } @Override public void close() { // no-op } }
KafkaConsumerを作成する時に、Deserializerをコンストラクタで渡さず、Propertiesにクラスを入れることでconfigureメソッドが呼び出されます。
@SuppressWarnings("unchecked") @Override public void configure(Map<String, ?> configs, boolean isKey) { // always value deserializer type = (Class<T>) configs.get("jackson.json.deserializer.type"); }
今回は、ここでデシリアライズするクラスを受け取ることにしました。なお、isKeyはキーのシリアライズ向けかどうかを表すのですが、今回は
値用で固定としたので単に無視することに。
Consumerその2
もうひとつのConsumerアプリケーション。こちらは、形態素解析します。
tokenize-console-consumer/src/main/java/kafka/TokenizeConsoleConsumer.java
package kafka; import java.io.IOException; import java.io.UncheckedIOException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseAnalyzer; import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer; import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute; public class TokenizeConsoleConsumer { Analyzer analyzer = new JapaneseAnalyzer( null, JapaneseTokenizer.Mode.NORMAL, JapaneseAnalyzer.getDefaultStopSet(), JapaneseAnalyzer.getDefaultStopTags() ); public static void main(String... args) { TokenizeConsoleConsumer consoleConsumer = new TokenizeConsoleConsumer(); consoleConsumer.consuming(); } public void consuming() { Properties properties = new Properties(); properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092"); properties.put(ConsumerConfig.GROUP_ID_CONFIG, "tokenize-console-consumer"); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonJsonDeserializer.class); properties.put("jackson.json.deserializer.type", LinkedHashMap.class); try (KafkaConsumer<String, Map<String, Object>> consumer = new KafkaConsumer<>(properties)) { consumer.subscribe(Arrays.asList("tweet-topic")); Set<String> collectIds = new TreeSet<>(); long startTime = System.nanoTime(); while (true) { ConsumerRecords<String, Map<String, Object>> records = consumer.poll(1000L); records.forEach(record -> { Map<String, Object> value = record.value(); List<String> values = Arrays.asList( "id=" + value.get("id"), "createdAt=" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date((long) value.get("createdAt"))) // "screenName=" + value.get("screenName"), // "name=" + value.get("name"), // "text=" + value.get("text") ); List<String> tokens = filterNoun((String) value.get("text")); if (!tokens.isEmpty()) { System.out.println("=== received record: "); System.out.println( values .stream() .collect(Collectors.joining( System.lineSeparator() + " ", " ", " "))); System.out.println(" tokens:"); System.out.println( filterNoun((String) value.get("text")) .stream() .collect(Collectors.joining( System.lineSeparator() + " ", " ", " ")) ); collectIds.add( new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") .format(new Date((long) value.get("createdAt"))) + " / " + value.get("id") ); } }); long elapsedTime = System.nanoTime() - startTime; if (TimeUnit.MINUTES.convert(elapsedTime, TimeUnit.NANOSECONDS) >= 1L) { break; } } System.out.println(); System.out.println("===== collected time / id"); collectIds.forEach(System.out::println); } } List<String> filterNoun(String text) { try (TokenStream stream = analyzer.tokenStream("", text)) { List<String> tokens = new ArrayList<>(); PartOfSpeechAttribute partOfSpeechAttr = stream.addAttribute(PartOfSpeechAttribute.class); CharTermAttribute charTermAttr = stream.addAttribute(CharTermAttribute.class); stream.reset(); while (stream.incrementToken()) { String token = charTermAttr.toString(); String partOfSpeech = partOfSpeechAttr.getPartOfSpeech(); if (partOfSpeech.contains("名詞") && !token.replaceAll("[a-zA-Z0-9'/: ]+", "").isEmpty() && token.length() >= 3) { tokens.add(token + " - " + partOfSpeech); } } stream.end(); return tokens; } catch (IOException e) { throw new UncheckedIOException(e); } } }
設定自体は、先ほどのConsumerと同じなので割愛。
あ、Consumer Groupは「tokenize-console-consumer」としました。
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "tokenize-console-consumer");
こちらのConsumerでは、形態素解析して適当にフィルタリング(名詞、英数字のみのものは極力省く、単語が3文字以上)したものを集めて
while (stream.incrementToken()) { String token = charTermAttr.toString(); String partOfSpeech = partOfSpeechAttr.getPartOfSpeech(); if (partOfSpeech.contains("名詞") && !token.replaceAll("[a-zA-Z0-9'/: ]+", "").isEmpty() && token.length() >= 3) { tokens.add(token + " - " + partOfSpeech); } }
トークンが残ればそれをコンソール出力するようにしました。
List<String> tokens = filterNoun((String) value.get("text")); if (!tokens.isEmpty()) { System.out.println("=== received record: "); System.out.println( values .stream() .collect(Collectors.joining( System.lineSeparator() + " ", " ", " "))); System.out.println(" tokens:"); System.out.println( filterNoun((String) value.get("text")) .stream() .collect(Collectors.joining( System.lineSeparator() + " ", " ", " ")) );
JSONのDeserializerは先ほどと同じなので、割愛。
ここまでで、アプリケーションの実装はおしまいです。
動作確認
それでは、動作確認してみます。
$ mvn package
実行。
## Producer $ java -jar twitter-producer/target/twitter-producer-0.0.1-SNAPSHOT.jar ## Consumer(Simple) - 1 $ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar ## Consumer(Simple) - 2 $ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar ## Consumer(Tokenize) - 1 $ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar ## Consumer(Tokenize) - 2 $ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar
どのアプリケーションも、1分間動き続けて終了します。
Producer。
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. [Mon Jul 24 23:12:26 JST 2017]Establishing connection. [Mon Jul 24 23:12:28 JST 2017]Connection established. [Mon Jul 24 23:12:28 JST 2017]Receiving status stream.
Consumer(Simple)側。2つインスタンスがありますが、それぞれこんな感じで出力されます。
=== received record: id=889488530648440832 createdAt=2017-07-24 23:12:46 text=♪ せせらぎ - 遊佐未森(:せせらぎ) #nowplaying https://t.co/BvJ6VAfLb4 === received record: id=889488531311218692 createdAt=2017-07-24 23:12:46 text="What Are You On?" by Roy Woods #nowplaying #beats1 https://t.co/aBG30To1AE https://t.co/eKBIAyRSRm === received record: id=889488534415056898 createdAt=2017-07-24 23:12:47 text=https://t.co/wutU83dBXk #nowplaying #enreproduccion Shaggy Feat. Pitbull & Gene Noble - "Only Love" #RadioDance === received record: id=889488536541573121 createdAt=2017-07-24 23:12:47 text=[soundtrack] - the sweetest thing - lauryn hi https://t.co/aSirRWgt9e #nowplaying #listenlive === received record: id=889488536617050113 createdAt=2017-07-24 23:12:47 text=#ToyotaofBoerne is #nowplaying Small Y'All (Duet with George Jones) by #KennyChesney #cubevenue === received record: id=889488539448037376 createdAt=2017-07-24 23:12:48 text=#Nowplaying 『ダッシュ!ポケモンガオーレ!』 - 高取ヒデアキ/池田彩/小林竜之 (【公式】ポケモンガオーレ) 〜省略〜
最後に、ツイートした時間とidのペアが出力されます。2つのインスタンスで、同じidのものは受け取っていないはずです。
===== collected time / id 2017-07-24 23:12:31 / 889488466928476161 2017-07-24 23:12:31 / 889488467813519360 2017-07-24 23:12:32 / 889488474918801409 2017-07-24 23:12:33 / 889488475782828034 2017-07-24 23:12:33 / 889488478236426240 2017-07-24 23:12:33 / 889488478638964738 2017-07-24 23:12:35 / 889488485811224577 2017-07-24 23:12:35 / 889488486620950528 2017-07-24 23:12:35 / 889488487195521024 2017-07-24 23:12:36 / 889488489997361157 2017-07-24 23:12:37 / 889488493281456129 2017-07-24 23:12:37 / 889488494913097729 2017-07-24 23:12:38 / 889488496016187396 2017-07-24 23:12:38 / 889488497869860867 〜省略〜
続いて、Consumer(Tokenize)側。
=== received record: id=889488519697014785 createdAt=2017-07-24 23:12:43 tokens: サザーランド - 名詞-固有名詞-人名-姓 スクリーン - 名詞-一般 === received record: id=889488530648440832 createdAt=2017-07-24 23:12:46 tokens: せせらぎ - 名詞-一般 遊佐未森 - 名詞-固有名詞-人名-一般 せせらぎ - 名詞-一般 === received record: id=889488539448037376 createdAt=2017-07-24 23:12:48 tokens: ダッシュ - 名詞-一般 ポケモンガオーレ - 名詞-固有名詞-一般 高取ヒデアキ - 名詞-固有名詞-一般 小林竜之 - 名詞-固有名詞-一般 ポケモンガオーレ - 名詞-固有名詞-一般 === received record: id=889488549820604418 createdAt=2017-07-24 23:12:50 tokens: monday満ちる - 名詞-固有名詞-人名-一般 〜省略〜
同じく、ツイートした時間とidのペア。同じアプリケーション内で同じidは出ていないと思いますが、Consumer Groupが異なるアプリケーションとは、
どちらかのインスタンスが同じidを出力していると思います。
===== collected time / id 2017-07-24 23:12:33 / 889488478638964738 2017-07-24 23:12:35 / 889488485811224577 2017-07-24 23:12:37 / 889488494913097729 2017-07-24 23:12:38 / 889488497869860867 2017-07-24 23:12:38 / 889488499631468544 2017-07-24 23:12:39 / 889488501242183681 2017-07-24 23:12:41 / 889488512097034241 2017-07-24 23:12:41 / 889488512143196161 2017-07-24 23:12:41 / 889488512281530368 2017-07-24 23:12:42 / 889488513774891008 2017-07-24 23:12:42 / 889488514848612353 2017-07-24 23:12:43 / 889488520456400896 〜省略〜
こちらは、フィルタリングしているのでだいぶ数が少ないですけれど。
まとめ
というわけで、同じConsumer Groupに属するアプリケーションにはひとつのConsumerインスタンスにメッセージが届き、別のConsumer Groupであれば
それぞれメッセージを受信できることが確認できましたよ、と。