CLOVER🍀

That was when it all began.

InfinispanのDistributed Streams APIを使ってWordCount

最近、こういう記事があるのを見つけました。

Red Hat JBoss Data Grid 7 + JBoss BRMSで始めるリアルタイムビッグデータ | RED HAT OPENEYE -レッドハットの情報ポータル
Red Hat JBoss Data Grid 7 + JBoss BRMSで始めるリアルタイムビッグデータ | RED HAT OPENEYE -レッドハットの情報ポータル | Page 2

Infinispan…というより、JBoss Data Grid 7.0(のうち、Distributed Streams APIを使ったWordCount)の紹介となっていますが、見ていてちょっと気になるところなどあったので、自分でもちょっと試してみようかなと思って、少しアレンジしつつ書いてみました。

基本的にやることは変わりませんが、実行環境やライブラリは以下のように変更します。

やりたいこと自体は元の記事と同じで、対象の文書から名詞だけを抜き出してWordCountし、上位N個を抜き出します。ただ、元の記事のようにスケジュール起動ではなく、HTTPリクエストに応じてWordCountするものとします。Cacheの有効期限も特に設けません。

では、やってみましょう。

準備

まずは、Mavenの設定から。
pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.littlewings</groupId>
    <artifactId>embedded-distributed-streams-wordcount</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>embedded-distributed-streams-wordcount</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <spring.boot.version>1.4.0.RELEASE</spring.boot.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.infinispan</groupId>
            <artifactId>infinispan-core</artifactId>
            <version>8.2.3.Final</version>
        </dependency>

        <dependency>
            <groupId>org.codelibs</groupId>
            <artifactId>lucene-analyzers-kuromoji-ipadic-neologd</artifactId>
            <version>6.1.0-20160714</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring.boot.version}</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>codelibs.org</id>
            <name>CodeLibs Repository</name>
            <url>http://maven.codelibs.org/</url>
        </repository>
    </repositories>
</project>

初、Spring Boot 1.4です!

あとは、Infinispanのコアライブラリと、CodeLibsの提供するApache Lucene Kuromoji+mecab-ipadic-NEologdを足します。

Infinispanの設定

Infinispanの設定は、Distributed Cacheをひとつ用意します。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.2 http://www.infinispan.org/schemas/infinispan-config-8.2.xsd"
        xmlns="urn:infinispan:config:8.2">
    <jgroups>
        <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/>
    </jgroups>

    <cache-container default-cache="wordsCache">
        <transport cluster="cluster" stack="udp"/>

        <distributed-cache name="wordsCache"/>
    </cache-container>
</infinispan>

クラスタリングに使うJGroupsの設定は、デフォルトのもの(UDP)を使用します。

形態素解析対象の文書

こちらのページの内容を、テキストファイルとして配置します。

$ head -n 2 src/main/resources/bocchan.txt 
親譲りの無鉄砲で小供の時から損ばかりしている。小学校に居る時分学校の二階から飛び降りて一週間ほど腰を抜かした事がある。なぜそんな無闇をしたと聞く人があるかも知れぬ。別段深い理由でもない。新築の二階から首を出していたら、同級生の一人が冗談に、いくら威張っても、そこから飛び降りる事は出来まい。弱虫やーい。と囃したからである。小使に負ぶさって帰って来た時、おやじが大きな眼をして二階ぐらいから飛び降りて腰を抜かす奴があるかと云ったから、この次は抜かさずに飛んで見せますと答えた。
親類のものから西洋製のナイフを貰って奇麗な刃を日に翳して、友達に見せていたら、一人が光る事は光るが切れそうもないと云った。切れぬ事があるか、何でも切ってみせると受け合った。そんなら君の指を切ってみろと注文したから、何だ指ぐらいこの通りだと右の手の親指の甲をはすに切り込んだ。幸ナイフが小さいのと、親指の骨が堅かったので、今だに親指は手に付いている。しかし創痕は死ぬまで消えぬ。
〜省略〜

これを読み込んで、InfinispanのCacheに加えるコードを書いて、WordCountすることになります。

アプリケーションを書く

それでは、アプリケーションを書いていきます。

まずはアプリケーションのエントリポイント。
src/main/java/org/littlewings/infinispan/wordcount/App.java

package org.littlewings.infinispan.wordcount;

import java.io.IOException;

import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class App {
    public static void main(String... args) {
        SpringApplication.run(App.class, args);
    }

    @Bean(destroyMethod = "stop")
    public EmbeddedCacheManager embeddedCacheManager() throws IOException {
        EmbeddedCacheManager cacheManager = new DefaultCacheManager("infinispan.xml");
        cacheManager.startCaches(cacheManager.getCacheNames().toArray(new String[cacheManager.getCacheNames().size()]));
        return cacheManager;
    }
}

InfinispanのEmbeddedCacheManagerを作成、Cacheを開始しつつBean定義をします。

また、Atilika Kuromojiの時はTokenという単語や品詞を持ったクラスが存在していたのですが、Apache Lucene Kuromojiを使う場合はそういったものがないので、今回は自作します。
src/main/java/org/littlewings/infinispan/wordcount/Token.java

package org.littlewings.infinispan.wordcount;

import java.io.Serializable;

public class Token implements Serializable {
    private static final long serialVersionUID = 1L;

    String value;
    String partOfSpeech;

    public Token(String value, String partOfSpeech) {
        this.value = value;
        this.partOfSpeech = partOfSpeech;
    }

    public String getValue() {
        return value;
    }

    public String getPartOfSpeech() {
        return partOfSpeech;
    }

    @Override
    public String toString() {
        return "token = " + value + ", partOfSpeech = " + partOfSpeech;
    }
}

では、WordCountするControllerを書きます。RestControllerとして作成します。

まずは大枠から。
src/main/java/org/littlewings/infinispan/wordcount/WordCount.java

package org.littlewings.infinispan.wordcount;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseAnalyzer;
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer;
import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute;
import org.infinispan.Cache;
import org.infinispan.CacheStream;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.stream.CacheCollectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class WordCount implements Serializable {
    // Logback's Logger is Serializable
    Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    transient EmbeddedCacheManager cacheManager;

    static final JapaneseAnalyzer analyzer =
            new JapaneseAnalyzer(null,
                    JapaneseTokenizer.Mode.NORMAL,
                    JapaneseAnalyzer.getDefaultStopSet(),
                    JapaneseAnalyzer.getDefaultStopTags());

    // 後で
}

Distributed Streams APIでLambdaを使って書いていく関係上、外側のクラスがSerializableになっています。

public class WordCount implements Serializable {

また、SerializableではないEmbeddedCacheManagerやLuceneのJapaneseAnalyzerは、それぞれtransientやstaticな変数としてこちらもシリアライズの対象外としています。

    @Autowired
    transient EmbeddedCacheManager cacheManager;

    static final JapaneseAnalyzer analyzer =
            new JapaneseAnalyzer(null,
                    JapaneseTokenizer.Mode.NORMAL,
                    JapaneseAnalyzer.getDefaultStopSet(),
                    JapaneseAnalyzer.getDefaultStopTags());

LogbackのLoggerはSerializableを実装していたので、問題ありませんでした。

というか、これらの対処をしないと分散実行時にシリアライズ関係のエラーになります…。

データロードを行う箇所。リクエストに応じて「坊ちゃん」のテキストを読み込むようにします。

    @GetMapping("load")
    public String load() {
        Cache<Integer, String> wordsCache = cacheManager.getCache("wordsCache");

        logger.info("start load words");

        try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("bocchan.txt");
             InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
             BufferedReader reader = new BufferedReader(isr);
             Stream<String> lines = reader.lines()) {
            AtomicInteger currentLines = new AtomicInteger();
            lines.forEach(line -> wordsCache.put(currentLines.incrementAndGet(), line));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        logger.info("end load words");

        return "Finish!!";
    }

そして、WordCountする箇所。基本的には、元の記事とほぼ同じような処理です。Luceneを使っていたり、ログ出力の関係上、ちょっと長めになっていますが…。

    @GetMapping("wordcount")
    public List<Map.Entry<String, Integer>> wordcount(@RequestParam(name = "limit", defaultValue = "10") int limit) {
        Cache<Integer, String> wordsCache = cacheManager.getCache("wordsCache");

        try (CacheStream<String> stream = wordsCache.values().parallelStream()) {
            logger.info("start wordcount");

            Map<String, Integer> collected =
                    stream
                            .timeout(30, TimeUnit.SECONDS)  // timeout
                            .flatMap((Function<String, Stream<Token>> & Serializable) line -> {
                                List<Token> tokens = new ArrayList<>();

                                try (TokenStream tokenStream = analyzer.tokenStream("", line)) {

                                    CharTermAttribute charTermAttribute = tokenStream.addAttribute(CharTermAttribute.class);
                                    PartOfSpeechAttribute partOfSpeechAttribute = tokenStream.addAttribute(PartOfSpeechAttribute.class);

                                    tokenStream.reset();
                                    while (tokenStream.incrementToken()) {
                                        tokens.add(new Token(charTermAttribute.toString(), partOfSpeechAttribute.getPartOfSpeech()));
                                        tokenStream.end();
                                    }
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                }

                                logger.info("[{}] -FlatMap- line = {}, tokens = {}", Thread.currentThread().getName(), line, tokens);

                                return tokens.stream();
                            })
                            .filter((Predicate<Token> & Serializable) token -> {
                                logger.info("[{}] -Filter- token = {}, isPartOfSpeech = {}", Thread.currentThread().getName(), token, token.getPartOfSpeech().contains("名詞"));
                                return token.getPartOfSpeech().contains("名詞");
                            })
                            .map((Function<Token, String> & Serializable) token -> {
                                logger.info("[{}] -Map- token = {}", Thread.currentThread().getName(), token);
                                return token.getValue();
                            }).collect(
                            CacheCollectors.serializableCollector(() ->
                                    Collectors.groupingByConcurrent(s -> {
                                                logger.info("[{}] -Identity- word = {}", Thread.currentThread().getName(), s);
                                                return s;
                                            },
                                            ConcurrentHashMap::new,
                                            Collectors.reducing(0,
                                                    s -> {
                                                        logger.info("[{}] -Reduce-Map- {}", Thread.currentThread().getName(), s);
                                                        return 1;
                                                    },
                                                    (c1, c2) -> {
                                                        logger.info("[{}] -Reduce-Op- {}, {}", Thread.currentThread().getName(), c1, c2);
                                                        return c1 + c2;
                                                    }))));

            logger.info("end wordcount");

            logger.info("exract top - {}", limit);

            return collected
                    .entrySet()
                    .stream()
                    .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                    .limit(limit)
                    .collect(Collectors.toList());
        } catch (Exception e) {
            logger.error("wordcount, fail", e);
            throw e;
        }
    }

ParallelStreamを使っているので、分散処理+各Nodeでの並列処理になります。

        try (CacheStream<String> stream = wordsCache.values().parallelStream()) {

その様子は、ログ出力して確認するようにしています。

また、元の記事に比べて交差型キャストが入っていて、ちょっとうるさくなっているのですが

                            .flatMap((Function<String, Stream<Token>> & Serializable) line -> {

                            .filter((Predicate<Token> & Serializable) token -> {

                            .map((Function<Token, String> & Serializable) token -> {

これは、JBoss Data Grid 7ではCacheStreamの操作にSerializableなFunctionなどを受け取るメソッドが追加されているからです。

https://access.redhat.com/documentation/en-US/Red_Hat_JBoss_Data_Grid/7.0/html/API_Documentation/files/api/org/infinispan/CacheStream.html

Infinispan 8.2の時点ではまだ入っていないので、9.0になるまではこういう記述が必要になります。

というか、商用版の方が機能が先に行っている…!

また、このデバッグ状態だとタイムアウトをちょっと伸ばしておかないと分散実行時にエラーになってしまうので、設定を入れています。

                            .timeout(30, TimeUnit.SECONDS)  // timeout

動作確認

それでは、動かしてみましょう。

パッケージング。

$ mvn package -DskipTests=true

起動。

$ java -jar target/embedded-distributed-streams-wordcount-0.0.1-SNAPSHOT.jar

データロード。

$ curl http://localhost:8080/load
Finish!!

それでは、WordCountしてみます。

$ time curl http://localhost:8080/wordcount
[{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}]
real	0m33.293s
user	0m0.009s
sys	0m0.000s

だいたい30秒ほどかかりました。

ひとつ、Nodeを追加してみます。起動ポートはずらしています。

$ java -jar target/embedded-distributed-streams-wordcount-0.0.1-SNAPSHOT.jar --server.port=8180

これで、2 Nodeです。

リバランスも完了したところで

2016-08-07 19:22:16.712  INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER                   : [Context=wordsCache][Scope=xxxxx-18204]ISPN100002: Started local rebalance
2016-08-07 19:22:16.732  INFO 24513 --- [t-thread--p4-t7] org.infinispan.CLUSTER                   : [Context=wordsCache][Scope=xxxxx-18204]ISPN100003: Finished local rebalance
2016-08-07 19:22:16.999  INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER                   : [Context=wordsCache][Scope=xxxxx-63104]ISPN100003: Finished local rebalance

再度WordCount。

$ time curl http://localhost:8080/wordcount
[{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}]
real	0m5.213s
user	0m0.004s
sys	0m0.005s

なんか、めちゃくちゃ速くなりました…。

さらにNode追加。

$ java -jar target/embedded-distributed-streams-wordcount-0.0.1-SNAPSHOT.jar --server.port=8280

リバランスが完了したのを見て

2016-08-07 19:25:14.367  INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER                   : [Context=wordsCache][Scope=xxxxx-18204]ISPN100002: Started local rebalance
2016-08-07 19:25:14.369  INFO 24513 --- [t-thread--p4-t1] org.infinispan.CLUSTER                   : [Context=wordsCache][Scope=xxxxx-18204]ISPN100003: Finished local rebalance
2016-08-07 19:25:14.378  INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER                   : [Context=wordsCache][Scope=xxxxx-63104]ISPN100003: Finished local rebalance
2016-08-07 19:25:14.616  INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER                   : [Context=wordsCache][Scope=xxxxx-32030]ISPN100003: Finished local rebalance
2016-08-07 19:25:14.616  INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER                   : ISPN000336: Finished cluster-wide rebalance for cache wordsCache, topology id = 7

WordCount。

$ time curl http://localhost:8080/wordcount
[{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}]
real	0m4.077s
user	0m0.008s
sys	0m0.016s

実行ごとに多少結果がブレていたりしましたが、今回はそこまで速くはなりませんでした。まあ、PC1台だし…。

なお、実行の様子はこういうログで確認できます。
※かなり大量に出力されます

2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-3] -FlatMap- line = 庭は十坪ほどの平庭で、これという植木もない。ただ一本の蜜柑があって、塀のそとから、目標になるほど高い。おれはうちへ帰ると、いつでもこの蜜柑を眺める。東京を出た事のないものには蜜柑の生っているところはすこぶる珍しいものだ。あの青い実がだんだん熟してきて、黄色になるんだろうが、定めて奇麗だろう。今でももう半分色の変ったのがある。婆さんに聞いてみると、すこぶる水気の多い、旨い蜜柑だそうだ。今に熟たら、たんと召し上がれと云ったから、毎日少しずつ食ってやろう。もう三週間もしたら、充分食えるだろう。まさか三週間以内にここを去る事もなかろう。, tokens = [token = 庭, partOfSpeech = 名詞-一般, token = 十, partOfSpeech = 名詞-数, token = 坪, partOfSpeech = 名詞-接尾-助数詞, token = 平, partOfSpeech = 接頭詞-名詞接続, token = 庭, partOfSpeech = 名詞-一般, token = 植木, partOfSpeech = 名詞-一般, token = ただ, partOfSpeech = 名詞-一般, token = 一, partOfSpeech = 名詞-数, token = 本, partOfSpeech = 名詞-接尾-助数詞, token = 蜜柑, partOfSpeech = 名詞-一般, token = 塀, partOfSpeech = 名詞-一般, token = そ, partOfSpeech = 名詞-特殊-助動詞語幹, token = 目標, partOfSpeech = 名詞-一般, token = 高い, partOfSpeech = 形容詞-自立, token = おれ, partOfSpeech = 名詞-代名詞-一般, token = 帰る, partOfSpeech = 動詞-自立, token = いつ, partOfSpeech = 名詞-代名詞-一般, token = 蜜柑, partOfSpeech = 名詞-一般, token = 眺める, partOfSpeech = 動詞-自立, token = 東京, partOfSpeech = 名詞-固有名詞-地域-一般, token = 出る, partOfSpeech = 動詞-自立, token = 事, partOfSpeech = 名詞-非自立-一般, token = 蜜柑, partOfSpeech = 名詞-一般, token = 生る, partOfSpeech = 動詞-自立, token = すこぶる, partOfSpeech = 副詞-一般, token = 珍しい, partOfSpeech = 形容詞-自立, token = あの, partOfSpeech = 連体詞, token = 青い, partOfSpeech = 形容詞-自立, token = 実, partOfSpeech = 名詞-一般, token = だんだん, partOfSpeech = 副詞-助詞類接続, token = 熟す, partOfSpeech = 動詞-自立, token = くる, partOfSpeech = 動詞-非自立, token = 黄色, partOfSpeech = 名詞-一般, token = 定める, partOfSpeech = 動詞-自立, token = 奇麗, partOfSpeech = 名詞-形容動詞語幹, token = 今, partOfSpeech = 名詞-副詞可能, token = もう半分, partOfSpeech = 名詞-固有名詞-一般, token = 色, partOfSpeech = 名詞-接尾-一般, token = 変る, partOfSpeech = 動詞-自立, token = 婆さん, partOfSpeech = 名詞-一般, token = 聞く, partOfSpeech = 動詞-自立, token = みる, partOfSpeech = 動詞-非自立, token = すこぶる, partOfSpeech = 副詞-一般, token = 水気, partOfSpeech = 名詞-一般, token = 多い, partOfSpeech = 形容詞-自立, token = 旨い, partOfSpeech = 形容詞-自立, token = 蜜柑, partOfSpeech = 名詞-一般, token = そう, partOfSpeech = 名詞-特殊-助動詞語幹, token = 今, partOfSpeech = 名詞-副詞可能, token = 熟, partOfSpeech = 名詞-サ変接続, token = たんと, partOfSpeech = 副詞-一般, token = 召し上がる, partOfSpeech = 動詞-自立, token = 云う, partOfSpeech = 動詞-自立, token = 毎日, partOfSpeech = 名詞-副詞可能, token = 少し, partOfSpeech = 副詞-助詞類接続, token = 食う, partOfSpeech = 動詞-自立, token = やる, partOfSpeech = 動詞-非自立, token = もう, partOfSpeech = 副詞-一般, token = 三, partOfSpeech = 名詞-数, token = 週間, partOfSpeech = 名詞-接尾-助数詞, token = 充分, partOfSpeech = 副詞-一般, token = 食える, partOfSpeech = 動詞-自立, token = まさか, partOfSpeech = 副詞-一般, token = 三, partOfSpeech = 名詞-数, token = 週間, partOfSpeech = 名詞-接尾-助数詞, token = 以内, partOfSpeech = 名詞-非自立-副詞可能, token = 去る, partOfSpeech = 動詞-自立, token = 事, partOfSpeech = 名詞-非自立-一般]
2016-08-07 19:27:18.265  INFO 24513 --- [nio-8080-exec-3] o.l.infinispan.wordcount.WordCount       : [http-nio-8080-exec-3] -Map- token = token = お湯, partOfSpeech = 名詞-一般
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-7] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-7] -Reduce-Op- 0, 1
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-0] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-0] -Filter- token = token = 金, partOfSpeech = 名詞-固有名詞-人名-姓, isPartOfSpeech = true
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-2] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-2] -Filter- token = token = 菓子, partOfSpeech = 名詞-一般, isPartOfSpeech = true
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-3] -Filter- token = token = 庭, partOfSpeech = 名詞-一般, isPartOfSpeech = true
2016-08-07 19:27:18.265  INFO 24513 --- [nio-8080-exec-3] o.l.infinispan.wordcount.WordCount       : [http-nio-8080-exec-3] -Identity- word = お湯
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-7] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-7] -Filter- token = token = やる, partOfSpeech = 動詞-自立, isPartOfSpeech = false
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-0] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-0] -Map- token = token = 金, partOfSpeech = 名詞-固有名詞-人名-姓
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-2] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-2] -Map- token = token = 菓子, partOfSpeech = 名詞-一般
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-3] -Map- token = token = 庭, partOfSpeech = 名詞-一般
2016-08-07 19:27:18.265  INFO 24513 --- [nio-8080-exec-3] o.l.infinispan.wordcount.WordCount       : [http-nio-8080-exec-3] -Reduce-Map- お湯
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-7] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-7] -Filter- token = token = 二, partOfSpeech = 名詞-数, isPartOfSpeech = true
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-0] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-0] -Identity- word = 金
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-2] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-2] -Identity- word = 菓子
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-3] -Identity- word = 庭
2016-08-07 19:27:18.265  INFO 24513 --- [nio-8080-exec-3] o.l.infinispan.wordcount.WordCount       : [http-nio-8080-exec-3] -Reduce-Op- 0, 1
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-7] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-7] -Map- token = token = 二, partOfSpeech = 名詞-数
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-0] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-0] -Reduce-Map- 金
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-2] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-2] -Reduce-Map- 菓子
2016-08-07 19:27:18.265  INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-3] -Reduce-Map- 庭
2016-08-07 19:27:18.265  INFO 24513 --- [nio-8080-exec-3] o.l.infinispan.wordcount.WordCount       : [http-nio-8080-exec-3] -Filter- token = token = さあ, partOfSpeech = 感動詞, isPartOfSpeech = false
2016-08-07 19:27:18.266  INFO 24513 --- [onPool-worker-7] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-7] -Identity- word = 二
2016-08-07 19:27:18.266  INFO 24513 --- [onPool-worker-0] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-0] -Reduce-Op- 4, 1
2016-08-07 19:27:18.266  INFO 24513 --- [onPool-worker-2] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-2] -Reduce-Op- 1, 1
2016-08-07 19:27:18.266  INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount       : [ForkJoinPool.commonPool-worker-3] -Reduce-Op- 2, 1

こういう感じの、3 Nodeそれぞれでわらわらと出力されます。

とりあえず、Nodeを追加すると性能が変わるところは確認できましたね。

オチ)
実はこの結果はログ出力している部分がけっこうネックになっていて、ログ出力を削るとこういう結果になります。

## 1 Node
$ time curl http://localhost:8080/wordcount
[{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}]
real	0m0.137s
user	0m0.003s
sys	0m0.003s


## 2 Node
$ time curl http://localhost:8080/wordcount
[{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}]
real	0m0.172s
user	0m0.008s
sys	0m0.001s


## 3 Node
$ time curl http://localhost:8080/wordcount
[{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}]
real	0m0.069s
user	0m0.007s
sys	0m0.000s

ホントは、さっきみたいな時間はかからない処理でした!

テストコード

最後に、テストコードも書いておいたので載せておきます。
src/test/java/org/littlewings/infinispan/wordcount/WordCountTest.java

package org.littlewings.infinispan.wordcount;

import java.util.List;
import java.util.Map;

import org.assertj.core.api.IntegerAssert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.test.context.junit4.SpringRunner;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = App.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class WordCountTest {
    @Autowired
    TestRestTemplate restTemplate;

    @Test
    public void wordcount10() {
        String loadResult = restTemplate.getForObject("/load", String.class);
        assertThat(loadResult).isEqualTo("Finish!!");

        @SuppressWarnings("unchecked")
        List<Map<String, Integer>> wordcount =
                restTemplate.getForObject("/wordcount", List.class);

        assertThat(wordcount)
                .hasSize(10);

        assertThat(wordcount.get(0).get("おれ")).isEqualTo(469);
        assertThat(wordcount.get(1).get("事")).isEqualTo(282);
        assertThat(wordcount.get(2).get("人")).isEqualTo(193);
        assertThat(wordcount.get(3).get("君")).isEqualTo(182);
        assertThat(wordcount.get(4).get("赤シャツ")).isEqualTo(168);
        assertThat(wordcount.get(5).get("山嵐")).isEqualTo(155);
        assertThat(wordcount.get(6).get("一")).isEqualTo(152);
        assertThat(wordcount.get(7).get("何")).isEqualTo(143);
        assertThat(wordcount.get(8).get("二")).isEqualTo(115);
        assertThat(wordcount.get(9).get("方")).isEqualTo(112);
    }

    @Test
    public void wordcount5() {
        String loadResult = restTemplate.getForObject("/load", String.class);
        assertThat(loadResult).isEqualTo("Finish!!");

        @SuppressWarnings("unchecked")
        List<Map<String, Integer>> wordcount =
                restTemplate.getForObject("/wordcount?limit=5", List.class);

        assertThat(wordcount)
                .hasSize(5);

        assertThat(wordcount.get(0).get("おれ")).isEqualTo(469);
        assertThat(wordcount.get(1).get("事")).isEqualTo(282);
        assertThat(wordcount.get(2).get("人")).isEqualTo(193);
        assertThat(wordcount.get(3).get("君")).isEqualTo(182);
        assertThat(wordcount.get(4).get("赤シャツ")).isEqualTo(168);
    }
}

こちらの内容を、がっつり利用。

Testing improvements in Spring Boot 1.4

まとめ

元の記事を参考に、自分で気になるところを確認しつつ試してみました。特にシリアライズまわりが気になって始めたのですが、ある種思ったとおりの結果になりました。

まあ、目的は達成したのでいいかなぁと。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-distributed-streams-wordcount