Infinispan 5.2の頃にちょこっと触ってそれ以来だったInfinispanのMap Reduce Frameworkですが、その時にはDist Cacheでなければならないという制限がありました。
そのあたりが、Infinispan 5.3で変わっていたようなので試してみました。加えて、タイムアウトの設定もできるようになったみたいですよ。
12. Map/Reduce
http://infinispan.org/docs/6.0.x/user_guide/user_guide.html#_map_reduce
Infinispan 7.0では、性能向上もあるみたいですね〜。
では、試していってみましょう。InfinispanのMap Reduce Frameworkそのものについては、あまり書きません。
準備
何はともあれ、実行する準備を。build.sbtはこのように定義。
build.sbt
name := "infinispan-mapreduce-clustering-type" version := "0.0.1-SNAPSHOT" scalaVersion := "2.11.1" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") incOptions := incOptions.value.withNameHashing(true) fork in Test := true parallelExecution in Test := true libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "6.0.2.Final" excludeAll( ExclusionRule(organization = "org.jgroups", name = "jgroups"), ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"), ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling"), ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging"), ExclusionRule(organization = "org.jboss.spec.javax.transaction", name = "jboss-transaction-api_1.1_spec") ), "org.jgroups" % "jgroups" % "3.4.1.Final", "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.1_spec" % "1.0.1.Final", "org.jboss.marshalling" % "jboss-marshalling-river" % "1.4.4.Final", "org.jboss.marshalling" % "jboss-marshalling" % "1.4.4.Final", "org.jboss.logging" % "jboss-logging" % "3.1.2.GA", "net.jcip" % "jcip-annotations" % "1.0", "org.scalatest" %% "scalatest" % "2.2.0" % "test" )
Infinispanの設定ファイルには、利用可能なCacheを種類別に4つ用意。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd" xmlns="urn:infinispan:config:6.0"> <global> <transport clusterName="MapReduceCluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" allowDuplicateDomains="true" /> <shutdown hookBehavior="REGISTER"/> </global> <namedCache name="localCache" /> <namedCache name="distCache"> <clustering mode="dist" /> </namedCache> <namedCache name="replCache"> <clustering mode="repl" /> </namedCache> <namedCache name="invlCache"> <clustering mode="invl" /> </namedCache> </infinispan>
JGroupsの設定は端折ります。
それぞれ、Local Cache、Dist Cache、Repl Cache、Invl Cacheですね。
簡単なMapReduceを書く
とりあえず、Map Reduce Frameworkを使用するためのコードを書きましょう。
MapperとReducerを作成。
src/main/scala/org/littlewings/infinispan/mapreduce/MapperReducer.scala
package org.littlewings.infinispan.mapreduce import scala.collection.JavaConverters._ import org.infinispan.distexec.mapreduce.{Collator, Collector, Mapper, Reducer} @SerialVersionUID(1L) class DoublingMapper extends Mapper[String, Int, String, Int] { override def map(key: String, value: Int, collector: Collector[String, Int]): Unit = collector.emit(key, value * 2) } @SerialVersionUID(1L) class DoublingReducer extends Reducer[String, Int] { override def reduce(key: String, iter: java.util.Iterator[Int]): Int = iter.asScala.sum } class SummerizeCollator extends Collator[String, Int, Int] { override def collate(reduceResults: java.util.Map[String, Int]): Int = reduceResults.asScala.foldLeft(0) { case (acc, (k, v)) => acc + v } }
Map Phaseで値を2倍して、Reducerで合算します。Collatorも用意しており、こちらですべてのキーに対する合計を算出します。
まあ、単純ですね。
では、これに対するテストコードを書いて確認してみます。
src/test/scala/org/littlewings/infinispan/mapreduce/DoublingMapReduceSpec.scala
package org.littlewings.infinispan.mapreduce import org.infinispan.Cache import org.infinispan.distexec.mapreduce.MapReduceTask import org.infinispan.manager.DefaultCacheManager import org.scalatest.FunSpec import org.scalatest.Matchers._ class DoublingMapReduceSpec extends FunSpec { describe("Infinispan Map/Reduce Spec") { // ここに、テストコードを書く! } def withCache(cacheName: String, nodeNum: Int = 1)(fun: Cache[String, Int] => Unit): Unit = { val managers = (1 to nodeNum).map(_ => new DefaultCacheManager("infinispan.xml")) try { managers.foreach(_.getCache[String, Int](cacheName)) val manager = managers.head val cache = manager.getCache[String, Int](cacheName) try { fun(cache) } finally { cache.stop() } } finally { managers.foreach(_.stop()) } } }
いつもの、簡単なクラスタ環境でのテスト実行コードです。
では順に見ていくと…まずはLocal Cache。
it("local cache") { withCache("localCache") { cache => (1 to 10).foreach(i => cache.put(s"key$i", i)) val task = new MapReduceTask[String, Int, String, Int](cache, true) task .mappedWith(new DoublingMapper) .reducedWith(new DoublingReducer) val result = task.execute(new SummerizeCollator) result should be (110) } }
普通に動作します!
Dist Cache。これは以前からですね。
it("dist cache") { withCache("distCache", 4) { cache => (1 to 10).foreach(i => cache.put(s"key$i", i)) val task = new MapReduceTask[String, Int, String, Int](cache, true) task .mappedWith(new DoublingMapper) .reducedWith(new DoublingReducer) val result = task.execute(new SummerizeCollator) result should be (110) } }
Repl Cache。
it("repl cache") { withCache("replCache", 4) { cache => (1 to 10).foreach(i => cache.put(s"key$i", i)) val task = new MapReduceTask[String, Int, String, Int](cache, true) task .mappedWith(new DoublingMapper) .reducedWith(new DoublingReducer) val result = task.execute(new SummerizeCollator) result should be (110) } }
こちらも動作します。
Invl Cache。さすがに、こちらはダメでした。
it("invl cache") { withCache("invlCache", 4) { cache => (1 to 10).foreach(i => cache.put(s"key$i", i)) the [IllegalStateException] thrownBy { new MapReduceTask[String, Int, String, Int](cache, true) } should have message "ISPN000231: Cache mode should be DIST or REPL, rather than INVALIDATION_SYNC" } }
実行しようとすると、IllegalStateExceptionがスローされます。
Invl Cacheは対象外とはいえ、Local CacheとRepl Cacheで実行可能になっているのは嬉しいのではないでしょうか?
で、これがいつ入ったかというと、以下のISSUEらしいです。Infinispan 5.3からみたいですね。
[ISPN-2812] Allow Map-Reduce tasks run in local and replicated envs.
https://issues.jboss.org/browse/ISPN-2812
自分は、なんとなくマニュアルを読んでいて「あれ?Dist Cache以外にも動かせそうな気配が…?」と思った記憶があるのですが、どこを見てそう思ったのかまったく覚えておりません…。
Repl Cacheの場合、どのNodeがデータのPrimaryLocationになるんだっけ?Map Reduceってそれで動くんだっけ?と思いましたが、以下のコードを書いて試してみた結果…
it("print key locate") { withCache("replCache", 4) { cache => (1 to 10).foreach(i => cache.put(s"key$i", s"value$i")) val dm = cache.getAdvancedCache.getDistributionManager cache.keySet.asScala.toList.sorted.foreach { key => println(s"Key[$key]: PrimaryLocation[${dm.getPrimaryLocation(key)}], locate:${dm.locate(key)}") } } }
このような形になりました。
Key[key1]: PrimaryLocation[xxxxx-63419], locate:[xxxxx-63419, xxxxx-40323, xxxxx-9323, xxxxx-25440] Key[key10]: PrimaryLocation[xxxxx-25440], locate:[xxxxx-25440, xxxxx-40323, xxxxx-9323, xxxxx-63419] Key[key2]: PrimaryLocation[xxxxx-63419], locate:[xxxxx-63419, xxxxx-40323, xxxxx-9323, xxxxx-25440] Key[key3]: PrimaryLocation[xxxxx-63419], locate:[xxxxx-63419, xxxxx-40323, xxxxx-9323, xxxxx-25440] Key[key4]: PrimaryLocation[xxxxx-25440], locate:[xxxxx-25440, xxxxx-40323, xxxxx-9323, xxxxx-63419] Key[key5]: PrimaryLocation[xxxxx-40323], locate:[xxxxx-40323, xxxxx-9323, xxxxx-63419, xxxxx-25440] Key[key6]: PrimaryLocation[xxxxx-40323], locate:[xxxxx-40323, xxxxx-9323, xxxxx-63419, xxxxx-25440] Key[key7]: PrimaryLocation[xxxxx-9323], locate:[xxxxx-9323, xxxxx-40323, xxxxx-63419, xxxxx-25440] Key[key8]: PrimaryLocation[xxxxx-9323], locate:[xxxxx-9323, xxxxx-40323, xxxxx-63419, xxxxx-25440] Key[key9]: PrimaryLocation[xxxxx-40323], locate:[xxxxx-40323, xxxxx-9323, xxxxx-63419, xxxxx-25440]
Repl Cacheでも、PrimaryLocationがあるみたいですね。
なお、このData Localityを見ているのはMapReduceManagerImplというクラスですが、この際はClusteringDependentLogicというインターフェースの実装を使って頑張っているようです。
タイムアウトの設定について
Infinispan 5.3から、タイムアウトの設定ができるようになったみたいです。
val task = new MapReduceTask[String, Int, String, Int](cache, true) // Mapper、Reducerの設定 task.timeout(3, TimeUnit.SECONDS)
こんな感じで、TimeUnitと一緒に指定します。
確認してみるために、このようなMapper/Reducerを用意。
src/main/scala/org/littlewings/infinispan/mapreduce/SlowMapperReducer.scala
package org.littlewings.infinispan.mapreduce import scala.collection.JavaConverters._ import org.infinispan.distexec.mapreduce.{Collector, Mapper, Reducer} import java.util.concurrent.TimeUnit @SerialVersionUID(1L) class SlowMapper(waitTime: Long, timeUnit: TimeUnit) extends Mapper[String, Int, String, Int] { override def map(key: String, value: Int, collator: Collector[String, Int]): Unit = { timeUnit.sleep(waitTime) collator.emit(key, value) } } @SerialVersionUID(1L) class SlowReducer(waitTime: Long, timeUnit: TimeUnit) extends Reducer[String, Int] { override def reduce(key: String, iter: java.util.Iterator[Int]): Int = { timeUnit.sleep(waitTime) iter.asScala.sum } }
map/reduceの呼び出しのたびに、指定の単位でスリープするMapper/Reducerの実装です。
では、これをテスト。
it("timeout test") { withCache("distCache", 4) { cache => (1 to 10).foreach(i => cache.put(s"key$i", i)) val task = new MapReduceTask[String, Int, String, Int](cache, true) task .mappedWith(new SlowMapper(1, TimeUnit.SECONDS)) .reducedWith(new SlowReducer(1, TimeUnit.SECONDS)) .timeout(3, TimeUnit.SECONDS) the [CacheException] thrownBy { task.execute } getCause() should be (a [ExecutionException]) } }
並列実行されるので、投入するデータの数とタイムアウトの設定を合わせるのが若干難しいですが、うちの環境だとこれくらいなら実行に3秒以上かかるのでタイムアウトします。
タイムアウトは、CacheExceptionがスローされ、その中にタイムアウトした旨の情報が含まれる形で報告されます。
ところで、このタイムアウトのデフォルト値はいくらなんでしょう?
it("default timeout") { withCache("distCache", 4) { cache => (1 to 10).foreach(i => cache.put(s"key$i", i)) val task = new MapReduceTask[String, Int, String, Int](cache, true) task.timeout(TimeUnit.MILLISECONDS) should be (15000) } }
15,000ミリ秒、15秒ですね。
これってどこから出てきたのかというと…設定でいう
<namedCache name="distCache"> <clustering mode="dist"> <sync replTimeout="15000" /> </clustering> </namedCache>
のreplTimeoutのデフォルト値になります。このデフォルト値が15秒であり、内部的にこの値を使用しているからみたいですね。
久々に使いましたが、いろいろ確認になりました。
今回作成した全コードは、こちらに置いてあります。
https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-mapreduce-clustering-type