CLOVER🍀

That was when it all began.

Hazelcast 3.2の新機能、MapReduce Framework

先日、Hazelcast 3.2がリリースされました。ここで追加された新機能の中に、MapReduce Frameworkがあります。

Hazelcast Announces In-Memory MapReduce API
http://www.hazelcast.org/hazelcast-announces-in-memory-mapreduce-api/

HazelcastがMapReduce APIをサポート
http://www.infoq.com/jp/news/2014/03/hazelcast-mapreduce-api

このMapReduce API、リリースされる前にドキュメントを自分でビルドして読んでたり、RCの頃に使って開発者の方と会話させてもらったりバグ報告したりと個人的にはちょっとしたイベントになりました。

なお、上記のInfoQでインタビューを受けているChristoph Engelbertという方が作成した「CastMapR」というのがHazelcast MapReduceの前身らしいのですが、こちらはInfinispanのMapReduceに影響を受けて作られたものだそうです。

CastMapR
https://github.com/noctarius/castmapr

で、この方が以前から自分がHazelcastについてたまに会話していた方なんですけど、このような立場の方とはつゆ知らず、ええ、驚きました、驚きましたとも。

MapReduce Frameworkの簡単な紹介

では、オフィシャルサイトのドキュメントに沿って、簡単にHazelcast MapReduceの紹介を。

MapReduce Framework自体は、割と有名なデータを読んで、Mapでキーと値のペアを作って、Redeceで畳み込むアレですね。Hazelcastの場合は、インメモリ処理となり、リアルタイム性に長けているので、そのような要求があるところで使われるんじゃないでしょうか?

Hazelcast MapReduce
http://www.hazelcast.org/docs/latest/manual/html-single/hazelcast-documentation.html#hazelcast-mapreduce

Javadoc
http://www.hazelcast.org/docs/latest/javadoc/com/hazelcast/mapreduce/package-summary.html

CastMapRはInfinispanのMapReduceに影響を受けたものだそうですが、Hazelcastに取り込まれたMapReduceは最終的にはHadoopのそれに近いものになりました。

Hazelcast MapReduceの実行フェーズは、

  • Mapping
  • Combine
  • Reducing

で構成されます。仮想的にGrouping/Shuffling Phase、Producing the Final Resultというフェーズもあることになっていますが、実質この3つです。

Reducing後に、さらに集約処理を行うことはできます。

では、MapReduceで主に使用するAPIの紹介にいきましょう。

ここの説明で使用するコードは、MapReduce Frameworkの書き方や挙動を確認するためのものになっています。みんな大好きWord Countのサンプルは、別途エントリを書きました。

Hazelcast MapReduce/lucene-kuromojiでテキストマイニング入門 〜形態素解析からワードカウントまで〜
http://d.hatena.ne.jp/Kazuhira/20140329/1396079599

データ構造とKeyValueSource

Hazelcastには、分散Map、Listなど複数のデータ構造を扱うことができますが、MapReduceの対象にできるのは

  • Map
  • MultiMap
  • List
  • Set

となります。これらを入力としてMapReduceを行うことができます。

ただ、直接MapやListなどを渡すのではなく、KeyValueSourceというクラスを経由します。

KeyValueSource
http://www.hazelcast.org/docs/latest/manual/html-single/hazelcast-documentation.html#keyvaluesource

KeyValueSource.fromの各種メソッドにMapなどのデータ構造を渡すことで、KeyValueSourceのインスタンスを取得することができます。

        // Mapの場合
        val map = hazelcast.getMap[String, String]("simple-map")
        // ...
        val source = KeyValueSource.fromMap(map)

        // Listの場合
        val list = hazelcast.getList[String]("simple-list")
        // ...
        val source = KeyValueSource.fromList(list)

ここで取得したKeyValueSourceは、後述するJobの入力となります。

ちなみに、ListやSetってキーがないじゃん?という疑問があるかと思いますが、これらの場合はMappingフェーズでキーの値にはデータ構造の名前(HazelcastInstance#getListした時の名前)がキーとなります。

Listの要素が100件あっても、全部同じキーが渡ってくるということです。

JobTracker

JobTrackerは、MapReduceの起点となるJobを作成するためのAPIになります。

JobTracker
http://www.hazelcast.org/docs/latest/manual/html-single/hazelcast-documentation.html#jobtracker

JobTracker自体は、HazelcastInstanceから取得します。

        val jobTracker = hazelcast.getJobTracker("default")

注意事項としては、この後Jobを作成する時は、同じHazelcastInstanceから取得したデータ構造(Map、Listなど)を使うこと、となっています。

また、MapReduce APIの設定/チューニングポイントは、JobTrackerとなります。

  <jobtracker name="default">
    <max-thread-size>0</max-thread-size>
    <!-- Queue size 0 means number of partitions * 2 -->
    <queue-size>0</queue-size>
    <retry-count>0</retry-count>
    <chunk-size>1000</chunk-size>
    <communicate-stats>true</communicate-stats>
    <topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
  </jobtracker>

設定内容は、こちらを参照。

JobTracker Configuration
http://www.hazelcast.org/docs/latest/manual/html-single/hazelcast-documentation.html#jobtracker-configuration

ジョブのキューや、Reducerに送信する際のチャンクサイズ、スレッド数の設定があるのでチューニングポイントですね。キューのサイズは、Job単位にでも設定することが可能だそうです。

Job

Jobは、実際にMapReduceを行う際に、MapperやReducerを登録したり、処理を開始するためのフロントエンドになります。MapReduceのタスクごとに1度だけ設定を行いますが、実行自体は複数回可能だそうです。

Job
http://www.hazelcast.org/docs/latest/manual/html-single/hazelcast-documentation.html#job

Jobの作成には、先ほど紹介したKeyValueSourceが必要になります。

        val jobTracker = hazelcast.getJobTracker("default")
        val job = jobTracker.newJob(source)

        val future: ICompletableFuture[java.util.Map[String, Int]] =
          job
            .mapper(new SimpleAllKeysMapper)
            .combiner(new SimpleAllKeysCombinerFactory)
            .reducer(new SimpleAllKeysReducerFactory)
            .submit

        val result: java.util.Map[String, Int] = future.get

MapperやCombiner、Reducerを指定していますが、簡単にはこんな呼び出し方になります。submitを実行することでMapReduceジョブが開始されますが、Futureが戻ってくるので後で待ち合わせが必要です。

Jobに対してMapper、Combiner、Reducerを登録するのは、各メソッドの戻り値をいったん変数などで受けるのではなく、メソッドチェーンで設定することを強く推奨しています。ジェネリクスの影響を気にしているみたいです。

Futureにはコールバックリスナーも付与できるようですが、端折ります。

Mapper

Mapperには、Mappingフェーズで実行する処理を記述します。

Mapper
http://www.hazelcast.org/docs/latest/manual/html-single/hazelcast-documentation.html#mapper

Mapperを作成するには、Mapperインターフェースを実装してmapメソッドに処理を書きます。作成したデータ構造に入っていたキーと値が渡されてくるので、そこで好きな処理を記述して、MapReduceでのキーと値を定義します。

@SerialVersionUID(1L)
class SimpleAllKeysMapper extends Mapper[String, String, String, Int] {
  override def map(key: String, value: String, context: Context[String, Int]): Unit =
    context.emit(key, 1)
}

この時、Contextが一緒に渡ってくるので、キーと値を決めたらContext#emitします。ここでは、キーに対して常に1を渡しています。

Mapper自体は、HazelcastInstance(というかNode)で並列に実行されますが、HazelcastInstanceではシングルスレッドで動作するようです。ちなみに、MapperはSerializableです。

この他、LifecycleMapper/LifecycleMapperAdapterというものもあるらしいですが、試していません。

Combiner/CombinerFactory

Combineフェーズはオプションですが、使用することでMappingフェーズからReducingフェーズに移る際の転送量を削減できる可能性があります。Combinerを使用することは、強く推奨されているそうです。

Combiner / CombinerFactory
http://www.hazelcast.org/docs/latest/manual/html-single/hazelcast-documentation.html#combiner-combinerfactory

Combinerを使用するためには、Combinerを作成するCombinerFactoryインターフェースを実装したクラスと、Combinerクラスを継承したクラスを作成する必要があります。

@SerialVersionUID(1L)
class SimpleAllKeysCombinerFactory extends CombinerFactory[String, Int, Int] {
  override def newCombiner(key: String): Combiner[String, Int, Int] =
    new SimpleAllKeysCombiner
}

class SimpleAllKeysCombiner extends Combiner[String, Int, Int] {
  private[this] var sum: Int = _

  override def combine(key: String, value: Int): Unit =
    sum += value

  override def finalizeChunk: Int = {
    val s = sum
    sum = 0
    s
  }
}

CombinerFactory#newCombinerを実装し、キーを受け取ってそれに対応したCombinerを作成します。Combinerは、キーと値が渡ってくるcombineメソッドをオーバーライドしますが、実際の中間集計結果を返却するのはfinalizeChunkメソッドになります。この後、Combinerはチャンクサイズによって引き続き使用ことがあるので、finalizeChunkメソッド実行時に初期化する必要があります。

ですので、Combinerはステートフルになります。

動作は、各HazelcastInstanceごとに並列で、Mapperと同じスレッドで動作します。

ここでSerializableなのは、CombinerFactoryになります。Jobに登録するのも、CombinerFactoryです。

Reducer/ReducerFactory

Reducingフェーズが、MapReduce最後のフェーズになり、ここでキーごとに値を合計したりなどの、演算処理を行います。

Reducer / ReducerFactory
http://www.hazelcast.org/docs/latest/manual/html-single/hazelcast-documentation.html#reducer-reducerfactory

Reducerを使用するには、ReducerFactoryインターフェースを実装したクラスと、Reducerクラスを継承したクラスを作成する必要があります。インスタンス生成のイメージは、Combiner/CombinerFactoryと同じですね。

@SerialVersionUID(1L)
class SimpleAllKeysReducerFactory extends ReducerFactory[String, Int, Int] {
  override def newReducer(key: String): Reducer[String, Int, Int] =
    new SimpleAllKeysReducer
}

class SimpleAllKeysReducer extends Reducer[String, Int, Int] {
  private[this] var sum: Int = _

  override def reduce(value: Int): Unit =
    sum += value

  override def finalizeReduce: Int =
    sum
}

Reducer#reduceの引数が単一の値であること、finalizeReduceメソッドで結果を返却する必要があることから想像できるかもしれませんが、Reducer#reduceメソッドは複数回呼び出され、最後にfinalizeReduceメソッドで結果を返すような、Combiner同様ステートフルな実装になります。

Combinerと違って、使いまわしは気にしなくてもよさそうですが。

実行スレッドですが、MapperやCombinerとは違いちょっと安定しない結果になりました。クラスタの状態によって、使用されるスレッド数がけっこう変わるみたいです。

Serializableである必要があるのは、CombinerFactoryですね。

Collator

Job#submit時にオプションとして渡し、各Reducerの結果をさらに集約することができます。

Collator
http://www.hazelcast.org/docs/latest/manual/html-single/hazelcast-documentation.html#collator

Collatorのインターフェースは、こんな感じです。

public interface Collator<ValueIn,ValueOut> {
    ValueOut collate(Iterable<ValueIn> values);
}

今回は、実装は端折ります。

KeyPredicate

今回は紹介しませんが、Mappingフェーズで処理対象を絞るためのKeyPredicateというインターフェースもあるようです。

KeyPredicate
http://www.hazelcast.org/docs/latest/manual/html-single/hazelcast-documentation.html#keypredicate

こちらも、やはりJobに登録して使用します。

動作確認のための、簡単なサンプルを

よくあるWord Countのサンプルは、別途エントリを書きました。

Hazelcast MapReduce/lucene-kuromojiでテキストマイニング入門 〜形態素解析からワードカウントまで〜
http://d.hatena.ne.jp/Kazuhira/20140329/1396079599

ここでは、単純にすべてのキーの出現回数をカウントするMapReduceタスク(MapとListの違いを見るためのものです)と、実行時のスレッドに関するMapReduceタスクを書いてみます。

まずは、依存関係の定義。
build.sbt

name := "hazelcast-mapreduce-trial"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked")

// javaOptions in Test += "-javaagent:/usr/local/byteman/current/lib/byteman.jar=script:rule.btm"

// fork in Test := true

// connectInput := true

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "com.hazelcast" % "hazelcast" % "3.2",
  "org.scalatest" %% "scalatest" % "2.0" % "test"
)

テストで使う、設定ファイル。
src/test/resources/hazelcast.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.2.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <group>
    <name>my-cluster</name>
    <password>my-cluster-password</password>
  </group>

  <network>
    <port auto-increment="true" port-count="100">5701</port>

    <join>
      <multicast enabled="true">
        <multicast-group>224.2.2.3</multicast-group>
        <multicast-port>54327</multicast-port>
      </multicast>
      <tcp-ip enabled="false" />
    </join>
  </network>

  <jobtracker name="default">
    <max-thread-size>0</max-thread-size>
    <!-- Queue size 0 means number of partitions * 2 -->
    <queue-size>0</queue-size>
    <retry-count>0</retry-count>
    <chunk-size>1000</chunk-size>
    <communicate-stats>true</communicate-stats>
    <topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
  </jobtracker>
</hazelcast>

最初は、単純にすべてのキーの出現回数をカウントするMapReduceタスクです。上記の説明中で登場していたのは、これですね。
src/main/scala/org/littlewings/hazelcast/mapreduce/SimpleAllKeysMapReduce.scala

package org.littlewings.hazelcast.mapreduce

import com.hazelcast.mapreduce.{Combiner, CombinerFactory, Context, Mapper, Reducer, ReducerFactory}

@SerialVersionUID(1L)
class SimpleAllKeysMapper extends Mapper[String, String, String, Int] {
  override def map(key: String, value: String, context: Context[String, Int]): Unit =
    context.emit(key, 1)
}

@SerialVersionUID(1L)
class SimpleAllKeysCombinerFactory extends CombinerFactory[String, Int, Int] {
  override def newCombiner(key: String): Combiner[String, Int, Int] =
    new SimpleAllKeysCombiner
}

class SimpleAllKeysCombiner extends Combiner[String, Int, Int] {
  private[this] var sum: Int = _

  override def combine(key: String, value: Int): Unit =
    sum += value

  override def finalizeChunk: Int = {
    val s = sum
    sum = 0
    s
  }
}

@SerialVersionUID(1L)
class SimpleAllKeysReducerFactory extends ReducerFactory[String, Int, Int] {
  override def newReducer(key: String): Reducer[String, Int, Int] =
    new SimpleAllKeysReducer
}

class SimpleAllKeysReducer extends Reducer[String, Int, Int] {
  private[this] var sum: Int = _

  override def reduce(value: Int): Unit =
    sum += value

  override def finalizeReduce: Int =
    sum
}

テストのためには、複数のHazelcastInstanceが欲しくなるので、このようなトレイトを用意しました。
src/test/scala/org/littlewings/hazelcast/mapreduce/HazelcastSpecSupport.scala

package org.littlewings.hazelcast.mapreduce

import com.hazelcast.config.ClasspathXmlConfig
import com.hazelcast.core.{Hazelcast, HazelcastInstance}

import org.scalatest.Suite

trait HazelcastSpecSupport {
  this: Suite =>

  protected def withHazelcast(n: Int)(fun: HazelcastInstance => Unit): Unit = {
    val instances =
      (1 to n).map(i => Hazelcast.newHazelcastInstance(new ClasspathXmlConfig("hazelcast.xml")))

    try {
      fun(instances.head)
    } finally {
      instances.foreach(_.getLifecycleService.shutdown())
    }
  }
}

で、テストコードは以下の様に書きます。
src/test/scala/org/littlewings/hazelcast/mapreduce/SimpleAllKeysMapReduceSpec.scala

package org.littlewings.hazelcast.mapreduce

import com.hazelcast.core.ICompletableFuture
import com.hazelcast.mapreduce.{JobTracker, KeyValueSource}

import org.scalatest.{FunSpec, Entry}
import org.scalatest.Matchers._

class SimpleAllKeysMapReduceSpec extends FunSpec
                                 with HazelcastSpecSupport {
  describe("simple all mapreduce") {
    // ここに、テストを書く!
  }
}

まずは、Mapを使った例。

    it("test map") {
      withHazelcast(2) { hazelcast =>
        val map = hazelcast.getMap[String, String]("simple-map")

        (1 to 100).foreach(i => map.put(s"key$i", s"value$i"))

        val source = KeyValueSource.fromMap(map)

        val jobTracker = hazelcast.getJobTracker("default")
        val job = jobTracker.newJob(source)

        val future: ICompletableFuture[java.util.Map[String, Int]] =
          job
            .mapper(new SimpleAllKeysMapper)
            .combiner(new SimpleAllKeysCombinerFactory)
            .reducer(new SimpleAllKeysReducerFactory)
            .submit

        val result: java.util.Map[String, Int] = future.get

        result.get("key1") should be (1)
        result should contain (Entry("key1", 1))
        result should contain (Entry("key2", 1))
        result should have size 100
      }
    }

当然ですが、元のMapの写像になります。

続いて、Listで実行。

    it("test list") {
      withHazelcast(2) { hazelcast =>
        val list = hazelcast.getList[String]("simple-list")

        (1 to 100).foreach(i => list.add(s"entry$i"))

        val source = KeyValueSource.fromList(list)

        val jobTracker = hazelcast.getJobTracker("default")
        val job = jobTracker.newJob(source)

        val future: ICompletableFuture[java.util.Map[String, Int]] =
          job
            .mapper(new SimpleAllKeysMapper)
            .combiner(new SimpleAllKeysCombinerFactory)
            .reducer(new SimpleAllKeysReducerFactory)
            .submit

        val result: java.util.Map[String, Int] = future.get

        result should contain (Entry("simple-list", 100))
        result should have size 1
      }
    }

Mapの時とは異なり、結果のMapのサイズが1になり、キーがすべて「simple-list」になりました。これは、元のListを「simple-list」という名前で作成したからですね。

続いて、スレッドの動作状態を観測するMapReduceタスク。
src/main/scala/org/littlewings/hazelcast/mapreduce/ThreadInspectMapReduce.scala

package org.littlewings.hazelcast.mapreduce

import com.hazelcast.mapreduce.{Combiner, CombinerFactory, Context, Mapper, Reducer, ReducerFactory}

@SerialVersionUID(1L)
class ThreadInspectMapper extends Mapper[String, String, String, String] {
  override def map(key: String, value: String, context: Context[String, String]): Unit =
    context.emit("threads", Thread.currentThread.getName)
}

@SerialVersionUID(1L)
class ThreadInspectCombinerFactory extends CombinerFactory[String, String, Map[String, Set[String]]] {
  override def newCombiner(key: String): Combiner[String, String, Map[String, Set[String]]] =
    new ThreadInspectCombiner
}

class ThreadInspectCombiner extends Combiner[String, String, Map[String, Set[String]]] {
  private[this] var names: Map[String, Set[String]] = Map.empty

  override def combine(key: String, value: String): Unit = {
    names += ("mapper" -> (names.get("mapper").getOrElse(Set.empty) + value))
    names += ("combiner" -> (names.get("combiner").getOrElse(Set.empty) + Thread.currentThread.getName))
  }

  override def finalizeChunk: Map[String, Set[String]] = {
    val m = names
    names = Map.empty
    m
  }
}

@SerialVersionUID(1L)
class ThreadInspectReducerFactory extends ReducerFactory[String, Map[String, Set[String]], Map[String, Set[String]]] {
  override def newReducer(key: String): Reducer[String, Map[String, Set[String]], Map[String, Set[String]]] =
    new ThreadInspectReducer
}

class ThreadInspectReducer extends Reducer[String, Map[String, Set[String]], Map[String, Set[String]]] {
  private[this] var names: Map[String, Set[String]] = Map.empty

  override def reduce(values: Map[String, Set[String]]): Unit = {
    values.foreach { case (k, v) =>
      names += (k -> (names.get(k).getOrElse(Set.empty) ++ v))
    }
    names += ("reducer" -> (names.get("reducer").getOrElse(Set.empty) + Thread.currentThread.getName))
  }

  override def finalizeReduce: Map[String, Set[String]] =
    names
}

各フェーズで検出したスレッド名を、Setに詰めていきます。

今回のコードでは、HazelcastInstanceの数が重要になるようなので、テストコードとしてはこのようなものを用意します。
src/test/scala/org/littlewings/hazelcast/mapreduce/ThreadInspectMapReduceSpec.scala

package org.littlewings.hazelcast.mapreduce

import scala.collection.JavaConverters._

import com.hazelcast.core.ICompletableFuture
import com.hazelcast.mapreduce.{JobTracker, KeyValueSource}

import org.scalatest.{FunSpec, Entry}
import org.scalatest.Matchers._

class ThreadInspectMapReduceSpec extends FunSpec
                                 with HazelcastSpecSupport {
  describe("thread inspect mapreduce") {
    it("test map") {
      val instanceNumber = 4

      withHazelcast(instanceNumber) { hazelcast =>
        // ここで、MapReduceを実行
      }
    }
  }
}

HazelcastInstance数は、4つとしました。

で、テストコード。

      withHazelcast(instanceNumber) { hazelcast =>
        val map = hazelcast.getMap[String, String]("simple-map")

        (1 to 100).foreach(i => map.put(s"key$i", s"value$i"))

        val source = KeyValueSource.fromMap(map)

        val jobTracker = hazelcast.getJobTracker("default")
        val job = jobTracker.newJob(source)

        val future: ICompletableFuture[java.util.Map[String, Map[String, Set[String]]]] =
          job
            .mapper(new ThreadInspectMapper)
            .combiner(new ThreadInspectCombinerFactory)
            .reducer(new ThreadInspectReducerFactory)
            .submit

        val result: java.util.Map[String, Map[String, Set[String]]] = future.get

        val threads = result.get("threads")

        threads.foreach { case (key, values) =>
          println(s"""|Phase[$key]:
                      |${values.mkString("  ", System.lineSeparator + "  ", "")}""".stripMargin)
        }

        threads("mapper") should have size (instanceNumber)
        threads("combiner") should have size (instanceNumber)
        threads("mapper") should be (threads("combiner"))
        threads("reducer").size should be >= 1
      }

MapperとCombinerのスレッド数は同じで、かつ同じスレッドが使用されます。

        threads("mapper") should have size (instanceNumber)
        threads("combiner") should have size (instanceNumber)
        threads("mapper") should be (threads("combiner"))

HazelcastInstanceの数とも同じですね。

Reducerのスレッド数は、割と不定

        threads("reducer").size should be >= 1

これだけだと結果がわかりにくいので、標準出力も使用。

        threads.foreach { case (key, values) =>
          println(s"""|Phase[$key]:
                      |${values.mkString("  ", System.lineSeparator + "  ", "")}""".stripMargin)
        }

結果の一例。

Phase[mapper]:
  hz._hzInstance_5_my-cluster.cached.thread-3
  hz._hzInstance_7_my-cluster.cached.thread-1
  hz._hzInstance_6_my-cluster.cached.thread-2
  hz._hzInstance_8_my-cluster.cached.thread-3
Phase[combiner]:
  hz._hzInstance_5_my-cluster.cached.thread-3
  hz._hzInstance_7_my-cluster.cached.thread-1
  hz._hzInstance_6_my-cluster.cached.thread-2
  hz._hzInstance_8_my-cluster.cached.thread-3
Phase[reducer]:
  hz._hzInstance_6_my-cluster.cached.thread-3
  hz._hzInstance_6_my-cluster.cached.thread-2
  hz._hzInstance_6_my-cluster.cached.thread-6
  hz._hzInstance_6_my-cluster.cached.thread-7

ここではReducerは4つのスレッドが表示されていますが、ひとつのことも普通にありました。

今回ご紹介したコードは、こちらにアップしています。
https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-mapreduce-trial

Hazelcast MapReduce Frameworkの簡単な紹介でした、続いてWord Countのサンプルに移ります。