CLOVER🍀

That was when it all began.

Infinispan Map Reduce Framework

Distributed Execution Frameworkに続いて、今度はMapReduceです。

Map Reduce Framework
https://docs.jboss.org/author/display/ISPN/Map+Reduce+framework

名前はGoogleやらHadoopやらで有名ですが、これをサポートしたプロダクトをあんまり触ったことがありません…。3年くらい前に、Hadoop StreamingでWordCountしたことがあるくらいかな…?

概要

オフィシャルドキュメントのIntroductionからすると、だいたいこんな感じみたいです。

  • MapReduceは、Data Grid上の巨大なデータを分散されていることを意識せずに、分散処理できることを可能にする
  • MapフェーズとReduceフェーズの、2つの異なる計算処理の考え方が名前の由来
  • Mapフェーズでは、Master Nodeが入力を受け取って、Grid上でMapフェーズを実行するために、分割して送信することでタスクを開始する
  • 各NodeでのMap関数の実行結果が、中間結果としてMaster Nodeに戻ってくる
  • Master NodeのタスクはMapフェーズの実行結果を全て集めて、中間結果のKeyで結合して、KeyとValueをGrid上でReductionフェーズを行うために送信する
  • 最後にMaster Nodeは全てのReductionフェーズの結果を受信して、MapReduceタスクの呼び出し元に最終的な結果を返却する

まあ、言っていることは普通にMapReduceの話だと思います…。Master Nodeというのは、MapReduceタスクを開始したNodeのことだと思いますが。

InfinispanのMapReduce APIでは、主なコンポーネントは4つだと言っています。

  • Mapper
  • Reducer
  • Collator
  • MapReduceTask

Mapper/Reducer/Collatorはそれぞれインターフェースで、以下の様に宣言されています。
*ついでに、Collectorも載せておきました

// KInは、CacheのKeyの型
// VInは、CacheのValueの型
// KOutは、Mapperの結果のKeyの型
// VOutは、Mapperの結果のValueの型
public interface Mapper<KIn, VIn, KOut, VOut> extends Serializable {
 
   /**
    * Invoked once for each input cache entry KIn,VOut pair.
    */
   void map(KIn key, VIn value, Collector<KOut, VOut> collector);
}

// KOutは、Mapperの結果のKeyの型
// VOutは、Mapperの結果のValueの型
public interface Reducer<KOut, VOut> extends Serializable {
 
   /**
    * Combines/reduces all intermediate values for a particular intermediate key to a single value.
    * <p>
    *
    */
   VOut reduce(KOut reducedKey, Iterator<VOut> iter);
}


public interface Collator<KOut,VOut,R> {
   /**
    * Collates all reduced results and returns R to invoker of distributed task.
    * <p>
    *
    */
    // Map<KOut, VOut>には、Reducerの処理結果をまとめたMapが渡ってくる
    R collate(Map<KOut,VOut> reducedResults);
}

public interface Collector<K,V> {
 
   /**
    * Intermediate key/value callback used by Mapper implementor
    * <p>
    *
    */
   void emit(K key, V value);
}

MapperはKey/Valueのペアを受け取り、処理の中間結果を再びKey/Valueの形でCollectorに渡します。Mapperのインスタンスは、InfinispanのNodeを移動していくそうですが…?たぶん、Nodeごとに送信すると思ってていい気がしますが。

Reducerは、Mapperが作成した中間結果のKeyと対するCollectionをIteratorという形で受け取ります。結果は、単一の戻り値として返却します。インスタンスは、分散実行する場合はNodeごとに作られるそうです。

Collatorは、Reducerの実行結果となったMapを、呼び出し元に返却する際の集約処理を行うことができます。要は、MapReduceの結果はMapで返ってくるので、その結果を好きに変換してねってことだと思います。

MapperにはCacheをCDIで注入できるそうですが、今回は飛ばします。

なお、5.2以前のInfinispanでは、Reduceフェーズは単一のMaster Nodeでしか実行できなかったそうです。
*Map Reduce Frameworkは、Infinispan 5.0からの機能
よって、Reduceフェーズは単一のInfinispanのメモリサイズに縛られていたらしいのですが、Infinispan 5.2からはその制限はなくなったとか。MapReduceTaskのコンストラクタで、この挙動は制御できます。小さなタスクを実行する時は、単一のNodeでReduceフェーズを実行することを推奨しているそうです。

それでは、使ってみましょう。

ひとりMapReduce

Scalaに書き換える程度で、他には何も考えずに
https://docs.jboss.org/author/display/ISPN/Map+Reduce+framework#MapReduceframework-Examples
に載っているサンプルを書いてみます。

まずは、単一のInfinispanでMapReduceです。

用意したもの。
build.sbt

name := "infinispan-mapreduce-example"

version := "0.0.1"

scalaVersion := "2.10.1"

organization := "littlewings"

fork in run := true

scalacOptions += "-deprecation"

resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/"

libraryDependencies += "org.infinispan" % "infinispan-core" % "5.2.1.Final"

src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:5.2 http://www.infinispan.org/schemas/infinispan-config-5.2.xsd"
    xmlns="urn:infinispan:config:5.2">

  <global>
    <transport clusterName="mapReduceCluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        />
  </global>

  <default>
    <jmxStatistics enabled="true"/>
    <clustering mode="distribution">
      <hash numOwners="1" />
      <sync />
    </clustering>
  </default>
</infinispan>

src/main/resources/jgroups.xml

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd">
  <UDP
      mcast_addr="${jgroups.udp.mcast_addr:228.11.11.11}"
      mcast_port="${jgroups.udp.mcast_port:45688}"
      tos="8"
      ucast_recv_buf_size="130000"
      ucast_send_buf_size="100000"
      mcast_recv_buf_size="130000"
      mcast_send_buf_size="100000"
      loopback="true"

      thread_naming_pattern="cl"

      thread_pool.enabled="true"
      thread_pool.min_threads="2"
      thread_pool.max_threads="8"
      thread_pool.keep_alive_time="5000"
      thread_pool.queue_enabled="true"
      thread_pool.queue_max_size="1000"
      thread_pool.rejection_policy="discard"

      oob_thread_pool.enabled="true"
      oob_thread_pool.min_threads="2"
      oob_thread_pool.max_threads="8"
      oob_thread_pool.keep_alive_time="1000"
      oob_thread_pool.queue_enabled="false"
      oob_thread_pool.rejection_policy="discard"
      />

  <PING />
  <FD_ALL />
  <FD_SOCK />
  <UNICAST2 />
  <pbcast.NAKACK2 />
  <pbcast.GMS print_local_addr="true" />
</config>

JGroupsの設定は、だいぶ絞り込みました。スッキリ。

infinispan.xmlで、MapReduceを使う時に重要な設定はココで、

    <clustering mode="distribution">
      <hash numOwners="1" />
      <sync />
    </clustering>

クラスタリングにするように設定して、なおかつ「distribution」(分散モード)でなければなりません。クラスタリングの設定がなかったり、モードをレプリケーションにしたりするとMapReduceは動作しません。

最初に設定なしでサンプルを書いた時に、いきなりこれにぶつかりました…。

それでは、サンプルプログラムを
src/main/scala/MapReduceExample.scala

import scala.collection.JavaConverters._

import org.infinispan.manager.DefaultCacheManager
import org.infinispan.distexec.mapreduce.{Collator, Collector, Mapper, MapReduceTask, Reducer}

object MapReduceExample {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]()

    cache.put("1", "Hello world here I am")
    cache.put("2", "Infinispan rules the world")
    cache.put("3", "JUDCon is in Boston")
    cache.put("4", "JBoss World is in Boston as well")
    cache.put("12","JBoss Application Server")
    cache.put("15", "Hello world")
    cache.put("14", "Infinispan community")
    cache.put("15", "Hello world")
    
    cache.put("111", "Infinispan open source")
    cache.put("112", "Boston is close to Toronto")
    cache.put("113", "Toronto is a capital of Ontario")
    cache.put("114", "JUDCon is cool")
    cache.put("211", "JBoss World is awesome")
    cache.put("212", "JBoss rules")
    cache.put("213", "JBoss division of RedHat ")
    cache.put("214", "RedHat community")

    val task = new MapReduceTask[String, String, String, Int](cache)
    val map =
      task.mappedWith(new WordCountMapper)
        .reducedWith(new WordCountReducer)
        .execute

    println("----- [MapReduceResult] START -----")
    map.asScala.foreach { case (k, v) => println(s"Key[$k] = $v") }
    println("----- [MapReduceResult] END -----")

    cache.stop()
    manager.stop()
  }
}

Mapper。

@SerialVersionUID(1L)
class WordCountMapper extends Mapper[String, String, String, Int] {
  def map(key: String, value: String, c: Collector[String, Int]): Unit = {
    println(s"[Mapper] input key => [$key], input value => [$value]")
    """\s+""".r.split(value).foreach(c.emit(_, 1))
  }
}

Reducer。

@SerialVersionUID(1L)
class WordCountReducer extends Reducer[String, Int] {
  def reduce(key: String, iter: java.util.Iterator[Int]): Int = {
    println(s"[Reducer] input key => [$key]")
    iter.asScala.sum
  }
}

Scalaに書き直している以外は、ほぼサンプルのままです。ただ、サンプルでは2つのCacheを宣言してデータをputしているのですが、どう見ても2つ目のインスタンスは使っていないので今回はまとめてしまいました…。

本家のサンプル通りに分けた場合は、1つ目のインスタンスに対してMapReduceを行うので、2つ目のCacheの内容はMapReduceの処理に影響しません。

あと、MapperとReducerにprintlnを入れています。

実行してみます。

> run-main MapReduceExample
[info] Running MapReduceExample 
  〜省略〜
[info] [Mapper] input key => [211], input value => [JBoss World is awesome]
[info] [Mapper] input key => [212], input value => [JBoss rules]
[info] [Mapper] input key => [114], input value => [JUDCon is cool]
[info] [Mapper] input key => [112], input value => [Boston is close to Toronto]
[info] [Mapper] input key => [113], input value => [Toronto is a capital of Ontario]
[info] [Mapper] input key => [213], input value => [JBoss division of RedHat ]
[info] [Mapper] input key => [214], input value => [RedHat community]
[info] [Mapper] input key => [111], input value => [Infinispan open source]
[info] [Mapper] input key => [15], input value => [Hello world]
[info] [Mapper] input key => [14], input value => [Infinispan community]
[info] [Mapper] input key => [12], input value => [JBoss Application Server]
[info] [Mapper] input key => [3], input value => [JUDCon is in Boston]
[info] [Mapper] input key => [2], input value => [Infinispan rules the world]
[info] [Mapper] input key => [1], input value => [Hello world here I am]
[info] [Mapper] input key => [4], input value => [JBoss World is in Boston as well]
[info] [Reducer] input key => [to]
[info] [Reducer] input key => [Boston]
[info] [Reducer] input key => [well]
[info] [Reducer] input key => [I]
[info] [Reducer] input key => [capital]
[info] [Reducer] input key => [close]
[info] [Reducer] input key => [Toronto]
[info] [Reducer] input key => [open]
[info] [Reducer] input key => [of]
[info] [Reducer] input key => [division]
[info] [Reducer] input key => [awesome]
[info] [Reducer] input key => [World]
[info] [Reducer] input key => [JBoss]
[info] [Reducer] input key => [cool]
[info] [Reducer] input key => [is]
[info] [Reducer] input key => [JUDCon]
[info] [Reducer] input key => [a]
[info] [Reducer] input key => [RedHat]
[info] [Reducer] input key => [community]
[info] [Reducer] input key => [as]
[info] [Reducer] input key => [here]
[info] [Reducer] input key => [the]
[info] [Reducer] input key => [in]
[info] [Reducer] input key => [Server]
[info] [Reducer] input key => [rules]
[info] [Reducer] input key => [Hello]
[info] [Reducer] input key => [Ontario]
[info] [Reducer] input key => [Infinispan]
[info] [Reducer] input key => [source]
[info] [Reducer] input key => [am]
[info] [Reducer] input key => [Application]
[info] [Reducer] input key => [world]
[info] ----- [MapReduceResult] START -----
[info] Key[to] = 1
[info] Key[Boston] = 3
[info] Key[well] = 1
[info] Key[capital] = 1
[info] Key[I] = 1
[info] Key[Toronto] = 2
[info] Key[close] = 1
[info] Key[of] = 2
[info] Key[open] = 1
[info] Key[division] = 1
[info] Key[awesome] = 1
[info] Key[World] = 2
[info] Key[JBoss] = 5
[info] Key[cool] = 1
[info] Key[is] = 6
[info] Key[JUDCon] = 2
[info] Key[a] = 1
[info] Key[RedHat] = 2
[info] Key[community] = 2
[info] Key[as] = 1
[info] Key[here] = 1
[info] Key[the] = 1
[info] Key[in] = 2
[info] Key[Hello] = 2
[info] Key[rules] = 2
[info] Key[Server] = 1
[info] Key[Ontario] = 1
[info] Key[source] = 1
[info] Key[Infinispan] = 3
[info] Key[am] = 1
[info] Key[Application] = 1
[info] Key[world] = 3
[info] ----- [MapReduceResult] END -----
  〜省略〜
[success] Total time: 6 s, completed 2013/03/17 19:27:27

JGroupsとInfinispanのログは端折りました。

Cacheに登録したKey/ValueがMapperに渡され、MapperでCollectorに渡したKeyがReducerの引数となっていることがわかると思います。

MapReduceTask#executeの戻り値は、java.util.Mapとなります。

分散実行

それでは、今度はInfinispanのインスタンスを増やして実行してみましょう。

インスタンスを増やすために、こんなコードを用意します。
src/main/scala/EmbeddedCacheServer.scala

import org.infinispan.manager.DefaultCacheManager

object EmbeddedCacheServer {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]()
  }
}

ここで、最初のMapReduceExampleにより起動されるInfiispanのインスタンスをMaster Node、EmbeddedCacheServerにより起動されるインスタンス

# Node-B
$ sbt "run-main EmbeddedCacheServer"

# Node-C
$ sbt "run-main EmbeddedCacheServer"

と呼ぶことにします。

で、Node-B,Cを起動させたところで、Master Nodeからタスクを実行。

> run-main MapReduceExample
  〜省略〜
[info] [Mapper] input key => [113], input value => [Toronto is a capital of Ontario]
[info] [Reducer] input key => [to]
[info] [Reducer] input key => [Boston]
[info] [Reducer] input key => [well]
[info] [Reducer] input key => [capital]
[info] [Reducer] input key => [I]
[info] [Reducer] input key => [Toronto]
[info] [Reducer] input key => [close]
[info] [Reducer] input key => [open]
[info] [Reducer] input key => [of]
[info] [Reducer] input key => [division]
[info] [Reducer] input key => [awesome]
[info] [Reducer] input key => [World]
[info] [Reducer] input key => [JBoss]
[info] [Reducer] input key => [cool]
[info] [Reducer] input key => [is]
[info] [Reducer] input key => [JUDCon]
[info] [Reducer] input key => [a]
[info] [Reducer] input key => [community]
[info] [Reducer] input key => [RedHat]
[info] [Reducer] input key => [as]
[info] [Reducer] input key => [here]
[info] [Reducer] input key => [the]
[info] [Reducer] input key => [in]
[info] [Reducer] input key => [rules]
[info] [Reducer] input key => [Hello]
[info] [Reducer] input key => [Server]
[info] [Reducer] input key => [Ontario]
[info] [Reducer] input key => [Infinispan]
[info] [Reducer] input key => [source]
[info] [Reducer] input key => [am]
[info] [Reducer] input key => [Application]
[info] [Reducer] input key => [world]
[info] ----- [MapReduceResult] START -----
[info] Key[to] = 1
[info] Key[Boston] = 3
[info] Key[well] = 1
[info] Key[I] = 1
[info] Key[capital] = 1
[info] Key[close] = 1
[info] Key[Toronto] = 2
[info] Key[of] = 2
[info] Key[open] = 1
[info] Key[division] = 1
[info] Key[awesome] = 1
[info] Key[World] = 2
[info] Key[JBoss] = 5
[info] Key[cool] = 1
[info] Key[is] = 6
[info] Key[JUDCon] = 2
[info] Key[a] = 1
[info] Key[community] = 2
[info] Key[RedHat] = 2
[info] Key[as] = 1
[info] Key[here] = 1
[info] Key[the] = 1
[info] Key[in] = 2
[info] Key[Server] = 1
[info] Key[Hello] = 2
[info] Key[rules] = 2
[info] Key[Ontario] = 1
[info] Key[source] = 1
[info] Key[Infinispan] = 3
[info] Key[am] = 1
[info] Key[Application] = 1
[info] Key[world] = 3
[info] ----- [MapReduceResult] END -----
  〜省略〜
[success] Total time: 5 s, completed 2013/03/17 19:38:10

Node-B

[info] [Mapper] input key => [211], input value => [JBoss World is awesome]
[info] [Mapper] input key => [3], input value => [JUDCon is in Boston]
[info] [Mapper] input key => [1], input value => [Hello world here I am]
[info] [Mapper] input key => [112], input value => [Boston is close to Toronto]
[info] [Mapper] input key => [213], input value => [JBoss division of RedHat ]
[info] [Mapper] input key => [111], input value => [Infinispan open source]
[info] [Mapper] input key => [15], input value => [Hello world]
[info] [Mapper] input key => [12], input value => [JBoss Application Server]

Node-C

[info] [Mapper] input key => [2], input value => [Infinispan rules the world]
[info] [Mapper] input key => [212], input value => [JBoss rules]
[info] [Mapper] input key => [114], input value => [JUDCon is cool]
[info] [Mapper] input key => [4], input value => [JBoss World is in Boston as well]
[info] [Mapper] input key => [214], input value => [RedHat community]
[info] [Mapper] input key => [14], input value => [Infinispan community]

Master Nodeは、Mapperでひとつの仕事しかしていませんね…。その代わり、Reducerは全部Master Nodeで実行されています。結果は、単一のNodeでやってた時と若干並び順が違う程度(結果は、HashMapなので…)で同じですね。

ここで、Reducerも分散実行させたい場合は、
src/main/scala/MapReduceExample.scala

    val task = new MapReduceTask[String, String, String, Int](cache)

    val task = new MapReduceTask[String, String, String, Int](cache, true)

と修正します。

では、もう1度。

> run-main MapReduceExample
  〜省略〜
[info] [Mapper] input key => [4], input value => [JBoss World is in Boston as well]
[info] [Reducer] input key => [cool]
[info] [Reducer] input key => [JUDCon]
[info] [Reducer] input key => [a]
[info] [Reducer] input key => [as]
[info] [Reducer] input key => [here]
[info] [Reducer] input key => [well]
[info] [Reducer] input key => [the]
[info] [Reducer] input key => [in]
[info] [Reducer] input key => [I]
[info] [Reducer] input key => [Ontario]
[info] [Reducer] input key => [open]
[info] [Reducer] input key => [of]
[info] [Reducer] input key => [Infinispan]
[info] [Reducer] input key => [awesome]
[info] [Reducer] input key => [World]
[info] [Reducer] input key => [world]
[info] ----- [MapReduceResult] START -----
[info] Key[to] = 1
[info] Key[Boston] = 3
[info] Key[well] = 1
[info] Key[capital] = 1
[info] Key[I] = 1
[info] Key[close] = 1
[info] Key[Toronto] = 2
[info] Key[open] = 1
[info] Key[of] = 2
[info] Key[division] = 1
[info] Key[awesome] = 1
[info] Key[JBoss] = 5
[info] Key[World] = 2
[info] Key[cool] = 1
[info] Key[JUDCon] = 2
[info] Key[is] = 6
[info] Key[a] = 1
[info] Key[RedHat] = 2
[info] Key[community] = 2
[info] Key[as] = 1
[info] Key[here] = 1
[info] Key[the] = 1
[info] Key[in] = 2
[info] Key[Hello] = 2
[info] Key[rules] = 2
[info] Key[Server] = 1
[info] Key[Ontario] = 1
[info] Key[source] = 1
[info] Key[Infinispan] = 3
[info] Key[am] = 1
[info] Key[Application] = 1
[info] Key[world] = 3
[info] ----- [MapReduceResult] END -----
  〜省略〜
[success] Total time: 17 s, completed 2013/03/17 19:43:41

Node-B。

[info] [Mapper] input key => [211], input value => [JBoss World is awesome]
[info] [Mapper] input key => [3], input value => [JUDCon is in Boston]
[info] [Mapper] input key => [1], input value => [Hello world here I am]
[info] [Mapper] input key => [112], input value => [Boston is close to Toronto]
[info] [Mapper] input key => [213], input value => [JBoss division of RedHat ]
[info] [Mapper] input key => [111], input value => [Infinispan open source]
[info] [Mapper] input key => [15], input value => [Hello world]
[info] [Mapper] input key => [12], input value => [JBoss Application Server]
[info] [Reducer] input key => [to]
[info] [Reducer] input key => [source]
[info] [Reducer] input key => [community]
[info] [Reducer] input key => [RedHat]
[info] [Reducer] input key => [JBoss]
[info] [Reducer] input key => [capital]
[info] [Reducer] input key => [rules]
[info] [Reducer] input key => [Hello]

Node-C。

[info] [Mapper] input key => [2], input value => [Infinispan rules the world]
[info] [Mapper] input key => [212], input value => [JBoss rules]
[info] [Mapper] input key => [114], input value => [JUDCon is cool]
[info] [Mapper] input key => [113], input value => [Toronto is a capital of Ontario]
[info] [Mapper] input key => [214], input value => [RedHat community]
[info] [Mapper] input key => [14], input value => [Infinispan community]
[info] [Reducer] input key => [is]
[info] [Reducer] input key => [division]
[info] [Reducer] input key => [am]
[info] [Reducer] input key => [Application]
[info] [Reducer] input key => [Boston]
[info] [Reducer] input key => [Server]
[info] [Reducer] input key => [close]
[info] [Reducer] input key => [Toronto]

Reducerがバラけました。ちょっとMaster Nodeに寄ってる気もしますが…。

MapReduceTaskには、3つのコンストラクタがあります。
http://docs.jboss.org/infinispan/5.2/apidocs/org/infinispan/distexec/mapreduce/MapReduceTask.html
https://github.com/infinispan/infinispan/blob/master/core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceTask.java

   /**
    * Create a new MapReduceTask given a master cache node. All distributed task executions will be
    * initiated from this cache node. This task will by default only use distributed map phase while
    * reduction will be executed on task originating Infinispan node.
    * <p>
    *
    * Large and data intensive tasks whose reduction phase would exceed working memory of one
    * Infinispan node should use distributed reduce phase
    *
    * @param masterCacheNode
    * cache node initiating map reduce task
    */
   public MapReduceTask(Cache<KIn, VIn> masterCacheNode);
   
   /**
    * Create a new MapReduceTask given a master cache node. All distributed task executions will be
    * initiated from this cache node.
    *
    * @param masterCacheNode
    * cache node initiating map reduce task
    * @param distributeReducePhase
    * if true this task will use distributed reduce phase execution
    *
    */
   public MapReduceTask(Cache<KIn, VIn> masterCacheNode, boolean distributeReducePhase);
   
   /**
    * Create a new MapReduceTask given a master cache node. All distributed task executions will be
    * initiated from this cache node.
    *
    * @param masterCacheNode
    * cache node initiating map reduce task
    * @param distributeReducePhase
    * if true this task will use distributed reduce phase execution
    * @param useIntermediateSharedCache
    * if true this tasks will share intermediate value cache with other executing
    * MapReduceTasks on the grid. Otherwise, if false, this task will use its own
    * dedicated cache for intermediate values
    *
    */
   public MapReduceTask(Cache<KIn, VIn> masterCacheNode, boolean distributeReducePhase, boolean useIntermediateSharedCache)

第2引数にbooleanで指定するのは、Reduceフェーズを分散実行するかどうかです。先ほどは、ここをtrueにしたのでReducerが分散実行されました。

では、第3引数のbooleanは?といいますと、MapReduce実行時に作成される、内部キャッシュの話です。

以下の部分の

  • Intermediate KOut/VOut migration phase
  • Reduce phase

でも、よくよく見ると「temporary DIST cache」とか「temporary cache」とか言っていますね。
https://docs.jboss.org/author/display/ISPN/Map+Reduce+framework#MapReduceframework-MapReduceTaskdistributedexecution%26nbsp%3B

この一時Cacheは、Reduceフェーズの前にKey/Valueを移動する際と、Reduceフェーズで使われるようです。

この一時Cacheは

の2つから選ぶことができます。これを指定するのが第3引数で、trueにすると「MapReduceタスク全体で共有する」一時Cacheが「__tmpMapReduce」という名前で作成されます。なおかつ、このCacheは残り続けます。
falseにした場合は、タスクごとにタスクIDでCacheが作成され、MapReduceタスクの終了と共に破棄されます。
デフォルトは、trueなのでCacheが共有され、1度MapReduceタスクを実行するとクラスタに残ります。

なので、Reducerを分散した場合にどこで実行されるのかは、この一時CacheのKeyの分散結果が表れているんだろうなぁ、と予想してみます。

Collatorを使う

最後に、Collatorを使ってみます。Collatorは、Collatorインターフェースを実装したクラスを作成し、そのインスタンスをMapReduceTask#executeに放り込みます。

とりあえず、今回はランキング的なListを返すようにしてみました。出現回数が同じだった場合は、アルファベット昇順(大文字・小文字区別なし)でソートしています。

class WordCountCollator extends Collator[String, Int, List[(String, Int)]] {
  def collate(reducedResults: java.util.Map[String, Int]): List[(String, Int)] =
    reducedResults
      .asScala
      .toSeq
      .sortWith {
        case (kv1, kv2) =>
          kv1._2.compareTo(kv2._2) match {
            case 0 => kv1._1.toLowerCase < kv2._1.toLowerCase
            case n if n > 0 => true
            case n if n < 0 => false
          }
      }
      .toList
}

MapReduceTask#executeはこう変えました。

    val task = new MapReduceTask[String, String, String, Int](cache, true)
    val ranking =
      task.mappedWith(new WordCountMapper)
        .reducedWith(new WordCountReducer)
        .execute(new WordCountCollator)

    ranking.foreach(println)

Master Nodeの実行結果は、こんな感じです。

[info] [Mapper] input key => [4], input value => [JBoss World is in Boston as well]
[info] [Reducer] input key => [cool]
[info] [Reducer] input key => [open]
[info] [Reducer] input key => [Infinispan]
[info] [Reducer] input key => [a]
[info] [Reducer] input key => [as]
[info] [Reducer] input key => [World]
[info] [Reducer] input key => [I]
[info] [Reducer] input key => [capital]
[info] [Reducer] input key => [Server]
[info] [Reducer] input key => [Ontario]
[info] [Reducer] input key => [Toronto]
[info] (is,6)
[info] (JBoss,5)
[info] (Boston,3)
[info] (Infinispan,3)
[info] (world,3)
[info] (community,2)
[info] (Hello,2)
[info] (in,2)
[info] (JUDCon,2)
[info] (of,2)
[info] (RedHat,2)
[info] (rules,2)
[info] (Toronto,2)
[info] (World,2)
[info] (a,1)
[info] (am,1)
[info] (Application,1)
[info] (as,1)
[info] (awesome,1)
[info] (capital,1)
[info] (close,1)
[info] (cool,1)
[info] (division,1)
[info] (here,1)
[info] (I,1)
[info] (Ontario,1)
[info] (open,1)
[info] (Server,1)
[info] (source,1)
[info] (the,1)
[info] (to,1)
[info] (well,1)

ちなみに、MapReduceTask#executeはCollatorを受け取るものの他、結果がFutureとなる非同期版も存在します。

面白いフレームワークなのですが、MapReduceにそんなに詳しくないのでちょっと理解度として微妙な感じです…。他のプロダクトを触ってから、戻ってきた方がいいのかな?