最近、こういう記事があるのを見つけました。
Red Hat JBoss Data Grid 7 + JBoss BRMSで始めるリアルタイムビッグデータ | RED HAT OPENEYE -レッドハットの情報ポータル
Red Hat JBoss Data Grid 7 + JBoss BRMSで始めるリアルタイムビッグデータ | RED HAT OPENEYE -レッドハットの情報ポータル | Page 2
Infinispan…というより、JBoss Data Grid 7.0(のうち、Distributed Streams APIを使ったWordCount)の紹介となっていますが、見ていてちょっと気になるところなどあったので、自分でもちょっと試してみようかなと思って、少しアレンジしつつ書いてみました。
基本的にやることは変わりませんが、実行環境やライブラリは以下のように変更します。
- ライブラリ:JBoss Data Grid 7.0 → Infinispan 8.2.3.Final
- 実行環境:JBoss EAP → Spring Boot
- 形態素解析器:Atilika Kuromoji → Apache Lucene Kuromoji + mecab-ipadic-NEologd(https://github.com/codelibs/elasticsearch-analysis-kuromoji-neologd#use-lucene-kuromoji-for-neologd)
- WordCountする対象の文書:夏目漱石「こころ」 → 夏目漱石「坊ちゃん」(http://www.aozora.gr.jp/cards/000148/files/752_14964.html)
やりたいこと自体は元の記事と同じで、対象の文書から名詞だけを抜き出してWordCountし、上位N個を抜き出します。ただ、元の記事のようにスケジュール起動ではなく、HTTPリクエストに応じてWordCountするものとします。Cacheの有効期限も特に設けません。
では、やってみましょう。
準備
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.littlewings</groupId> <artifactId>embedded-distributed-streams-wordcount</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>embedded-distributed-streams-wordcount</name> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <java.version>1.8</java.version> <spring.boot.version>1.4.0.RELEASE</spring.boot.version> </properties> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring.boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.infinispan</groupId> <artifactId>infinispan-core</artifactId> <version>8.2.3.Final</version> </dependency> <dependency> <groupId>org.codelibs</groupId> <artifactId>lucene-analyzers-kuromoji-ipadic-neologd</artifactId> <version>6.1.0-20160714</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>${spring.boot.version}</version> <executions> <execution> <goals> <goal>repackage</goal> </goals> </execution> </executions> </plugin> </plugins> </build> <repositories> <repository> <id>codelibs.org</id> <name>CodeLibs Repository</name> <url>http://maven.codelibs.org/</url> </repository> </repositories> </project>
初、Spring Boot 1.4です!
あとは、Infinispanのコアライブラリと、CodeLibsの提供するApache Lucene Kuromoji+mecab-ipadic-NEologdを足します。
Infinispanの設定
Infinispanの設定は、Distributed Cacheをひとつ用意します。
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:8.2 http://www.infinispan.org/schemas/infinispan-config-8.2.xsd" xmlns="urn:infinispan:config:8.2"> <jgroups> <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/> </jgroups> <cache-container default-cache="wordsCache"> <transport cluster="cluster" stack="udp"/> <distributed-cache name="wordsCache"/> </cache-container> </infinispan>
形態素解析対象の文書
こちらのページの内容を、テキストファイルとして配置します。
$ head -n 2 src/main/resources/bocchan.txt 親譲りの無鉄砲で小供の時から損ばかりしている。小学校に居る時分学校の二階から飛び降りて一週間ほど腰を抜かした事がある。なぜそんな無闇をしたと聞く人があるかも知れぬ。別段深い理由でもない。新築の二階から首を出していたら、同級生の一人が冗談に、いくら威張っても、そこから飛び降りる事は出来まい。弱虫やーい。と囃したからである。小使に負ぶさって帰って来た時、おやじが大きな眼をして二階ぐらいから飛び降りて腰を抜かす奴があるかと云ったから、この次は抜かさずに飛んで見せますと答えた。 親類のものから西洋製のナイフを貰って奇麗な刃を日に翳して、友達に見せていたら、一人が光る事は光るが切れそうもないと云った。切れぬ事があるか、何でも切ってみせると受け合った。そんなら君の指を切ってみろと注文したから、何だ指ぐらいこの通りだと右の手の親指の甲をはすに切り込んだ。幸ナイフが小さいのと、親指の骨が堅かったので、今だに親指は手に付いている。しかし創痕は死ぬまで消えぬ。 〜省略〜
これを読み込んで、InfinispanのCacheに加えるコードを書いて、WordCountすることになります。
アプリケーションを書く
それでは、アプリケーションを書いていきます。
まずはアプリケーションのエントリポイント。
src/main/java/org/littlewings/infinispan/wordcount/App.java
package org.littlewings.infinispan.wordcount; import java.io.IOException; import org.infinispan.manager.DefaultCacheManager; import org.infinispan.manager.EmbeddedCacheManager; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class App { public static void main(String... args) { SpringApplication.run(App.class, args); } @Bean(destroyMethod = "stop") public EmbeddedCacheManager embeddedCacheManager() throws IOException { EmbeddedCacheManager cacheManager = new DefaultCacheManager("infinispan.xml"); cacheManager.startCaches(cacheManager.getCacheNames().toArray(new String[cacheManager.getCacheNames().size()])); return cacheManager; } }
InfinispanのEmbeddedCacheManagerを作成、Cacheを開始しつつBean定義をします。
また、Atilika Kuromojiの時はTokenという単語や品詞を持ったクラスが存在していたのですが、Apache Lucene Kuromojiを使う場合はそういったものがないので、今回は自作します。
src/main/java/org/littlewings/infinispan/wordcount/Token.java
package org.littlewings.infinispan.wordcount; import java.io.Serializable; public class Token implements Serializable { private static final long serialVersionUID = 1L; String value; String partOfSpeech; public Token(String value, String partOfSpeech) { this.value = value; this.partOfSpeech = partOfSpeech; } public String getValue() { return value; } public String getPartOfSpeech() { return partOfSpeech; } @Override public String toString() { return "token = " + value + ", partOfSpeech = " + partOfSpeech; } }
では、WordCountするControllerを書きます。RestControllerとして作成します。
まずは大枠から。
src/main/java/org/littlewings/infinispan/wordcount/WordCount.java
package org.littlewings.infinispan.wordcount; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Serializable; import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseAnalyzer; import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseTokenizer; import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute; import org.infinispan.Cache; import org.infinispan.CacheStream; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.stream.CacheCollectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class WordCount implements Serializable { // Logback's Logger is Serializable Logger logger = LoggerFactory.getLogger(getClass()); @Autowired transient EmbeddedCacheManager cacheManager; static final JapaneseAnalyzer analyzer = new JapaneseAnalyzer(null, JapaneseTokenizer.Mode.NORMAL, JapaneseAnalyzer.getDefaultStopSet(), JapaneseAnalyzer.getDefaultStopTags()); // 後で }
Distributed Streams APIでLambdaを使って書いていく関係上、外側のクラスがSerializableになっています。
public class WordCount implements Serializable {
また、SerializableではないEmbeddedCacheManagerやLuceneのJapaneseAnalyzerは、それぞれtransientやstaticな変数としてこちらもシリアライズの対象外としています。
@Autowired transient EmbeddedCacheManager cacheManager; static final JapaneseAnalyzer analyzer = new JapaneseAnalyzer(null, JapaneseTokenizer.Mode.NORMAL, JapaneseAnalyzer.getDefaultStopSet(), JapaneseAnalyzer.getDefaultStopTags());
LogbackのLoggerはSerializableを実装していたので、問題ありませんでした。
というか、これらの対処をしないと分散実行時にシリアライズ関係のエラーになります…。
データロードを行う箇所。リクエストに応じて「坊ちゃん」のテキストを読み込むようにします。
@GetMapping("load") public String load() { Cache<Integer, String> wordsCache = cacheManager.getCache("wordsCache"); logger.info("start load words"); try (InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream("bocchan.txt"); InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8); BufferedReader reader = new BufferedReader(isr); Stream<String> lines = reader.lines()) { AtomicInteger currentLines = new AtomicInteger(); lines.forEach(line -> wordsCache.put(currentLines.incrementAndGet(), line)); } catch (IOException e) { throw new UncheckedIOException(e); } logger.info("end load words"); return "Finish!!"; }
そして、WordCountする箇所。基本的には、元の記事とほぼ同じような処理です。Luceneを使っていたり、ログ出力の関係上、ちょっと長めになっていますが…。
@GetMapping("wordcount") public List<Map.Entry<String, Integer>> wordcount(@RequestParam(name = "limit", defaultValue = "10") int limit) { Cache<Integer, String> wordsCache = cacheManager.getCache("wordsCache"); try (CacheStream<String> stream = wordsCache.values().parallelStream()) { logger.info("start wordcount"); Map<String, Integer> collected = stream .timeout(30, TimeUnit.SECONDS) // timeout .flatMap((Function<String, Stream<Token>> & Serializable) line -> { List<Token> tokens = new ArrayList<>(); try (TokenStream tokenStream = analyzer.tokenStream("", line)) { CharTermAttribute charTermAttribute = tokenStream.addAttribute(CharTermAttribute.class); PartOfSpeechAttribute partOfSpeechAttribute = tokenStream.addAttribute(PartOfSpeechAttribute.class); tokenStream.reset(); while (tokenStream.incrementToken()) { tokens.add(new Token(charTermAttribute.toString(), partOfSpeechAttribute.getPartOfSpeech())); tokenStream.end(); } } catch (IOException e) { throw new UncheckedIOException(e); } logger.info("[{}] -FlatMap- line = {}, tokens = {}", Thread.currentThread().getName(), line, tokens); return tokens.stream(); }) .filter((Predicate<Token> & Serializable) token -> { logger.info("[{}] -Filter- token = {}, isPartOfSpeech = {}", Thread.currentThread().getName(), token, token.getPartOfSpeech().contains("名詞")); return token.getPartOfSpeech().contains("名詞"); }) .map((Function<Token, String> & Serializable) token -> { logger.info("[{}] -Map- token = {}", Thread.currentThread().getName(), token); return token.getValue(); }).collect( CacheCollectors.serializableCollector(() -> Collectors.groupingByConcurrent(s -> { logger.info("[{}] -Identity- word = {}", Thread.currentThread().getName(), s); return s; }, ConcurrentHashMap::new, Collectors.reducing(0, s -> { logger.info("[{}] -Reduce-Map- {}", Thread.currentThread().getName(), s); return 1; }, (c1, c2) -> { logger.info("[{}] -Reduce-Op- {}, {}", Thread.currentThread().getName(), c1, c2); return c1 + c2; })))); logger.info("end wordcount"); logger.info("exract top - {}", limit); return collected .entrySet() .stream() .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()) .limit(limit) .collect(Collectors.toList()); } catch (Exception e) { logger.error("wordcount, fail", e); throw e; } }
ParallelStreamを使っているので、分散処理+各Nodeでの並列処理になります。
try (CacheStream<String> stream = wordsCache.values().parallelStream()) {
その様子は、ログ出力して確認するようにしています。
また、元の記事に比べて交差型キャストが入っていて、ちょっとうるさくなっているのですが
.flatMap((Function<String, Stream<Token>> & Serializable) line -> { .filter((Predicate<Token> & Serializable) token -> { .map((Function<Token, String> & Serializable) token -> {
これは、JBoss Data Grid 7ではCacheStreamの操作にSerializableなFunctionなどを受け取るメソッドが追加されているからです。
Infinispan 8.2の時点ではまだ入っていないので、9.0になるまではこういう記述が必要になります。
というか、商用版の方が機能が先に行っている…!
また、このデバッグ状態だとタイムアウトをちょっと伸ばしておかないと分散実行時にエラーになってしまうので、設定を入れています。
.timeout(30, TimeUnit.SECONDS) // timeout
動作確認
それでは、動かしてみましょう。
パッケージング。
$ mvn package -DskipTests=true
起動。
$ java -jar target/embedded-distributed-streams-wordcount-0.0.1-SNAPSHOT.jar
データロード。
$ curl http://localhost:8080/load
Finish!!
それでは、WordCountしてみます。
$ time curl http://localhost:8080/wordcount [{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}] real 0m33.293s user 0m0.009s sys 0m0.000s
だいたい30秒ほどかかりました。
ひとつ、Nodeを追加してみます。起動ポートはずらしています。
$ java -jar target/embedded-distributed-streams-wordcount-0.0.1-SNAPSHOT.jar --server.port=8180
これで、2 Nodeです。
リバランスも完了したところで
2016-08-07 19:22:16.712 INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER : [Context=wordsCache][Scope=xxxxx-18204]ISPN100002: Started local rebalance 2016-08-07 19:22:16.732 INFO 24513 --- [t-thread--p4-t7] org.infinispan.CLUSTER : [Context=wordsCache][Scope=xxxxx-18204]ISPN100003: Finished local rebalance 2016-08-07 19:22:16.999 INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER : [Context=wordsCache][Scope=xxxxx-63104]ISPN100003: Finished local rebalance
再度WordCount。
$ time curl http://localhost:8080/wordcount [{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}] real 0m5.213s user 0m0.004s sys 0m0.005s
なんか、めちゃくちゃ速くなりました…。
さらにNode追加。
$ java -jar target/embedded-distributed-streams-wordcount-0.0.1-SNAPSHOT.jar --server.port=8280
リバランスが完了したのを見て
2016-08-07 19:25:14.367 INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER : [Context=wordsCache][Scope=xxxxx-18204]ISPN100002: Started local rebalance 2016-08-07 19:25:14.369 INFO 24513 --- [t-thread--p4-t1] org.infinispan.CLUSTER : [Context=wordsCache][Scope=xxxxx-18204]ISPN100003: Finished local rebalance 2016-08-07 19:25:14.378 INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER : [Context=wordsCache][Scope=xxxxx-63104]ISPN100003: Finished local rebalance 2016-08-07 19:25:14.616 INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER : [Context=wordsCache][Scope=xxxxx-32030]ISPN100003: Finished local rebalance 2016-08-07 19:25:14.616 INFO 24513 --- [e-thread--p2-t1] org.infinispan.CLUSTER : ISPN000336: Finished cluster-wide rebalance for cache wordsCache, topology id = 7
WordCount。
$ time curl http://localhost:8080/wordcount [{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}] real 0m4.077s user 0m0.008s sys 0m0.016s
実行ごとに多少結果がブレていたりしましたが、今回はそこまで速くはなりませんでした。まあ、PC1台だし…。
なお、実行の様子はこういうログで確認できます。
※かなり大量に出力されます
2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-3] -FlatMap- line = 庭は十坪ほどの平庭で、これという植木もない。ただ一本の蜜柑があって、塀のそとから、目標になるほど高い。おれはうちへ帰ると、いつでもこの蜜柑を眺める。東京を出た事のないものには蜜柑の生っているところはすこぶる珍しいものだ。あの青い実がだんだん熟してきて、黄色になるんだろうが、定めて奇麗だろう。今でももう半分色の変ったのがある。婆さんに聞いてみると、すこぶる水気の多い、旨い蜜柑だそうだ。今に熟たら、たんと召し上がれと云ったから、毎日少しずつ食ってやろう。もう三週間もしたら、充分食えるだろう。まさか三週間以内にここを去る事もなかろう。, tokens = [token = 庭, partOfSpeech = 名詞-一般, token = 十, partOfSpeech = 名詞-数, token = 坪, partOfSpeech = 名詞-接尾-助数詞, token = 平, partOfSpeech = 接頭詞-名詞接続, token = 庭, partOfSpeech = 名詞-一般, token = 植木, partOfSpeech = 名詞-一般, token = ただ, partOfSpeech = 名詞-一般, token = 一, partOfSpeech = 名詞-数, token = 本, partOfSpeech = 名詞-接尾-助数詞, token = 蜜柑, partOfSpeech = 名詞-一般, token = 塀, partOfSpeech = 名詞-一般, token = そ, partOfSpeech = 名詞-特殊-助動詞語幹, token = 目標, partOfSpeech = 名詞-一般, token = 高い, partOfSpeech = 形容詞-自立, token = おれ, partOfSpeech = 名詞-代名詞-一般, token = 帰る, partOfSpeech = 動詞-自立, token = いつ, partOfSpeech = 名詞-代名詞-一般, token = 蜜柑, partOfSpeech = 名詞-一般, token = 眺める, partOfSpeech = 動詞-自立, token = 東京, partOfSpeech = 名詞-固有名詞-地域-一般, token = 出る, partOfSpeech = 動詞-自立, token = 事, partOfSpeech = 名詞-非自立-一般, token = 蜜柑, partOfSpeech = 名詞-一般, token = 生る, partOfSpeech = 動詞-自立, token = すこぶる, partOfSpeech = 副詞-一般, token = 珍しい, partOfSpeech = 形容詞-自立, token = あの, partOfSpeech = 連体詞, token = 青い, partOfSpeech = 形容詞-自立, token = 実, partOfSpeech = 名詞-一般, token = だんだん, partOfSpeech = 副詞-助詞類接続, token = 熟す, partOfSpeech = 動詞-自立, token = くる, partOfSpeech = 動詞-非自立, token = 黄色, partOfSpeech = 名詞-一般, token = 定める, partOfSpeech = 動詞-自立, token = 奇麗, partOfSpeech = 名詞-形容動詞語幹, token = 今, partOfSpeech = 名詞-副詞可能, token = もう半分, partOfSpeech = 名詞-固有名詞-一般, token = 色, partOfSpeech = 名詞-接尾-一般, token = 変る, partOfSpeech = 動詞-自立, token = 婆さん, partOfSpeech = 名詞-一般, token = 聞く, partOfSpeech = 動詞-自立, token = みる, partOfSpeech = 動詞-非自立, token = すこぶる, partOfSpeech = 副詞-一般, token = 水気, partOfSpeech = 名詞-一般, token = 多い, partOfSpeech = 形容詞-自立, token = 旨い, partOfSpeech = 形容詞-自立, token = 蜜柑, partOfSpeech = 名詞-一般, token = そう, partOfSpeech = 名詞-特殊-助動詞語幹, token = 今, partOfSpeech = 名詞-副詞可能, token = 熟, partOfSpeech = 名詞-サ変接続, token = たんと, partOfSpeech = 副詞-一般, token = 召し上がる, partOfSpeech = 動詞-自立, token = 云う, partOfSpeech = 動詞-自立, token = 毎日, partOfSpeech = 名詞-副詞可能, token = 少し, partOfSpeech = 副詞-助詞類接続, token = 食う, partOfSpeech = 動詞-自立, token = やる, partOfSpeech = 動詞-非自立, token = もう, partOfSpeech = 副詞-一般, token = 三, partOfSpeech = 名詞-数, token = 週間, partOfSpeech = 名詞-接尾-助数詞, token = 充分, partOfSpeech = 副詞-一般, token = 食える, partOfSpeech = 動詞-自立, token = まさか, partOfSpeech = 副詞-一般, token = 三, partOfSpeech = 名詞-数, token = 週間, partOfSpeech = 名詞-接尾-助数詞, token = 以内, partOfSpeech = 名詞-非自立-副詞可能, token = 去る, partOfSpeech = 動詞-自立, token = 事, partOfSpeech = 名詞-非自立-一般] 2016-08-07 19:27:18.265 INFO 24513 --- [nio-8080-exec-3] o.l.infinispan.wordcount.WordCount : [http-nio-8080-exec-3] -Map- token = token = お湯, partOfSpeech = 名詞-一般 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-7] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-7] -Reduce-Op- 0, 1 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-0] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-0] -Filter- token = token = 金, partOfSpeech = 名詞-固有名詞-人名-姓, isPartOfSpeech = true 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-2] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-2] -Filter- token = token = 菓子, partOfSpeech = 名詞-一般, isPartOfSpeech = true 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-3] -Filter- token = token = 庭, partOfSpeech = 名詞-一般, isPartOfSpeech = true 2016-08-07 19:27:18.265 INFO 24513 --- [nio-8080-exec-3] o.l.infinispan.wordcount.WordCount : [http-nio-8080-exec-3] -Identity- word = お湯 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-7] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-7] -Filter- token = token = やる, partOfSpeech = 動詞-自立, isPartOfSpeech = false 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-0] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-0] -Map- token = token = 金, partOfSpeech = 名詞-固有名詞-人名-姓 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-2] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-2] -Map- token = token = 菓子, partOfSpeech = 名詞-一般 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-3] -Map- token = token = 庭, partOfSpeech = 名詞-一般 2016-08-07 19:27:18.265 INFO 24513 --- [nio-8080-exec-3] o.l.infinispan.wordcount.WordCount : [http-nio-8080-exec-3] -Reduce-Map- お湯 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-7] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-7] -Filter- token = token = 二, partOfSpeech = 名詞-数, isPartOfSpeech = true 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-0] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-0] -Identity- word = 金 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-2] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-2] -Identity- word = 菓子 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-3] -Identity- word = 庭 2016-08-07 19:27:18.265 INFO 24513 --- [nio-8080-exec-3] o.l.infinispan.wordcount.WordCount : [http-nio-8080-exec-3] -Reduce-Op- 0, 1 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-7] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-7] -Map- token = token = 二, partOfSpeech = 名詞-数 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-0] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-0] -Reduce-Map- 金 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-2] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-2] -Reduce-Map- 菓子 2016-08-07 19:27:18.265 INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-3] -Reduce-Map- 庭 2016-08-07 19:27:18.265 INFO 24513 --- [nio-8080-exec-3] o.l.infinispan.wordcount.WordCount : [http-nio-8080-exec-3] -Filter- token = token = さあ, partOfSpeech = 感動詞, isPartOfSpeech = false 2016-08-07 19:27:18.266 INFO 24513 --- [onPool-worker-7] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-7] -Identity- word = 二 2016-08-07 19:27:18.266 INFO 24513 --- [onPool-worker-0] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-0] -Reduce-Op- 4, 1 2016-08-07 19:27:18.266 INFO 24513 --- [onPool-worker-2] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-2] -Reduce-Op- 1, 1 2016-08-07 19:27:18.266 INFO 24513 --- [onPool-worker-3] o.l.infinispan.wordcount.WordCount : [ForkJoinPool.commonPool-worker-3] -Reduce-Op- 2, 1
こういう感じの、3 Nodeそれぞれでわらわらと出力されます。
とりあえず、Nodeを追加すると性能が変わるところは確認できましたね。
オチ)
実はこの結果はログ出力している部分がけっこうネックになっていて、ログ出力を削るとこういう結果になります。
## 1 Node $ time curl http://localhost:8080/wordcount [{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}] real 0m0.137s user 0m0.003s sys 0m0.003s ## 2 Node $ time curl http://localhost:8080/wordcount [{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}] real 0m0.172s user 0m0.008s sys 0m0.001s ## 3 Node $ time curl http://localhost:8080/wordcount [{"おれ":469},{"事":282},{"人":193},{"君":182},{"赤シャツ":168},{"山嵐":155},{"一":152},{"何":143},{"二":115},{"方":112}] real 0m0.069s user 0m0.007s sys 0m0.000s
ホントは、さっきみたいな時間はかからない処理でした!
テストコード
最後に、テストコードも書いておいたので載せておきます。
src/test/java/org/littlewings/infinispan/wordcount/WordCountTest.java
package org.littlewings.infinispan.wordcount; import java.util.List; import java.util.Map; import org.assertj.core.api.IntegerAssert; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.web.client.TestRestTemplate; import org.springframework.test.context.junit4.SpringRunner; import static org.assertj.core.api.Assertions.assertThat; @RunWith(SpringRunner.class) @SpringBootTest(classes = App.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class WordCountTest { @Autowired TestRestTemplate restTemplate; @Test public void wordcount10() { String loadResult = restTemplate.getForObject("/load", String.class); assertThat(loadResult).isEqualTo("Finish!!"); @SuppressWarnings("unchecked") List<Map<String, Integer>> wordcount = restTemplate.getForObject("/wordcount", List.class); assertThat(wordcount) .hasSize(10); assertThat(wordcount.get(0).get("おれ")).isEqualTo(469); assertThat(wordcount.get(1).get("事")).isEqualTo(282); assertThat(wordcount.get(2).get("人")).isEqualTo(193); assertThat(wordcount.get(3).get("君")).isEqualTo(182); assertThat(wordcount.get(4).get("赤シャツ")).isEqualTo(168); assertThat(wordcount.get(5).get("山嵐")).isEqualTo(155); assertThat(wordcount.get(6).get("一")).isEqualTo(152); assertThat(wordcount.get(7).get("何")).isEqualTo(143); assertThat(wordcount.get(8).get("二")).isEqualTo(115); assertThat(wordcount.get(9).get("方")).isEqualTo(112); } @Test public void wordcount5() { String loadResult = restTemplate.getForObject("/load", String.class); assertThat(loadResult).isEqualTo("Finish!!"); @SuppressWarnings("unchecked") List<Map<String, Integer>> wordcount = restTemplate.getForObject("/wordcount?limit=5", List.class); assertThat(wordcount) .hasSize(5); assertThat(wordcount.get(0).get("おれ")).isEqualTo(469); assertThat(wordcount.get(1).get("事")).isEqualTo(282); assertThat(wordcount.get(2).get("人")).isEqualTo(193); assertThat(wordcount.get(3).get("君")).isEqualTo(182); assertThat(wordcount.get(4).get("赤シャツ")).isEqualTo(168); } }
こちらの内容を、がっつり利用。
まとめ
元の記事を参考に、自分で気になるところを確認しつつ試してみました。特にシリアライズまわりが気になって始めたのですが、ある種思ったとおりの結果になりました。
まあ、目的は達成したのでいいかなぁと。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-distributed-streams-wordcount