CLOVER🍀

That was when it all began.

InfinispanのMap Reduce Frameworkを使う

インメモリ・データグリッドのInfinispanですが、分散処理を行うことができるMap Reduce Frameworkが搭載されています。

Map/Reduce
http://infinispan.org/docs/7.1.x/user_guide/user_guide.html#_map_reduce

Infinispan 5.0から搭載されているようです。

最近、Infinispan 7.1が出ましたが、7系でまた機能が増えているようなのでここで見返し。Reduceの結果を、単一のMapとして返すのではなく、InfinispanのCacheに格納する機能が増えたようです。

それでは、見ていってみましょう。

準備

まずは依存関係の定義。
build.sbt

name := "map-reduce"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.5"

organization := "org.littlewings"

updateOptions := updateOptions.value.withCachedResolution(true)

scalacOptions ++= Seq("-Xlint", "-unchecked", "-deprecation", "-feature")

fork in Test := true

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "7.1.0.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.4" % "test"
)

Map Reduceはcoreモジュールに含まれているので、そのまま使うことができます。

Mapper/Reducer/Collatorの用意

今回は、Map Reduceのネタとしては、ドキュメントに記載のある(といっても、マネしたのはデータだけですが)WordCountとしました。

Mapper
src/main/scala/org/littlewings/infinispan/mapreduce/WordCountMapper.scala

package org.littlewings.infinispan.mapreduce

import org.infinispan.distexec.mapreduce.{ Collector, Mapper }

@SerialVersionUID(1L)
class WordCountMaper extends Mapper[String, String, String, Int] with Serializable {
  override def map(key: String, value: String, collector: Collector[String, Int]): Unit =
    value.split("""\s+""").foreach(w => collector.emit(w, 1))
}

Reducer
src/main/scala/org/littlewings/infinispan/mapreduce/WordCountReducer.scala

package org.littlewings.infinispan.mapreduce

import scala.collection.JavaConverters._

import org.infinispan.distexec.mapreduce.Reducer

@SerialVersionUID(1L)
class WordCountReducer extends Reducer[String, Int] with Serializable {
  override def reduce(key: String, iterator: java.util.Iterator[Int]): Int =
    iterator.asScala.foldLeft(0) { (acc, cur) => acc + cur }
}

Mapper、Reducerインターフェースは、見たままなので説明は特にいいかなと思います。が、注意点としては、双方Serializableであることが必要です。

また、結果のMapに対して適用するCollatorも用意してみました。今回は、WordCountの結果で最も出現数が高い1語を選出します。
src/main/scala/org/littlewings/infinispan/mapreduce/TopWordCollator.scala

package org.littlewings.infinispan.mapreduce

import scala.collection.JavaConverters._

import org.infinispan.distexec.mapreduce.Collator

class TopWordCollator extends Collator[String, Int, String] {
  override def collate(reducedResults: java.util.Map[String, Int]): String =
    reducedResults
      .asScala
      .toVector
      .sortWith { case ((key1, value1), (key2, value2)) => value1 > value2 }
      .head._1
}

Infinispanの設定

今回の設定は、このようにしました。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:7.0 http://www.infinispan.org/schemas/infinispan-config-7.0.xsd"
    xmlns="urn:infinispan:config:7.0">
  <jgroups>
    <stack-file name="udp" path="jgroups.xml" />
  </jgroups>

  <cache-container name="cacheManager" shutdown-hook="REGISTER">
    <transport cluster="cluster" stack="udp" />
    <jmx duplicate-domains="true" />

    <distributed-cache name="wordCountCache" />

    <distributed-cache name="mapReduceResultCache" owners="1" />

    <distributed-cache name="mapReduceIntermediateCache"
                       owners="1"
                       unreliable-return-values="true" />
  </cache-container>
</infinispan>

全部Distributed Cacheですが、別にそうある必要はありません。Map Reduce Frameworkを使うためには、CacheはLocal Cache、Distributed Cache、Replicated Cacheのいずれかであれば動作します。Invalidation Cacheでは動作しません。

登場時に少し触れますが、「wordCountCache」がMap Reduceの処理ネタとなるデータを格納するCacheです。これは、どのパターンでも使います。「mapReduceResultCache」は、Map Reduceの結果をCacheに格納する場合に使うCacheです。「mapReduceIntermediateCache」は、Map Reduceの各フェーズの中間結果を格納するCacheです。後ろの2つは、明示的に使わなければ登場しません。

では、サンプルを出しながら使い方を見ていきます。

テストコードの骨格

…とその前に、テストコードの骨格を出しておきます。コードはScalaTestで書いていて、describeの中にテストケースを書いていきます。
src/test/scala/org/littlewings/infinispan/mapreduce/MapReduceSpec.scala

package org.littlewings.infinispan.mapreduce

import java.util.concurrent.TimeUnit

import org.infinispan.Cache
import org.infinispan.distexec.mapreduce.MapReduceTask
import org.infinispan.manager.DefaultCacheManager

import org.scalatest.Entry
import org.scalatest.FunSpec
import org.scalatest.Matchers._

class MapReduceSpec extends FunSpec {
  describe("Map Reduce Spec") {
   // ここに、テストコードを書く!
  }

  private def registerDataToCache(cache: Cache[String, String]): Unit = {
    cache.put("1", "Hello world here I am")
    cache.put("2", "Infinispan rules the world")
    cache.put("3", "JUDCon is in Boston")
    cache.put("4", "JBoss World is in Boston as well")
    cache.put("12","JBoss Application Server")
    cache.put("13", "Hello world")
    cache.put("14", "Infinispan community");
    cache.put("16", "Hello world")

    cache.put("111", "Infinispan open source")
    cache.put("112", "Boston is close to Toronto")
    cache.put("113", "Toronto is a capital of Ontario")
    cache.put("114", "JUDCon is cool")
    cache.put("211", "JBoss World is awesome");
    cache.put("212", "JBoss rules")
    cache.put("213", "JBoss division of RedHat ")
    cache.put("214", "RedHat community")
  }

  protected def withCache[K, V](numInstances: Int, configurationFile: String, cacheName: String)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager(configurationFile))

    try {
      managers.foreach(_.getCache[K, V](cacheName))

      val manager = managers.head
      val cache = manager.getCache[K, V](cacheName)

      fun(cache)

      cache.stop()
    } finally {
      managers.foreach(_.stop())
    }
  }
}

また、単一JavaVMでクラスタを構成するための簡易メソッド、Map Reduceの元ネタとなるデータをCacheに登録する処理も書いています。

これらを使って、テストコードを書いていきます。

基本的な使い方

以下がシンプルな例になります。

      withCache[String, String](3, "infinispan.xml", "wordCountCache") { cache =>
        registerDataToCache(cache)

        cache should have size (16)

        val task = new MapReduceTask[String, String, String, Int](cache)
        val result =
          task
            .mappedWith(new WordCountMaper)
            .reducedWith(new WordCountReducer)
            .execute

        result should have size (32)
        result should contain (Entry("is", 6))
        result should contain (Entry("Infinispan", 3))
        result should contain (Entry("RedHat", 2))
      }
    }

処理対象としたいデータを持ったCacheをコンストラクタに渡して、MapRecudeTaskを作成します。

        val task = new MapReduceTask[String, String, String, Int](cache)

このMapReduceTaskに対して、先ほど作成したMapper、Reducerを登録してexecuteします。結果は、java.util.Mapで返ってきます。

        val result =
          task
            .mappedWith(new WordCountMaper)
            .reducedWith(new WordCountReducer)
            .execute

各メソッドは、DSLっぽく繋げられます。簡単ですね!

なお、今回は使いませんがexecuteの部分をexecuteAsynchronouslyとすることで、結果を非同期(Future<Map>)で受け取ることもできます。

Collatorを使う

前の例では、Reduceした結果をそのままMapとして受け取った形ですが、今度はそれにCollatorを加えてみます。

    it("Infinispan Example, using Collator") {
      withCache[String, String](3, "infinispan.xml", "wordCountCache") { cache =>
        registerDataToCache(cache)

        val task = new MapReduceTask[String, String, String, Int](cache)
        val result =
          task
            .mappedWith(new WordCountMaper)
            .reducedWith(new WordCountReducer)
            .execute(new TopWordCollator)

        result should be ("is")
      }
    }

最後のexecuteの部分に、作成したCollatorを渡すだけですね。これで、executeで得られたMapがCollatorに渡されてきます。今回は、WordCountの結果から最も登場回数の多い単語を選出しました。結果は、「is」らしいです。

Reduceの結果を、InfinispanのCacheに格納する

先ほどまでのコードは、Map Reduceの結果は単一の(Map Reduceを開始した)Nodeのメモリにすべて乗ることになります。しかし、それではMap Reduceの結果がとても大きい場合には、困ったことになるかもしれません。

そういうことからか、Infinispan 7.0からexecuteの結果をCacheに格納できるようになりました。

    it("Result Into Cache") {
      withCache[String, String](3, "infinispan.xml", "wordCountCache") { cache =>
        registerDataToCache(cache)

        val task = new MapReduceTask[String, String, String, Int](cache)
        task
          .mappedWith(new WordCountMaper)
          .reducedWith(new WordCountReducer)
          .execute("mapReduceResultCache")

        val resultCache =
          cache.getCacheManager.getCache[String, Int]("mapReduceResultCache")

        resultCache.getCacheConfiguration.clustering.hash.numOwners should be (1)

        resultCache should have size (32)
        resultCache.get("is") should be (6)
        resultCache.get("Infinispan") should be (3)
        resultCache.get("RedHat") should be (2)
      }
    }

ここでは、executeの引数に事前定義した「mapReduceResultCache」に結果を格納します。

        val task = new MapReduceTask[String, String, String, Int](cache)
        task
          .mappedWith(new WordCountMaper)
          .reducedWith(new WordCountReducer)
          .execute("mapReduceResultCache")

よって、executeの戻り値はvoidとなり、結果は別途EmbeddedCacheManagerからCacheとして取得します。

        val resultCache =
          cache.getCacheManager.getCache[String, Int]("mapReduceResultCache")

executeの引数は今回はStringでCacheの名前を与えていますが、InfinispanのCacheを直接与えるバージョンもあります。ですが、中身はCache#getNameして結局同じ実装に行き着いたりします。
https://github.com/infinispan/infinispan/blob/7.1.0.Final/core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceTask.java#L433

まあ、お好みで。

あと、Infinispanは関係ないですけど、結果を確認する時に通常のMapではなくてInfinispanのCacheになると、ScalaTestのEntryがうまく使えませんでした…。

Reduceフェーズを分散実行する

Reducerは、デフォルトだと単一Nodeで行われます。これをクラスタの各Nodeで実行するようにするには、MapReduceTaskの作成時のコンストラクタ引数をちょっと変更します。

    it("Distributed Reduce Phase") { 
      withCache[String, String](3, "infinispan.xml", "wordCountCache") { cache =>
        registerDataToCache(cache)

        val task = new MapReduceTask[String, String, String, Int](cache, true)
        task
          .mappedWith(new WordCountMaper)
          .reducedWith(new WordCountReducer)
          .execute("mapReduceResultCache")

        val resultCache =
          cache.getCacheManager.getCache[String, Int]("mapReduceResultCache")

        resultCache should have size (32)
        resultCache.get("is") should be (6)
        resultCache.get("Infinispan") should be (3)
        resultCache.get("RedHat") should be (2)
      }
    }

MapReduceTaskのコンストラクタに第2引数を加え、これをtrueにするとReduceフェーズが分散実行されるようになります。

        val task = new MapReduceTask[String, String, String, Int](cache, true)

タスクのタイムアウトを設定する

Map Reduceタスクのタイムアウトを、timeoutで設定することができます。TimeUnitのみを引数にすると現在のタイムアウト値が取得でき、intとTimeUnitを引数に取る方だとその値で設定することができます。

    it("default time out") {
      withCache[String, String](3, "infinispan.xml", "wordCountCache") { cache =>
        val task = new MapReduceTask[String, String, String, Int](cache)

        task.timeout(TimeUnit.SECONDS) should be (0)
      }
    }

    it("configuration time out") {
      withCache[String, String](3, "infinispan.xml", "wordCountCache") { cache =>
        val task = new MapReduceTask[String, String, String, Int](cache)

        task.timeout(15, TimeUnit.SECONDS)
      }
    }

なお、デフォルトは0でタイムアウトしません。以前は、15秒(レプリケーションタイムアウトのデフォルト値)だったのですが、途中で変えられたようです。

中間結果を単一のCacheとする

MapperおよびReducerの各フェーズの結果ですが、実はInfinispanのCacheに格納されています。これを、MapReduceTaskごとに同じCacheを使用するのか、別々のCacheを使うのかをMapReduceTaskの第3引数を使うことで制御できます。

    it("Use Shared Intermediate Cache") {
      withCache[String, String](3, "infinispan.xml", "wordCountCache") { cache =>
        registerDataToCache(cache)

        val task = new MapReduceTask[String, String, String, Int](cache, true, true)
        task
          .mappedWith(new WordCountMaper)
          .reducedWith(new WordCountReducer)
          .execute("mapReduceResultCache")

        val resultCache =
          cache.getCacheManager.getCache[String, Int]("mapReduceResultCache")

        resultCache should have size (32)
        resultCache.get("is") should be (6)
        resultCache.get("Infinispan") should be (3)
        resultCache.get("RedHat") should be (2)
      }
    }

第3引数をtrueにすると、MapReduceTaskで同じCacheを使うように設定されます。ちなみに、この時のCache名はデフォルトで「__tmpMapReduce」となります。
https://github.com/infinispan/infinispan/blob/7.1.0.Final/core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceTask.java#L145
https://github.com/infinispan/infinispan/blob/7.1.0.Final/core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceTask.java#L227

また、falseにするとUUID#randomUUIDをStringにした値がCache名となります。
https://github.com/infinispan/infinispan/blob/7.1.0.Final/core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceTask.java#L229

つまり、InfinispanのMap Reduce Frameworkを使う時には、最低で2つのCacheが必要ということになりますね。結果をCacheに格納する場合はもうひとつ、と。

一応、タスクごとにCacheを作成する場合は、以下のようなコードになります。まあ、MapReduceTaskの第3引数をfalseにするだけですが。

    it("Use Task Per Intermediate Cache") {
      withCache[String, String](3, "infinispan.xml", "wordCountCache") { cache =>
        registerDataToCache(cache)

        val task = new MapReduceTask[String, String, String, Int](cache, true, false)
        task
          .mappedWith(new WordCountMaper)
          .reducedWith(new WordCountReducer)
          .execute("mapReduceResultCache")

        val resultCache =
          cache.getCacheManager.getCache[String, Int]("mapReduceResultCache")

        resultCache should have size (32)
        resultCache.get("is") should be (6)
        resultCache.get("Infinispan") should be (3)
        resultCache.get("RedHat") should be (2)
      }
    }

MapReduceTaskのデフォルト値は?

こう書くと、MapReduceTaskのコンストラクタ引数のデフォルト値がどうなっているのか気になるところです。

以下、確認してみました。

コンストラク Reduceフェーズの分散実行 中間結果のCacheの共有 該当ソース
MapReduceTask(Cache masterCacheNode) false false https://github.com/infinispan/infinispan/blob/7.1.0.Final/core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceTask.java#L181
MapReduceTask(Cache masterCacheNode, boolean distributeReducePhase) 自分で指定 true https://github.com/infinispan/infinispan/blob/7.1.0.Final/core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceTask.java#L195

Reduceフェーズを設定を行うと、中間結果のCacheを共有するように設定されるみたいですね!

ちなみに、中間結果のCacheを共有する、と書くとキーとか被らないか心配になるところですが、タスクのIDとキーを使った、内部的なキーを別に作成するようなので心配不要?
https://github.com/infinispan/infinispan/blob/7.1.0.Final/core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceManagerImpl.java#L341

でも、Reducerの方は…?
https://github.com/infinispan/infinispan/blob/7.1.0.Final/core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceManagerImpl.java#L157

MapReduceTaskで共有する中間Cacheを指定する

先ほど紹介した中間Cacheですが、どのCacheを指定するのか設定することができます。まずは、共有するパターンから。

    it("Use Shared Intermediate Cache, Spec Cache") {
      withCache[String, String](3, "infinispan.xml", "wordCountCache") { cache =>
        registerDataToCache(cache)

        val task = new MapReduceTask[String, String, String, Int](cache, true, true)
        task.usingSharedIntermediateCache("mapReduceIntermediateCache")
        task
          .mappedWith(new WordCountMaper)
          .reducedWith(new WordCountReducer)
          .execute("mapReduceResultCache")

        val resultCache =
          cache.getCacheManager.getCache[String, Int]("mapReduceResultCache")

        resultCache should have size (32)
        resultCache.get("is") should be (6)
        resultCache.get("Infinispan") should be (3)
        resultCache.get("RedHat") should be (2)
      }
    }

MapReduceTask#usingSharedIntermediateCacheを使用することで、どのCacheを中間Cacheとして使用するか指定することができます。今回は、「mapReduceIntermediateCache」というCacheを中間Cacheに選びました。
※繰り替えしますが、他のMapReduceTaskでも共有するCacheのパターンです

        task.usingSharedIntermediateCache("mapReduceIntermediateCache")

なお、このメソッドを使用すると、MapReduceTaskコンストラクタの第3引数での指定をtrueにしたことに再設定されます。


また、既存のCacheの定義を使って、新たなCacheを作成してそれを中間Cache

    it("Use Shared Intermediate Cache, Spec Cache and Cache Configuration") {
      withCache[String, String](3, "infinispan.xml", "wordCountCache") { cache =>
        registerDataToCache(cache)

        val task = new MapReduceTask[String, String, String, Int](cache, true, true)
        task.usingSharedIntermediateCache("mapReduceTmpCache", "mapReduceIntermediateCache")
        task
          .mappedWith(new WordCountMaper)
          .reducedWith(new WordCountReducer)
          .execute("mapReduceResultCache")

        val resultCache =
          cache.getCacheManager.getCache[String, Int]("mapReduceResultCache")

        resultCache should have size (32)
        resultCache.get("is") should be (6)
        resultCache.get("Infinispan") should be (3)
        resultCache.get("RedHat") should be (2)
      }
    }

こう書くと、「mapReduceIntermediateCache」のCache定義を元にして、「mapReduceTmpCache」というCacheを定義してMap Reduceの中間Cacheとして使用します。

        task.usingSharedIntermediateCache("mapReduceTmpCache", "mapReduceIntermediateCache")

ところで、この共有版の中間Cacheですが、Map Reduceのタスク実行後も残り続けます。Cacheのエントリは、非同期に削除されるようです。
https://github.com/infinispan/infinispan/blob/7.1.0.Final/core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceTask.java#L492

MapReduceTask単位で使う中間Cacheの設定を指定する

今度は、タスク単位で中間Cacheを使う場合です。

    it("Use Task Per Intermediate Cache, Spec Cache Configuration") {
      withCache[String, String](3, "infinispan.xml", "wordCountCache") { cache =>
        registerDataToCache(cache)

        val task = new MapReduceTask[String, String, String, Int](cache, true, false)
        // 内部的に、UUID#randomUUIDを使った名前でCacheが作成され、タスク終了時に削除される
        task.usingIntermediateCache("mapReduceIntermediateCache")
        task
          .mappedWith(new WordCountMaper)
          .reducedWith(new WordCountReducer)
          .execute("mapReduceResultCache")

        val resultCache =
          cache.getCacheManager.getCache[String, Int]("mapReduceResultCache")

        resultCache should have size (32)
        resultCache.get("is") should be (6)
        resultCache.get("Infinispan") should be (3)
        resultCache.get("RedHat") should be (2)
      }
    }

この場合は、MapReduceTask#usingIntermediateCacheで使用するCacheの名前を指定しますが、こちらはCacheの定義が使用されます。このCacheの定義をテンプレートに、UUID#randomUUIDを使った名前でCacheが作成され、こちらが中間Cacheとなります。

        // 内部的に、UUID#randomUUIDを使った名前でCacheが作成され、タスク終了時に削除される
        task.usingIntermediateCache("mapReduceIntermediateCache")

共有版の中間Cacheとは異なり、こちらはMap Reduce終了時に削除されます。
https://github.com/infinispan/infinispan/blob/7.1.0.Final/core/src/main/java/org/infinispan/distexec/mapreduce/MapReduceTask.java#L486

MapReduceTask#usingIntermediateCacheを使うと、MapReduceTaskの第3引数をfalseに設定したように変更されるのですが、もしも最初がtrueだった場合は、中間Cacheの名前は「__tmpMapReduce」になりそうな気がします…。

デフォルトの中間Cacheの設定は?

オマケです。気になったところなので。

特に中間Cacheの名前を指定しないと、「__tmpMapReduce」というCacheが実行時に定義されます。タスク単位で中間Cacheを使うように設定した場合は、「__tmpMapReduce」の設定をテンプレートにCacheを作成します。

この定義内容なのですが、ここで実行時に動的に定義されるようです。
https://github.com/infinispan/infinispan/blob/7.1.0.Final/core/src/main/java/org/infinispan/commands/CreateCacheCommand.java#L65

Distributed Cacheで同期モード、unreliable-return-valuesをtrue(Mapの契約を破って、パフォーマンスアップ)、エントリのオーナーは2つといったところですね。

いろいろ追ったので、勉強になりました。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-map-reduce