CLOVER🍀

That was when it all began.

Apache Sparkことはじめ

だいぶ今更ながらですが、Apache Sparkを試してみることにしました。前々から、興味がちょっとありまして。

Apache Spark
http://spark.apache.org/

Apache Spark の紹介(前半:Sparkのキホン)
http://www.slideshare.net/hadoopxnttdata/apache-spark-spark

Apache Sparkのご紹介 (後半:技術トピック)
http://www.slideshare.net/hadoopxnttdata/apache-spark

が、あんまりSpark自体まだよくわかっていないので、ちょっとずつ触りながら感覚をつかめたらいいなーと思っています。

とりあえず、くじけなければ継続テーマにするつもりです(笑)。

では、Hello World的に始めてみたいと思います。

Apache Sparkのダウンロード

まずはダウンロードページから、Apache Sparkをダウンロードしてきます。現時点でのApache Sparkのバージョンは、1.4.1でした。

Download Spark
http://spark.apache.org/downloads.html

なんかいろいろあってよくわからないのですが、今回は「Pre-built for Hadoop 2.6 and later」というのを選んでダウンロードしました。今のところ、Hadoopには触れるつもりはないのですが…それでもいいのかな…。

ダウンロードして、解凍。

$ tar -zxvf spark-1.4.1-bin-hadoop2.6.tgz

SparkはScala 2.10系になっていて、2.11系にも切り替えられるようですが、Spark SQLは未対応そうな雰囲気もあるので、ここは2.10系のまま進めることにします。

Quick Startに習う

圧縮ファイルを解凍したら、Quick Startに沿って動かしてみます。

Quick Start
http://spark.apache.org/docs/latest/quick-start.html

解凍したディレクトリ内に入って

$ cd spark-1.4.1-bin-hadoop2.6

Spark Shellを起動。

$ bin/spark-shell

こんなバナーが出つつ、Spark Shellが起動します。

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.1
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_45)

まずは、Quick Start(Basics)に沿ってWord Count的な。

Quick Start(Basics)
http://spark.apache.org/docs/latest/quick-start.html#basics

scala> val textFile = sc.textFile("README.md")

ここで、README.mdは「spark-1.4.1-bin-hadoop2.6」ディレクトリ配下にあるものです。

spark-1.4.1-bin-hadoop2.6/README.md

count。

scala> textFile.count()
〜省略〜
15/07/24 23:10:10 INFO DAGScheduler: Job 0 finished: count at <console>:24, took 0.242759 s
res0: Long = 98

first。

scala> textFile.first()
〜省略〜
15/07/24 23:10:48 INFO DAGScheduler: Job 1 finished: first at <console>:24, took 0.018444 s
res1: String = # Apache Spark

「Spark」を含む行数。

scala> textFile.filter(_.contains("Spark")).count()
〜省略〜
15/07/24 23:12:46 INFO DAGScheduler: Job 3 finished: count at <console>:24, took 0.025289 s
res6: Long = 19

とりあえず、動かせました。

いったん、Spark Shellを終了。

scala> :q

LuceneのStandardAnalyzerでWord Countしてみる

今度は、こちらを見ながらアプリケーションを書いて、Spark Submitで実行してみたいと思います。

Self-Contained Applications
http://spark.apache.org/docs/latest/quick-start.html#self-contained-applications

お題は、先ほどのREADME.mdを使って、LuceneのStandardAnalyzerでトークナイズして、頻出語上位10個を取り出してみます。

ビルド定義。
build.sbt

name := "word-count-lucene-analyzer"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
  "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1"
)

Sparkは、「provided」としました。

sbt-assemblyも使います。
project/plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

Word Countするプログラム。
src/main/scala/org/littlewings/spark/WordCount.scala

package org.littlewings.spark

import java.io.StringReader

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute

object WordCount {
  def main(args: Array[String]): Unit = {
    val file = args.toList.headOption.getOrElse("README.md")
    println(s"Input File => $file")

    val conf = new SparkConf().setAppName("Word Count Application")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(file).cache()

    implicit val ordering = TulpleOrdering

    textFile
      .flatMap { line =>
        val analyzer = new StandardAnalyzer
        val tokenStream = analyzer.tokenStream("", new StringReader(line))
        val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute])

        tokenStream.reset()

        try {
          Iterator
            .continually(tokenStream.incrementToken())
            .takeWhile(identity)
            .map(_ => charAttr.toString)
            .toVector
        } finally {
          tokenStream.end()
          tokenStream.close()
        }
      }
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
      .top(10)
      .foreach(println)
  }

      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
      .top(10)
      .foreach(println)
  }
}

object TulpleOrdering extends Ordering[(String, Int)] {
  override def compare(x: (String, Int), y: (String, Int)): Int =
    x._2 - y._2
}

あとはassemblyして

> assembly

実行してみます。

$ bin/spark-submit --class "org.littlewings.spark.WordCount" --master local /path/to/word-count-lucene-analyzer-assembly-0.0.1-SNAPSHOT.jar

結果、このようになりました。

(spark,24)
(run,13)
(hadoop,9)
(building,9)
(you,8)
(example,8)
(http,7)
(spark.apache.org,6)
(can,6)
(also,5)

できたみたいです。

とりあえず動かせましたよということで。