CLOVER🍀

That was when it all began.

Hazelcast MapReduce/lucene-kuromojiでテキストマイニング入門 〜形態素解析からワードカウントまで〜

先のエントリで、Hazelcast 3.2で追加された、MapReduce Frameworkの簡単な紹介をしました。

Hazelcast 3.2の新機能、MapReduce Framework
http://d.hatena.ne.jp/Kazuhira/20140329/1396079530

ここでは、もうちょっと具体的なサンプルとして、Word Countをやりたいと思います。

で、せっかくなので以前、ClojureLucene-Kuromoji/Incanter/clj-soupでやった以下のエントリを書き直して使ってみようと思います。

Re: Clojure/lucene-kuromojiでテキストマイニング入門 〜形態素解析からワードカウントまで〜
http://d.hatena.ne.jp/Kazuhira/20140309/1394366284

今度は、ScalaLucene-Kuromoji/Dispatch/jsoupでやります。Scalaだと、Incanterの代わりはちょっといいのが見つからなかったので端折ります。

題材は、前回同様「坊ちゃん」を対象にして形態素解析を行い、上位N件の単語を抽出します。

つまり…

  • Mapper … 文章の形態素解析を行い、単語を抽出
  • Combiner/Reducer … 形態素解析された単語を、カウント
  • Collator … Reducerで集計された単語とカウントから、上位N件に結果を絞り込み

というサンプルを作成します。

対象のドキュメントは、こちら。

坊っちゃん
http://www.aozora.gr.jp/cards/000148/files/752_14964.html

では、やっていきましょう。

準備

まずは、ビルドの準備。
build.sbt

name := "hazelcast-mapreduce-wordcount"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked")

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.apache.lucene" % "lucene-analyzers-kuromoji" % "4.7.0",
  "com.hazelcast" % "hazelcast" % "3.2",
  "net.databinder.dispatch" %% "dispatch-core" % "0.11.0" % "test",
  "org.jsoup" % "jsoup" % "1.7.3" % "test",
  "org.scalatest" %% "scalatest" % "2.0" % "test"
)

テストコードは、ScalaTestで行います。「坊ちゃん」のドキュメントをダウンロードするところは、Dispatchとjsoupを追加。どちらもテストスコープですね。

設定ファイルは、こちら。
src/test/resources/hazelcast.xml

<?xml version="1.0" encoding="UTF-8"?>
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.2.xsd"
           xmlns="http://www.hazelcast.com/schema/config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <group>
    <name>my-cluster</name>
    <password>my-cluster-password</password>
  </group>

  <network>
    <port auto-increment="true" port-count="100">5701</port>

    <join>
      <multicast enabled="true">
        <multicast-group>224.2.2.3</multicast-group>
        <multicast-port>54327</multicast-port>
      </multicast>
      <tcp-ip enabled="false" />
    </join>
  </network>

  <jobtracker name="default">
    <max-thread-size>0</max-thread-size>
    <!-- Queue size 0 means number of partitions * 2 -->
    <queue-size>0</queue-size>
    <retry-count>0</retry-count>
    <chunk-size>1000</chunk-size>
    <communicate-stats>true</communicate-stats>
    <topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
  </jobtracker>
</hazelcast>

Mapper

では、まずは形態素解析を行うMapperを作成します。

値の中に、形態素解析すべき文章が入っていると仮定して、読んでください。

src/main/scala/org/littlewings/hazelcast/mapreduce/TokenizeMapper.scala

package org.littlewings.hazelcast.mapreduce

import scala.util.matching.Regex

import com.hazelcast.mapreduce.{Context, Mapper}
import org.apache.lucene.analysis.ja.JapaneseAnalyzer
import org.apache.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.lucene.util.Version

object TokenizeMapper {
  private val NOMINAL_REGEX: Regex = "名詞".r

  def isNominal(source: String): Boolean =
    NOMINAL_REGEX.findFirstIn(source).isDefined
}

@SerialVersionUID(1L)
class TokenizeMapper extends Mapper[String, String, String, Long] {
  override def map(key: String, value: String, context: Context[String, Long]): Unit = {
    val analyzer = new JapaneseAnalyzer(Version.LUCENE_47)
    val tokenStream = analyzer.tokenStream("", value)
    val charTermAttr = tokenStream.addAttribute(classOf[CharTermAttribute])
    val partOfSpeechAttr = tokenStream.addAttribute(classOf[PartOfSpeechAttribute])

    tokenStream.reset()

    try {
      Iterator
        .continually(tokenStream.incrementToken())
        .takeWhile(_ == true)
        .withFilter(_ => TokenizeMapper.isNominal(partOfSpeechAttr.getPartOfSpeech))
        .map(_ => charTermAttr.toString)
        .foreach(token => context.emit(token, 1L))
    } finally {
      tokenStream.end()
    }
  }
}

値を形態素解析して、そのうち名詞のみに絞り込みます。名詞かどうかの判定は、Kuromojiから取得できるPartOfSpeechAttributeを使用しています。

ここで、分割した単語をそれぞれキーとして、値1でContext#emitしていきます。

CombinerFactory/Combiner

続いて、CombinerFactoryとCombinerです。ここでは、Mapperで形態素解析した単語と出現頻度(この時点では1)を加算していきます。

src/main/scala/org/littlewings/hazelcast/mapreduce/WordCountCombinerFactory.scala

package org.littlewings.hazelcast.mapreduce

import com.hazelcast.mapreduce.{Combiner, CombinerFactory}

@SerialVersionUID(1L)
class WordCountCombinerFactory extends CombinerFactory[String, Long, Long] {
  override def newCombiner(key: String): Combiner[String, Long, Long] =
    new WordCountCombiner
}

class WordCountCombiner extends Combiner[String, Long, Long] {
  private[this] var count: Long = _

  override def combine(key: String, value: Long): Unit =
    count += value

  override def finalizeChunk: Long = {
    val c = count
    count = 0L
    c
  }
}

Combinerはキーごとに作成されるので、他のキーのことは気にしなくてよいですね。ただ、finalizeChunkが複数回呼び出される可能性があることは、考慮する必要があります。

ReducerFactory/Reducer

MapReduceの最後のフェーズのReducer。とはいえ、Combinerとほぼやることは同じです。

src/main/scala/org/littlewings/hazelcast/mapreduce/WordCountReducerFactory.scala

package org.littlewings.hazelcast.mapreduce

import com.hazelcast.mapreduce.{Reducer, ReducerFactory}

@SerialVersionUID(1L)
class WordCountReducerFactory extends ReducerFactory[String, Long, Long] {
  override def newReducer(key: String): Reducer[String, Long, Long] =
    new WordCountReducer
}

class WordCountReducer extends Reducer[String, Long, Long] {
  private[this] var count: Long = _

  override def reduce(value: Long): Unit =
    count += value

  override def finalizeReduce: Long =
    count
}

ただし、finalizeReduceが複数回呼び出されることは考慮しなくてOKです。

ところで、今回はCombinerが入ってもReducerに渡る型は変わらないため、ReducerはCombinerの結果型を意識したコードになっていませんが、CombinerがMapperで設定する値の型を変換する場合は、合わせてReducerも変える必要があります。

先のエントリで、動作しているスレッドを確認するサンプルが、そのような例になっていました。

Collator

MapReduceの終了後に、Reducerが返した結果をさらにまとめるための、Collatorです。

今回は、Reducerで単語ごとに出現頻度が格納されたMapが結果となっているので、これを出現頻度でソートして、上位N件を返すCollatorを定義します。
src/main/scala/org/littlewings/hazelcast/mapreduce/WordCountCollator.scala

package org.littlewings.hazelcast.mapreduce

import scala.collection.JavaConverters._
import scala.collection._
import scala.collection.immutable.TreeMap

import com.hazelcast.mapreduce.Collator

trait WordCountCollator extends Collator[java.util.Map.Entry[String, Long], Seq[(String, Long)]] {
  protected val topN: Int

  override def collate(values: java.lang.Iterable[java.util.Map.Entry[String, Long]]): Seq[(String, Long)] =
    values
      .asScala
      .foldLeft(mutable.ArrayBuffer.empty[(String, Long)]) { (acc, entry) =>
        acc += (entry.getKey -> entry.getValue)
        acc
      }
      .sortWith((a, b) => a._2 > b._2)
      .take(topN)
}

class WordCountTop10Collator extends WordCountCollator {
  val topN: Int = 10
}

class WordCountTop100Collator extends WordCountCollator {
  val topN: Int = 100
}

上位10件と100件を返すことができるCollatorを定義しましたが、今回は上位10件のみ使うことにします。

データを取得してきて、動かそう

最後に、動作確認を行うためのテストコードを書きます。

テストコードの骨格は、こんな感じです。
src/test/scala/org/littlewings/hazelcast/mapreduce/WordCountMapReduceSpec.scala

package org.littlewings.hazelcast.mapreduce

import scala.collection.JavaConverters._

import java.io.InputStream

import com.hazelcast.config.ClasspathXmlConfig
import com.hazelcast.core.{Hazelcast, HazelcastInstance}
import com.hazelcast.mapreduce.{JobTracker, KeyValueSource}

import dispatch._
import dispatch.Defaults._
import org.jsoup.Jsoup

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class WordCountMapReduceSpec extends FunSpec {
  describe("word count map-reduce spec") {
    // ここに、テストコードを書く!!
  }

  def withHazelcast(instanceNumber: Int)(fun: HazelcastInstance => Unit): Unit = {
    val instances = 
      (1 to instanceNumber)
        .map(i => Hazelcast.newHazelcastInstance(new ClasspathXmlConfig("hazelcast.xml")))

    try {
      try {
        fun(instances.head)
      } finally {
        instances.foreach(_.getLifecycleService.shutdown())
      }
    } finally {
      Hazelcast.shutdownAll()
    }
  }
}

やっぱり、複数のHazelcastInstanceが欲しくなるので、HazelcastInstanceを作成する数を渡せる、withHazelcastメソッドを作成しました。

最初に、HazelcastInstanceを2つ作成し、そのうちひとつでグリッドに対する操作を行います。

    it("word count top10") {
      withHazelcast(2) { hazelcast =>
        val list = hazelcast.getList[String]("default")

        // 以降、処理が続く
      }
    }

今回は、データ構造はListとしました。

Dispatchとjsoupを使って、「坊ちゃん」の文章を取得し、Documentを作成します。

        val http = Http
        val document =
          try {
            import com.ning.http.client
            object ByteStream extends (client.Response => InputStream) {
              def apply(r: client.Response): InputStream =
                r.getResponseBodyAsStream
            }

            val request = url("http://www.aozora.gr.jp/cards/000148/files/752_14964.html")
            val is = http(request OK ByteStream).apply()

            try {
              Jsoup.parse(is,
                          "Windows-31J",
                          "")
            } finally {
              is.close()
            }
          } finally {
            http.shutdown()
          }

取得した文章を、とりあえずテキスト要素ごとにListに放り込みます。

        document
          .select("div.main_text")
          .asScala
          .map(_.text)
          .foreach(_.split("。").foreach(list.add))

このListとHazelcastInstanceを元に、Jobを作成します。

        val jobTracker = hazelcast.getJobTracker("default")
        val source = KeyValueSource.fromList(list)
        val job = jobTracker.newJob(source)

Mapper、Combiner、Reducer、Collatorを設定して、MapReduceを実行します。

        val future =
          job
            .mapper(new TokenizeMapper)
            .combiner(new WordCountCombinerFactory)
            .reducer(new WordCountReducerFactory)
            .submit(new WordCountTop10Collator)

Collatorは、上位10件のものですね。

結果の取得。

        val result = future.get

では、結果を確認してみましょう。

        result should have size (10)
        result should contain theSameElementsInOrderAs Seq(
          ("おれ", 472),
          ("事", 291),
          ("人", 213),
          ("君", 184),
          ("赤" ,178),
          ("一", 176),
          ("シャツ",170),
          ("山嵐", 155),
          ("何", 144),
          ("二", 121)
        )

Mapのサイズが10となり、単語も上位10件とその出現回数が取得できています。一応、元ネタとなったClojureLucene-Kuromojiでの形態素解析と同じ結果になっています。

まあ、違ったら困りますが…。

こんな感じで、MapReduceを使って分散Word Countをやってみました。APIの敷居はそんなに高くないので、けっこう簡単に使えると思います。

興味がある方は、ぜひ。

今回作成したソースコードは、以下にアップしています。

https://github.com/kazuhira-r/hazelcast-examples/tree/master/hazelcast-mapreduce-wordcount

あと、テストコード全体も、一応載せておきます。
src/test/scala/org/littlewings/hazelcast/mapreduce/WordCountMapReduceSpec.scala

package org.littlewings.hazelcast.mapreduce

import scala.collection.JavaConverters._

import java.io.InputStream

import com.hazelcast.config.ClasspathXmlConfig
import com.hazelcast.core.{Hazelcast, HazelcastInstance}
import com.hazelcast.mapreduce.{JobTracker, KeyValueSource}

import dispatch._
import dispatch.Defaults._
import org.jsoup.Jsoup

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class WordCountMapReduceSpec extends FunSpec {
  describe("word count map-reduce spec") {
    it("word count top10") {
      withHazelcast(2) { hazelcast =>
        val list = hazelcast.getList[String]("default")

        val http = Http
        val document =
          try {
            import com.ning.http.client
            object ByteStream extends (client.Response => InputStream) {
              def apply(r: client.Response): InputStream =
                r.getResponseBodyAsStream
            }

            val request = url("http://www.aozora.gr.jp/cards/000148/files/752_14964.html")
            val is = http(request OK ByteStream).apply()

            try {
              Jsoup.parse(is,
                          "Windows-31J",
                          "")
            } finally {
              is.close()
            }
          } finally {
            http.shutdown()
          }

        document
          .select("div.main_text")
          .asScala
          .map(_.text)
          .foreach(_.split("。").foreach(list.add))

        val jobTracker = hazelcast.getJobTracker("default")
        val source = KeyValueSource.fromList(list)
        val job = jobTracker.newJob(source)

        val future =
          job
            .mapper(new TokenizeMapper)
            .combiner(new WordCountCombinerFactory)
            .reducer(new WordCountReducerFactory)
            .submit(new WordCountTop10Collator)

        val result = future.get

        result should have size (10)
        result should contain theSameElementsInOrderAs Seq(
          ("おれ", 472),
          ("事", 291),
          ("人", 213),
          ("君", 184),
          ("赤" ,178),
          ("一", 176),
          ("シャツ",170),
          ("山嵐", 155),
          ("何", 144),
          ("二", 121)
        )
      }
    }
  }

  def withHazelcast(instanceNumber: Int)(fun: HazelcastInstance => Unit): Unit = {
    val instances = 
      (1 to instanceNumber)
        .map(i => Hazelcast.newHazelcastInstance(new ClasspathXmlConfig("hazelcast.xml")))

    try {
      try {
        fun(instances.head)
      } finally {
        instances.foreach(_.getLifecycleService.shutdown())
      }
    } finally {
      Hazelcast.shutdownAll()
    }
  }
}