だいぶ今更ながらですが、Apache Sparkを試してみることにしました。前々から、興味がちょっとありまして。
Apache Spark
http://spark.apache.org/
Apache Spark の紹介(前半:Sparkのキホン)
http://www.slideshare.net/hadoopxnttdata/apache-spark-spark
Apache Sparkのご紹介 (後半:技術トピック)
http://www.slideshare.net/hadoopxnttdata/apache-spark
が、あんまりSpark自体まだよくわかっていないので、ちょっとずつ触りながら感覚をつかめたらいいなーと思っています。
とりあえず、くじけなければ継続テーマにするつもりです(笑)。
では、Hello World的に始めてみたいと思います。
Apache Sparkのダウンロード
まずはダウンロードページから、Apache Sparkをダウンロードしてきます。現時点でのApache Sparkのバージョンは、1.4.1でした。
Download Spark
http://spark.apache.org/downloads.html
なんかいろいろあってよくわからないのですが、今回は「Pre-built for Hadoop 2.6 and later」というのを選んでダウンロードしました。今のところ、Hadoopには触れるつもりはないのですが…それでもいいのかな…。
ダウンロードして、解凍。
$ tar -zxvf spark-1.4.1-bin-hadoop2.6.tgz
SparkはScala 2.10系になっていて、2.11系にも切り替えられるようですが、Spark SQLは未対応そうな雰囲気もあるので、ここは2.10系のまま進めることにします。
Quick Startに習う
圧縮ファイルを解凍したら、Quick Startに沿って動かしてみます。
Quick Start
http://spark.apache.org/docs/latest/quick-start.html
解凍したディレクトリ内に入って
$ cd spark-1.4.1-bin-hadoop2.6
Spark Shellを起動。
$ bin/spark-shell
こんなバナーが出つつ、Spark Shellが起動します。
Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.4.1 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)
まずは、Quick Start(Basics)に沿ってWord Count的な。
Quick Start(Basics)
http://spark.apache.org/docs/latest/quick-start.html#basics
scala> val textFile = sc.textFile("README.md")
ここで、README.mdは「spark-1.4.1-bin-hadoop2.6」ディレクトリ配下にあるものです。
spark-1.4.1-bin-hadoop2.6/README.md
count。
scala> textFile.count() 〜省略〜 15/07/24 23:10:10 INFO DAGScheduler: Job 0 finished: count at <console>:24, took 0.242759 s res0: Long = 98
first。
scala> textFile.first() 〜省略〜 15/07/24 23:10:48 INFO DAGScheduler: Job 1 finished: first at <console>:24, took 0.018444 s res1: String = # Apache Spark
「Spark」を含む行数。
scala> textFile.filter(_.contains("Spark")).count() 〜省略〜 15/07/24 23:12:46 INFO DAGScheduler: Job 3 finished: count at <console>:24, took 0.025289 s res6: Long = 19
とりあえず、動かせました。
いったん、Spark Shellを終了。
scala> :q
LuceneのStandardAnalyzerでWord Countしてみる
今度は、こちらを見ながらアプリケーションを書いて、Spark Submitで実行してみたいと思います。
Self-Contained Applications
http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications
お題は、先ほどのREADME.mdを使って、LuceneのStandardAnalyzerでトークナイズして、頻出語上位10個を取り出してみます。
ビルド定義。
build.sbt
name := "word-count-lucene-analyzer" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.4" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.4.1" % "provided", "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1" )
Sparkは、「provided」としました。
sbt-assemblyも使います。
project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
Word Countするプログラム。
src/main/scala/org/littlewings/spark/WordCount.scala
package org.littlewings.spark import java.io.StringReader import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.lucene.analysis.standard.StandardAnalyzer import org.apache.lucene.analysis.tokenattributes.CharTermAttribute object WordCount { def main(args: Array[String]): Unit = { val file = args.toList.headOption.getOrElse("README.md") println(s"Input File => $file") val conf = new SparkConf().setAppName("Word Count Application") val sc = new SparkContext(conf) val textFile = sc.textFile(file).cache() implicit val ordering = TulpleOrdering textFile .flatMap { line => val analyzer = new StandardAnalyzer val tokenStream = analyzer.tokenStream("", new StringReader(line)) val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute]) tokenStream.reset() try { Iterator .continually(tokenStream.incrementToken()) .takeWhile(identity) .map(_ => charAttr.toString) .toVector } finally { tokenStream.end() tokenStream.close() } } .map(word => (word, 1)) .reduceByKey((a, b) => a + b) .top(10) .foreach(println) } .map(word => (word, 1)) .reduceByKey((a, b) => a + b) .top(10) .foreach(println) } } object TulpleOrdering extends Ordering[(String, Int)] { override def compare(x: (String, Int), y: (String, Int)): Int = x._2 - y._2 }
あとはassemblyして
> assembly
実行してみます。
$ bin/spark-submit --class "org.littlewings.spark.WordCount" --master local /path/to/word-count-lucene-analyzer-assembly-0.0.1-SNAPSHOT.jar
結果、このようになりました。
(spark,24) (run,13) (hadoop,9) (building,9) (you,8) (example,8) (http,7) (spark.apache.org,6) (can,6) (also,5)
できたみたいです。
とりあえず動かせましたよということで。