CLOVER🍀

That was when it all began.

Apache KafkaのConsumer Groupを試す

Apache Kafkaには、Consumer Groupという概念があります。

Introduction / Consumers

Consumerにラベル付けをすることで、Consumerをグルーピングします。こうすることで、Brokerから配信されるレコードが、そのConsumer Group内の
ひとつのConsumerに配信されるようになります。

上記のドキュメントのように、2つのConsumer Groupがあった場合は、2つのConsumer Group内のどれかひとつずつのConsumerインスタンス
レコードを受け取ります。

仮に全Consumerが同じConsumer Groupに属して入れば単純に負荷分散、全Consumerが異なるConsumer Groupであれば
ブロードキャストのような扱いになります。

参考)
Java Clientで入門する Apache Kafka #jjug_ccc #ccc_e2

Apache Kafkaに入門した | SOTA

Kafka のトピック・パーティションの決め方を考察する - uchan note

というわけで、ちょっと種類の違うConsumer Groupをつけたアプリケーションを用意して、どんなものか見てみましょう。

構成と準備

確認する構成は、以下とします。

  • Apache ZooKeeper × … 172.21.0.2
  • Apache Kafka Broker × 3 … 172.21.0.3〜5
  • Producerアプリケーション × 1 … ローカル
  • Consumerアプリケーションその1 × 2 … ローカル
  • Consumerアプリケーションその2 × 2 … ローカル

Apache ZooKeeperおよびApache Kafkaは、起動済み&クラスタ構成済みとします。

Topicは、以下のようにPartition数6、replication-factorは2で作成します。

$ bin/kafka-topics.sh --create --zookeeper 172.21.0.2:2181 --replication-factor 2 --partitions 6 --topic tweet-topic

各Producer/Cosumerアプリケーションは、Mavenのマルチプロジェクト構成とします。アプリケーションは、以下の構成とします。

Producerは、Twitter4jを使って「#nowplaying」ハッシュタグのあるツイートを受信してApache Kafkaに放り込みます。
Consumerは、片方はProducerが作成したレコードを単純に標準出力に、もう片方はApache Lucene Kuromojiで形態素解析して
ある条件に合致すれば標準出力に書き出す、そういったアプリケーションにします。

まず、親pom.xmlはこんな感じで。

    <modules>
        <module>twitter-producer</module>
        <module>simple-console-consumer</module>
        <module>tokenize-console-consumer</module>
    </modules>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>1.5.4.RELEASE</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

Uber JARにするのに、Spring BootのMavenプラグインを使うことにしました。

ProducerのMaven依存関係。
twitter-producer/pom.xml

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-stream</artifactId>
            <version>4.0.6</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.8.9</version>
        </dependency>

Twitter4jを入れているのと、Apache KafkaにデータをJSONで入れようと思ったのでJacksonを加えています。

Consumerその1のMaven依存関係。
simple-console-consumer/pom.xml

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.8.9</version>
        </dependency>

こちらは、Apache Kafkaのクライアント以外はJacksonが入っているだけです。

Consumerその2。
tokenize-console-consumer/pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.8.9</version>
        </dependency>
        <dependency>
            <groupId>org.codelibs</groupId>
            <artifactId>lucene-analyzers-kuromoji-ipadic-neologd</artifactId>
            <version>6.6.0-20170703</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>codelibs.org</id>
            <name>CodeLibs Repository</name>
            <url>http://maven.codelibs.org/</url>
        </repository>
    </repositories>

先ほどのConsumerに加えて、Apache Lucene Kuromoji+mecab-ipadic-NEologdを追加します。

Apache Lucene Kuromoji+mecab-ipadic-NEologdは、CodeLibsのものをお借りしました。

Producerが使用する、twitter4j.propertiesは省略します。

ここまでで、だいたい準備が終わったので、アプリケーションを作っていきます。

Producer

では、まずはTwitterから「#nowplaying」ハッシュタグでストリームを取得して、Apache Kafkaに放り込むアプリケーションを書きます。
twitter-producer/src/main/java/org/littlewings/kafka/TwitterStreamProducer.java

package org.littlewings.kafka;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;

public class TwitterStreamProducer {
    public static void main(String... args) {
        TwitterStreamProducer streamProducer = new TwitterStreamProducer();
        streamProducer.streaming();
    }

    public void streaming() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class);

        try (KafkaProducer<String, Map<String, Object>> producer = new KafkaProducer<>(properties)) {
            TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    Map<String, Object> tweet = new LinkedHashMap<>();
                    tweet.put("id", status.getId());
                    tweet.put("screenName", status.getUser().getScreenName());
                    tweet.put("name", status.getUser().getName());
                    tweet.put("text", status.getText());
                    tweet.put("createdAt", status.getCreatedAt());

                    String key = status.getUser().getScreenName();
                    Map<String, Object> value = tweet;

                    producer.send(new ProducerRecord<>("tweet-topic", key, value));
                }

                @Override
                public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
                }

                @Override
                public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
                }

                @Override
                public void onScrubGeo(long userId, long upToStatusId) {
                }

                @Override
                public void onStallWarning(StallWarning warning) {
                }

                @Override
                public void onException(Exception e) {
                    e.printStackTrace();
                }
            });
            twitterStream.filter("#nowplaying");

            try {
                TimeUnit.MINUTES.sleep(1L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            twitterStream.clearListeners();
            twitterStream.cleanUp();
        }
    }
}

3台のApache KafkaのBrokerと、Serializer/Deserializerの設定をしてKafkaProducerを作成します。

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class);

        try (KafkaProducer<String, Map<String, Object>> producer = new KafkaProducer<>(properties)) {

ツイートを受信したら、その内容を取得してApache Kafkaに登録するように実装。

            TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
            twitterStream.addListener(new StatusListener() {
                @Override
                public void onStatus(Status status) {
                    Map<String, Object> tweet = new LinkedHashMap<>();
                    tweet.put("id", status.getId());
                    tweet.put("screenName", status.getUser().getScreenName());
                    tweet.put("name", status.getUser().getName());
                    tweet.put("text", status.getText());
                    tweet.put("createdAt", status.getCreatedAt());

                    String key = status.getUser().getScreenName();
                    Map<String, Object> value = tweet;

                    producer.send(new ProducerRecord<>("tweet-topic", key, value));
                }

絞り込みのフィルタを設定して、1分間動かして、それから終了という構成にします。

            twitterStream.filter("#nowplaying");

            try {
                TimeUnit.MINUTES.sleep(1L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            twitterStream.clearListeners();
            twitterStream.cleanUp();

ところで、このJSONのSerializerは自作です。

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonJsonSerializer.class);

こんな感じで作成。
twitter-producer/src/main/java/org/littlewings/kafka/JacksonJsonSerializer.java

package org.littlewings.kafka;

import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JacksonJsonSerializer<T> implements Serializer<T> {
    ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no-op
    }

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;
        }

        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // no-op
    }
}

こちらのProducerで使うJSONのSerializer、また後でConsumerで使うJSONのDeserializerは、以下を参考にしました。
https://github.com/confluentinc/schema-registry/tree/v3.2.2/json-serializer/src/main/java/io/confluent/kafka/serializers
https://github.com/confluentinc/schema-registry/blob/v3.2.2/json-serializer/src/main/java/io/confluent/kafka/serializers/KafkaJsonSerializer.java
https://github.com/confluentinc/schema-registry/blob/v3.2.2/json-serializer/src/main/java/io/confluent/kafka/serializers/KafkaJsonDeserializer.java

Consumerその1

続いて、最初のConsumerを作成します。こちらは、ProducerがBrokerに登録したレコードを、若干整形してコンソール出力するものとしましょう。
simple-console-consumer/src/main/java/org/littlewings/kafka/SimpleConsoleConsumer.java

package org.littlewings.kafka;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsoleConsumer {
    public static void main(String... args) {
        SimpleConsoleConsumer consoleConsumer = new SimpleConsoleConsumer();
        consoleConsumer.consuming();
    }

    public void consuming() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-console-consumer");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonJsonDeserializer.class);
        properties.put("jackson.json.deserializer.type", LinkedHashMap.class);

        try (KafkaConsumer<String, Map<String, Object>> consumer =
                     new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList("tweet-topic"));

            Set<String> collectIds = new TreeSet<>();

            long startTime = System.nanoTime();

            while (true) {
                ConsumerRecords<String, Map<String, Object>> records = consumer.poll(1000L);

                records.forEach(record -> {
                    Map<String, Object> value = record.value();
                    List<String> values =
                            Arrays.asList(
                                    "id=" + value.get("id"),
                                    "createdAt=" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date((long) value.get("createdAt"))),
                                    // "screenName=" + value.get("screenName"),
                                    // "name=" + value.get("name"),
                                    "text=" + value.get("text")
                            );

                    System.out.println("=== received record: ");
                    System.out.println(
                            values
                                    .stream()
                                    .collect(Collectors.joining(
                                            System.lineSeparator() + "  ",
                                            "  ",
                                            "  ")));

                    collectIds.add(
                            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                    .format(new Date((long) value.get("createdAt"))) + " / " + value.get("id")
                    );
                });

                long elapsedTime = System.nanoTime() - startTime;

                if (TimeUnit.MINUTES.convert(elapsedTime, TimeUnit.NANOSECONDS) >= 1L) {
                    break;
                }
            }

            System.out.println();
            System.out.println("===== collected time / id");
            collectIds.forEach(System.out::println);
        }
    }
}

BrokerやSerializer/Deserializerを指定して、KafkaConsumerを作成します。

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-console-consumer");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonJsonDeserializer.class);
        properties.put("jackson.json.deserializer.type", LinkedHashMap.class);

        try (KafkaConsumer<String, Map<String, Object>> consumer =
                     new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList("tweet-topic"));

こちらのConsumer Groupは、「simple-console-consumer」としました。

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "simple-console-consumer");

あと、以下の1文は、自作のDeserializer向けの設定です。

        properties.put("jackson.json.deserializer.type", LinkedHashMap.class);

本体の動作自体は、1分間、受け取ったレコードをコンソールに出力しているだけです。

この時の、ツイート日時とidを覚えておき、最後に出力します。

            System.out.println();
            System.out.println("===== collected time / id");
            collectIds.forEach(System.out::println);

Consumer Group内で、重複していないか見るためですが、もちろん1分しか動作しない前提でこういうのは置いています。

自作のJSON Deserializer。
simple-console-consumer/src/main/java/org/littlewings/kafka/JacksonJsonDeserializer.java

package org.littlewings.kafka;

import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class JacksonJsonDeserializer<T> implements Deserializer<T> {
    ObjectMapper objectMapper = new ObjectMapper();
    Class<T> type;

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // always value deserializer
        type = (Class<T>) configs.get("jackson.json.deserializer.type");
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }

        try {
            return objectMapper.readValue(data, type);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {
        // no-op
    }
}

KafkaConsumerを作成する時に、Deserializerをコンストラクタで渡さず、Propertiesにクラスを入れることでconfigureメソッドが呼び出されます。

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // always value deserializer
        type = (Class<T>) configs.get("jackson.json.deserializer.type");
    }

今回は、ここでデシリアライズするクラスを受け取ることにしました。なお、isKeyはキーのシリアライズ向けかどうかを表すのですが、今回は
値用で固定としたので単に無視することに。

Consumerその2

もうひとつのConsumerアプリケーション。こちらは、形態素解析します。
tokenize-console-consumer/src/main/java/kafka/TokenizeConsoleConsumer.java

package kafka;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseAnalyzer;
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer;
import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute;

public class TokenizeConsoleConsumer {
    Analyzer analyzer =
            new JapaneseAnalyzer(
                    null,
                    JapaneseTokenizer.Mode.NORMAL,
                    JapaneseAnalyzer.getDefaultStopSet(),
                    JapaneseAnalyzer.getDefaultStopTags()
            );

    public static void main(String... args) {
        TokenizeConsoleConsumer consoleConsumer = new TokenizeConsoleConsumer();
        consoleConsumer.consuming();
    }

    public void consuming() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.0.3:9092,172.21.0.4:9092,172.21.0.5:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "tokenize-console-consumer");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonJsonDeserializer.class);
        properties.put("jackson.json.deserializer.type", LinkedHashMap.class);

        try (KafkaConsumer<String, Map<String, Object>> consumer =
                     new KafkaConsumer<>(properties)) {
            consumer.subscribe(Arrays.asList("tweet-topic"));

            Set<String> collectIds = new TreeSet<>();

            long startTime = System.nanoTime();

            while (true) {
                ConsumerRecords<String, Map<String, Object>> records = consumer.poll(1000L);

                records.forEach(record -> {
                    Map<String, Object> value = record.value();
                    List<String> values =
                            Arrays.asList(
                                    "id=" + value.get("id"),
                                    "createdAt=" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date((long) value.get("createdAt")))
                                    // "screenName=" + value.get("screenName"),
                                    // "name=" + value.get("name"),
                                    // "text=" + value.get("text")
                            );

                    List<String> tokens = filterNoun((String) value.get("text"));
                    if (!tokens.isEmpty()) {
                        System.out.println("=== received record: ");
                        System.out.println(
                                values
                                        .stream()
                                        .collect(Collectors.joining(
                                                System.lineSeparator() + "  ",
                                                "  ",
                                                "  ")));
                        System.out.println("  tokens:");
                        System.out.println(
                                filterNoun((String) value.get("text"))
                                        .stream()
                                        .collect(Collectors.joining(
                                                System.lineSeparator() + "    ",
                                                "    ",
                                                "    "))
                        );

                        collectIds.add(
                                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                                        .format(new Date((long) value.get("createdAt"))) + " / " + value.get("id")
                        );
                    }
                });

                long elapsedTime = System.nanoTime() - startTime;

                if (TimeUnit.MINUTES.convert(elapsedTime, TimeUnit.NANOSECONDS) >= 1L) {
                    break;
                }
            }

            System.out.println();
            System.out.println("===== collected time / id");
            collectIds.forEach(System.out::println);
        }
    }

    List<String> filterNoun(String text) {
        try (TokenStream stream = analyzer.tokenStream("", text)) {
            List<String> tokens = new ArrayList<>();

            PartOfSpeechAttribute partOfSpeechAttr = stream.addAttribute(PartOfSpeechAttribute.class);
            CharTermAttribute charTermAttr = stream.addAttribute(CharTermAttribute.class);

            stream.reset();

            while (stream.incrementToken()) {
                String token = charTermAttr.toString();
                String partOfSpeech = partOfSpeechAttr.getPartOfSpeech();

                if (partOfSpeech.contains("名詞") &&
                        !token.replaceAll("[a-zA-Z0-9'/: ]+", "").isEmpty() &&
                        token.length() >= 3) {
                    tokens.add(token + " - " + partOfSpeech);
                }
            }

            stream.end();

            return tokens;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

設定自体は、先ほどのConsumerと同じなので割愛。

あ、Consumer Groupは「tokenize-console-consumer」としました。

        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "tokenize-console-consumer");

こちらのConsumerでは、形態素解析して適当にフィルタリング(名詞、英数字のみのものは極力省く、単語が3文字以上)したものを集めて

            while (stream.incrementToken()) {
                String token = charTermAttr.toString();
                String partOfSpeech = partOfSpeechAttr.getPartOfSpeech();

                if (partOfSpeech.contains("名詞") &&
                        !token.replaceAll("[a-zA-Z0-9'/: ]+", "").isEmpty() &&
                        token.length() >= 3) {
                    tokens.add(token + " - " + partOfSpeech);
                }
            }

トークンが残ればそれをコンソール出力するようにしました。

                    List<String> tokens = filterNoun((String) value.get("text"));
                    if (!tokens.isEmpty()) {
                        System.out.println("=== received record: ");
                        System.out.println(
                                values
                                        .stream()
                                        .collect(Collectors.joining(
                                                System.lineSeparator() + "  ",
                                                "  ",
                                                "  ")));
                        System.out.println("  tokens:");
                        System.out.println(
                                filterNoun((String) value.get("text"))
                                        .stream()
                                        .collect(Collectors.joining(
                                                System.lineSeparator() + "    ",
                                                "    ",
                                                "    "))
                        );

JSONのDeserializerは先ほどと同じなので、割愛。

ここまでで、アプリケーションの実装はおしまいです。

動作確認

それでは、動作確認してみます。

$ mvn package

実行。

## Producer
$ java -jar twitter-producer/target/twitter-producer-0.0.1-SNAPSHOT.jar

## Consumer(Simple) - 1
$ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar

## Consumer(Simple) - 2
$ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar

## Consumer(Tokenize) - 1
$ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar

## Consumer(Tokenize) - 2
$ java -jar simple-console-consumer/target/simple-console-consumer-0.0.1-SNAPSHOT.jar

どのアプリケーションも、1分間動き続けて終了します。

Producer。

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[Mon Jul 24 23:12:26 JST 2017]Establishing connection.
[Mon Jul 24 23:12:28 JST 2017]Connection established.
[Mon Jul 24 23:12:28 JST 2017]Receiving status stream.

Consumer(Simple)側。2つインスタンスがありますが、それぞれこんな感じで出力されます。

=== received record: 
  id=889488530648440832
  createdAt=2017-07-24 23:12:46
  text=♪ せせらぎ - 遊佐未森(:せせらぎ) #nowplaying https://t.co/BvJ6VAfLb4  
=== received record: 
  id=889488531311218692
  createdAt=2017-07-24 23:12:46
  text="What Are You On?" by Roy Woods #nowplaying #beats1 https://t.co/aBG30To1AE https://t.co/eKBIAyRSRm  
=== received record: 
  id=889488534415056898
  createdAt=2017-07-24 23:12:47
  text=https://t.co/wutU83dBXk #nowplaying #enreproduccion Shaggy Feat. Pitbull & Gene Noble - "Only Love" #RadioDance  
=== received record: 
  id=889488536541573121
  createdAt=2017-07-24 23:12:47
  text=[soundtrack] - the sweetest thing - lauryn hi https://t.co/aSirRWgt9e #nowplaying #listenlive  
=== received record: 
  id=889488536617050113
  createdAt=2017-07-24 23:12:47
  text=#ToyotaofBoerne is #nowplaying Small Y'All (Duet with George Jones) by #KennyChesney #cubevenue  
=== received record: 
  id=889488539448037376
  createdAt=2017-07-24 23:12:48
  text=#Nowplaying 『ダッシュ!ポケモンガオーレ!』 - 高取ヒデアキ/池田彩/小林竜之 (【公式】ポケモンガオーレ) 

〜省略〜

最後に、ツイートした時間とidのペアが出力されます。2つのインスタンスで、同じidのものは受け取っていないはずです。

===== collected time / id
2017-07-24 23:12:31 / 889488466928476161
2017-07-24 23:12:31 / 889488467813519360
2017-07-24 23:12:32 / 889488474918801409
2017-07-24 23:12:33 / 889488475782828034
2017-07-24 23:12:33 / 889488478236426240
2017-07-24 23:12:33 / 889488478638964738
2017-07-24 23:12:35 / 889488485811224577
2017-07-24 23:12:35 / 889488486620950528
2017-07-24 23:12:35 / 889488487195521024
2017-07-24 23:12:36 / 889488489997361157
2017-07-24 23:12:37 / 889488493281456129
2017-07-24 23:12:37 / 889488494913097729
2017-07-24 23:12:38 / 889488496016187396
2017-07-24 23:12:38 / 889488497869860867

〜省略〜

続いて、Consumer(Tokenize)側。

=== received record: 
  id=889488519697014785
  createdAt=2017-07-24 23:12:43  
  tokens:
    サザーランド - 名詞-固有名詞-人名-姓
    スクリーン - 名詞-一般    
=== received record: 
  id=889488530648440832
  createdAt=2017-07-24 23:12:46  
  tokens:
    せせらぎ - 名詞-一般
    遊佐未森 - 名詞-固有名詞-人名-一般
    せせらぎ - 名詞-一般    
=== received record: 
  id=889488539448037376
  createdAt=2017-07-24 23:12:48  
  tokens:
    ダッシュ - 名詞-一般
    ポケモンガオーレ - 名詞-固有名詞-一般
    高取ヒデアキ - 名詞-固有名詞-一般
    小林竜之 - 名詞-固有名詞-一般
    ポケモンガオーレ - 名詞-固有名詞-一般    
=== received record: 
  id=889488549820604418
  createdAt=2017-07-24 23:12:50  
  tokens:
    monday満ちる - 名詞-固有名詞-人名-一般

〜省略〜

同じく、ツイートした時間とidのペア。同じアプリケーション内で同じidは出ていないと思いますが、Consumer Groupが異なるアプリケーションとは、
どちらかのインスタンスが同じidを出力していると思います。

===== collected time / id
2017-07-24 23:12:33 / 889488478638964738
2017-07-24 23:12:35 / 889488485811224577
2017-07-24 23:12:37 / 889488494913097729
2017-07-24 23:12:38 / 889488497869860867
2017-07-24 23:12:38 / 889488499631468544
2017-07-24 23:12:39 / 889488501242183681
2017-07-24 23:12:41 / 889488512097034241
2017-07-24 23:12:41 / 889488512143196161
2017-07-24 23:12:41 / 889488512281530368
2017-07-24 23:12:42 / 889488513774891008
2017-07-24 23:12:42 / 889488514848612353
2017-07-24 23:12:43 / 889488520456400896

〜省略〜

こちらは、フィルタリングしているのでだいぶ数が少ないですけれど。

まとめ

というわけで、同じConsumer Groupに属するアプリケーションにはひとつのConsumerインスタンスにメッセージが届き、別のConsumer Groupであれば
それぞれメッセージを受信できることが確認できましたよ、と。