CLOVER🍀

That was when it all began.

InfinispanのMap Reduce Frameworkが、Local/Repl Cacheでも実行できるようになってた+α

Infinispan 5.2の頃にちょこっと触ってそれ以来だったInfinispanのMap Reduce Frameworkですが、その時にはDist Cacheでなければならないという制限がありました。

そのあたりが、Infinispan 5.3で変わっていたようなので試してみました。加えて、タイムアウトの設定もできるようになったみたいですよ。

12. Map/Reduce
http://infinispan.org/docs/6.0.x/user_guide/user_guide.html#_map_reduce

Infinispan 7.0では、性能向上もあるみたいですね〜。

では、試していってみましょう。InfinispanのMap Reduce Frameworkそのものについては、あまり書きません。

準備

何はともあれ、実行する準備を。build.sbtはこのように定義。
build.sbt

name := "infinispan-mapreduce-clustering-type"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.1"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

incOptions := incOptions.value.withNameHashing(true)

fork in Test := true

parallelExecution in Test := true

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "6.0.2.Final" excludeAll(
    ExclusionRule(organization = "org.jgroups", name = "jgroups"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling"),
    ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging"),
    ExclusionRule(organization = "org.jboss.spec.javax.transaction", name = "jboss-transaction-api_1.1_spec")
  ),
  "org.jgroups" % "jgroups" % "3.4.1.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.1_spec" % "1.0.1.Final",
  "org.jboss.marshalling" % "jboss-marshalling-river" % "1.4.4.Final",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.4.4.Final",
  "org.jboss.logging" % "jboss-logging" % "3.1.2.GA",
  "net.jcip" % "jcip-annotations" % "1.0",
  "org.scalatest" %% "scalatest" % "2.2.0" % "test"
)

Infinispanの設定ファイルには、利用可能なCacheを種類別に4つ用意。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
    xmlns="urn:infinispan:config:6.0">

  <global>
    <transport clusterName="MapReduceCluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        allowDuplicateDomains="true"
        />

    <shutdown hookBehavior="REGISTER"/>
  </global>

  <namedCache name="localCache" />

  <namedCache name="distCache">
    <clustering mode="dist" />
  </namedCache>

  <namedCache name="replCache">
    <clustering mode="repl" />
  </namedCache>

  <namedCache name="invlCache">
    <clustering mode="invl" />
  </namedCache>
</infinispan>

JGroupsの設定は端折ります。

それぞれ、Local Cache、Dist Cache、Repl Cache、Invl Cacheですね。

簡単なMapReduceを書く

とりあえず、Map Reduce Frameworkを使用するためのコードを書きましょう。

MapperとReducerを作成。
src/main/scala/org/littlewings/infinispan/mapreduce/MapperReducer.scala

package org.littlewings.infinispan.mapreduce

import scala.collection.JavaConverters._

import org.infinispan.distexec.mapreduce.{Collator, Collector, Mapper, Reducer}

@SerialVersionUID(1L)
class DoublingMapper extends Mapper[String, Int, String, Int] {
  override def map(key: String, value: Int, collector: Collector[String, Int]): Unit =
    collector.emit(key, value * 2)
}

@SerialVersionUID(1L)
class DoublingReducer extends Reducer[String, Int] {
  override def reduce(key: String, iter: java.util.Iterator[Int]): Int =
    iter.asScala.sum
}

class SummerizeCollator extends Collator[String, Int, Int] {
  override def collate(reduceResults: java.util.Map[String, Int]): Int =
    reduceResults.asScala.foldLeft(0) { case (acc, (k, v)) => acc + v }
}

Map Phaseで値を2倍して、Reducerで合算します。Collatorも用意しており、こちらですべてのキーに対する合計を算出します。

まあ、単純ですね。

では、これに対するテストコードを書いて確認してみます。
src/test/scala/org/littlewings/infinispan/mapreduce/DoublingMapReduceSpec.scala

package org.littlewings.infinispan.mapreduce

import org.infinispan.Cache
import org.infinispan.distexec.mapreduce.MapReduceTask
import org.infinispan.manager.DefaultCacheManager

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class DoublingMapReduceSpec extends FunSpec {
  describe("Infinispan Map/Reduce Spec") {
    // ここに、テストコードを書く!
  }

  def withCache(cacheName: String, nodeNum: Int = 1)(fun: Cache[String, Int] => Unit): Unit = {
    val managers = (1 to nodeNum).map(_ => new DefaultCacheManager("infinispan.xml"))

    try {
      managers.foreach(_.getCache[String, Int](cacheName))

      val manager = managers.head
      val cache = manager.getCache[String, Int](cacheName)

      try {
        fun(cache)
      } finally {
        cache.stop()
      }
    } finally {
      managers.foreach(_.stop())
    }
  }
}

いつもの、簡単なクラスタ環境でのテスト実行コードです。

では順に見ていくと…まずはLocal Cache。

    it("local cache") {
      withCache("localCache") { cache =>
        (1 to 10).foreach(i => cache.put(s"key$i", i))

        val task = new MapReduceTask[String, Int, String, Int](cache, true)
        task
          .mappedWith(new DoublingMapper)
          .reducedWith(new DoublingReducer)

        val result = task.execute(new SummerizeCollator)

        result should be (110)
      }
    }

普通に動作します!

Dist Cache。これは以前からですね。

    it("dist cache") {
      withCache("distCache", 4) { cache =>
        (1 to 10).foreach(i => cache.put(s"key$i", i))

        val task = new MapReduceTask[String, Int, String, Int](cache, true)
        task
          .mappedWith(new DoublingMapper)
          .reducedWith(new DoublingReducer)

        val result = task.execute(new SummerizeCollator)

        result should be (110)
      }
    }

Repl Cache。

    it("repl cache") {
      withCache("replCache", 4) { cache =>
        (1 to 10).foreach(i => cache.put(s"key$i", i))

        val task = new MapReduceTask[String, Int, String, Int](cache, true)
        task
          .mappedWith(new DoublingMapper)
          .reducedWith(new DoublingReducer)

        val result = task.execute(new SummerizeCollator)

        result should be (110)
      }
    }

こちらも動作します。

Invl Cache。さすがに、こちらはダメでした。

    it("invl cache") {
      withCache("invlCache", 4) { cache =>
        (1 to 10).foreach(i => cache.put(s"key$i", i))

        the [IllegalStateException] thrownBy {
          new MapReduceTask[String, Int, String, Int](cache, true)
        } should have message "ISPN000231: Cache mode should be DIST or REPL, rather than INVALIDATION_SYNC"
      }
    }

実行しようとすると、IllegalStateExceptionがスローされます。

Invl Cacheは対象外とはいえ、Local CacheとRepl Cacheで実行可能になっているのは嬉しいのではないでしょうか?

で、これがいつ入ったかというと、以下のISSUEらしいです。Infinispan 5.3からみたいですね。

[ISPN-2812] Allow Map-Reduce tasks run in local and replicated envs.
https://issues.jboss.org/browse/ISPN-2812

自分は、なんとなくマニュアルを読んでいて「あれ?Dist Cache以外にも動かせそうな気配が…?」と思った記憶があるのですが、どこを見てそう思ったのかまったく覚えておりません…。

Repl Cacheの場合、どのNodeがデータのPrimaryLocationになるんだっけ?Map Reduceってそれで動くんだっけ?と思いましたが、以下のコードを書いて試してみた結果…

    it("print key locate") {
      withCache("replCache", 4) { cache =>
        (1 to 10).foreach(i => cache.put(s"key$i", s"value$i"))

        val dm = cache.getAdvancedCache.getDistributionManager

        cache.keySet.asScala.toList.sorted.foreach { key =>
          println(s"Key[$key]: PrimaryLocation[${dm.getPrimaryLocation(key)}], locate:${dm.locate(key)}")
        }
      }
    }

このような形になりました。

Key[key1]: PrimaryLocation[xxxxx-63419], locate:[xxxxx-63419, xxxxx-40323, xxxxx-9323, xxxxx-25440]
Key[key10]: PrimaryLocation[xxxxx-25440], locate:[xxxxx-25440, xxxxx-40323, xxxxx-9323, xxxxx-63419]
Key[key2]: PrimaryLocation[xxxxx-63419], locate:[xxxxx-63419, xxxxx-40323, xxxxx-9323, xxxxx-25440]
Key[key3]: PrimaryLocation[xxxxx-63419], locate:[xxxxx-63419, xxxxx-40323, xxxxx-9323, xxxxx-25440]
Key[key4]: PrimaryLocation[xxxxx-25440], locate:[xxxxx-25440, xxxxx-40323, xxxxx-9323, xxxxx-63419]
Key[key5]: PrimaryLocation[xxxxx-40323], locate:[xxxxx-40323, xxxxx-9323, xxxxx-63419, xxxxx-25440]
Key[key6]: PrimaryLocation[xxxxx-40323], locate:[xxxxx-40323, xxxxx-9323, xxxxx-63419, xxxxx-25440]
Key[key7]: PrimaryLocation[xxxxx-9323], locate:[xxxxx-9323, xxxxx-40323, xxxxx-63419, xxxxx-25440]
Key[key8]: PrimaryLocation[xxxxx-9323], locate:[xxxxx-9323, xxxxx-40323, xxxxx-63419, xxxxx-25440]
Key[key9]: PrimaryLocation[xxxxx-40323], locate:[xxxxx-40323, xxxxx-9323, xxxxx-63419, xxxxx-25440]

Repl Cacheでも、PrimaryLocationがあるみたいですね。

なお、このData Localityを見ているのはMapReduceManagerImplというクラスですが、この際はClusteringDependentLogicというインターフェースの実装を使って頑張っているようです。

タイムアウトの設定について

Infinispan 5.3から、タイムアウトの設定ができるようになったみたいです。

val task = new MapReduceTask[String, Int, String, Int](cache, true)

// Mapper、Reducerの設定

task.timeout(3, TimeUnit.SECONDS)

こんな感じで、TimeUnitと一緒に指定します。

確認してみるために、このようなMapper/Reducerを用意。
src/main/scala/org/littlewings/infinispan/mapreduce/SlowMapperReducer.scala

package org.littlewings.infinispan.mapreduce

import scala.collection.JavaConverters._

import org.infinispan.distexec.mapreduce.{Collector, Mapper, Reducer}

import java.util.concurrent.TimeUnit

@SerialVersionUID(1L)
class SlowMapper(waitTime: Long, timeUnit: TimeUnit) extends Mapper[String, Int, String, Int] {
  override def map(key: String, value: Int, collator: Collector[String, Int]): Unit = {
    timeUnit.sleep(waitTime)
    collator.emit(key, value)
  }
}

@SerialVersionUID(1L)
class SlowReducer(waitTime: Long, timeUnit: TimeUnit) extends Reducer[String, Int] {
  override def reduce(key: String, iter: java.util.Iterator[Int]): Int = {
    timeUnit.sleep(waitTime)
    iter.asScala.sum
  }
}

map/reduceの呼び出しのたびに、指定の単位でスリープするMapper/Reducerの実装です。

では、これをテスト。

    it("timeout test") {
      withCache("distCache", 4) { cache =>
        (1 to 10).foreach(i => cache.put(s"key$i", i))

        val task = new MapReduceTask[String, Int, String, Int](cache, true)

        task
          .mappedWith(new SlowMapper(1, TimeUnit.SECONDS))
          .reducedWith(new SlowReducer(1, TimeUnit.SECONDS))
          .timeout(3, TimeUnit.SECONDS)

        the [CacheException] thrownBy {
          task.execute
        } getCause() should be (a [ExecutionException])
      }
    }

並列実行されるので、投入するデータの数とタイムアウトの設定を合わせるのが若干難しいですが、うちの環境だとこれくらいなら実行に3秒以上かかるのでタイムアウトします。

タイムアウトは、CacheExceptionがスローされ、その中にタイムアウトした旨の情報が含まれる形で報告されます。

ところで、このタイムアウトのデフォルト値はいくらなんでしょう?

    it("default timeout") {
      withCache("distCache", 4) { cache =>
        (1 to 10).foreach(i => cache.put(s"key$i", i))

        val task = new MapReduceTask[String, Int, String, Int](cache, true)

        task.timeout(TimeUnit.MILLISECONDS) should be (15000)
      }
    }

15,000ミリ秒、15秒ですね。

これってどこから出てきたのかというと…設定でいう

  <namedCache name="distCache">
    <clustering mode="dist">
      <sync replTimeout="15000" />
    </clustering>
  </namedCache>

のreplTimeoutのデフォルト値になります。このデフォルト値が15秒であり、内部的にこの値を使用しているからみたいですね。

久々に使いましたが、いろいろ確認になりました。

今回作成した全コードは、こちらに置いてあります。
https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-mapreduce-clustering-type