CLOVER🍀

That was when it all began.

Apache SparkのSpark Standalone Modeを動かしてみる

前回SparkでHello World的なことをやりましたが、今回はSpark Standalone Modeを試してみることにします。

Spark Standalone Mode
https://spark.apache.org/docs/latest/spark-standalone.html

が、そもそもこれってSparkでとりうるDeployment Modeの一種らしく、クラスタについての用語を見ておく必要がありそうです。

Cluster Modeの用語を読む

Cluster Mode Overview
https://spark.apache.org/docs/latest/cluster-overview.html

ここでのGlossaryという項目に着目すると、それぞれこんな感じ…?

用語 意味
Application ユーザーがビルドした、Spark上に載せられるもの。クラスタ上のプログラムとExecutorで構成される
Application jar ユーザーが作ったSparkアプリケーションを含むJARファイル。「uber jar」形式(依存関係も含めてひとまとめにしたJARファイル)で作成することになり、HadoopおよびSparkのライブラリは含めないこと(実行時に追加される)
Driver program mainメソッドを持ち、SparkContextを作成するアプリケーション(要はユーザーが作ったRDD変換プログラムのこと)
Cluster manager クラスタ上のリソースを管理するための外部サービス(Standalone Manager、Mesos、YARN)
Deploy mode Driverの実行場所を区別するもの。「クラスタ」モードでは、フレームワーククラスタ内のDriverを起動する。「クライアント」モードでは、Submitした人がクラスタ外のDriverを起動する
Worker node クラスタ内で、アプリケーション・コードを実行できる任意のNode
Executor Workker Node上で、アプリケーションのために起動されたプロセス。タスクの実行中は、メモリまたはディスクにデータを保持する。各アプリケーションは、自分のExecutor(s)を持つ
Task ひとつのExecutorに送信される、作業の単位
Job Sparkのアクション(save、collectなど)への応答として生成される、複数のタスクから成る並列計算。この単語をDriverのログで見ることができるだろう
Stage 各ジョブは、互いに依存のあるステージと呼ばれる小さなタスクのセットに分割される(MapReduceでのmapとreduceのステージに似ている)。この単語を、Driverのログで見ることができるだろう

参考)
Apache Sparkのご紹介 (後半:技術トピック)
http://www.slideshare.net/hadoopxnttdata/apache-spark/14

なんかちょっと怪しい気もしますけど、雰囲気は…まあ…。

サンプルアプリケーション

まずはSparkで動かすアプリケーションがなければ話が進まないので、プログラムを用意します。といっても、前回利用したプログラムをちょっといじったものですけど。
※先ほどの用語でいくと、Driverですね
src/main/scala/org/littlewings/spark/WordCount.scala

package org.littlewings.spark

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val file = args.toList.headOption.getOrElse("README.md")
    println(s"Input File => $file")

    val conf = new SparkConf().setAppName("Word Count Application")
    val sc = new SparkContext(conf)
    val textFile = sc.textFile(file)

    println(s"partitioner = ${textFile.partitioner}")
    println(s"partitions = ${textFile.partitions.size}")

    implicit val ordering = TulpleOrdering

    textFile
      .flatMap { line =>

      val analyzer = new StandardAnalyzer
      val tokenStream = analyzer.tokenStream("", line)
      val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute])

      tokenStream.reset()

      try {
        Iterator
          .continually(tokenStream.incrementToken())
          .takeWhile(identity)
          .map(_ => charAttr.toString)
          .toVector
      } finally {
        tokenStream.end()
        tokenStream.close()
      }
    }
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
      .top(10)
      .foreach(println)
  }
}

object TulpleOrdering extends Ordering[(String, Int)] {
  override def compare(x: (String, Int), y: (String, Int)): Int =
    x._2 - y._2
}

ザ・Word Countです。

ビルド定義と、プラグインの定義はこのように。
build.sbt

name := "word-count-lucene-analyzer"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
  "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1"
)

project/plugins.sbt

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

クラスタを立てる

それでは、以下を参考にSpark Standalone Modeでクラスタを立ててみます。

Starting a Cluster Manually
https://spark.apache.org/docs/latest/spark-standalone.html#starting-a-cluster-manually

Cluster Launch Scripts
https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts

とりあえず、今回は「spark-1.4.1-bin-hadoop2.6」で動かすとして…

$ cd spark-1.4.1-bin-hadoop2.6

マスターの起動。

$ sbin/start-master.sh -h localhost -p 7077

オプション-hと-pで、それぞれリッスンするホスト名とポートを指定しているのですが、これがUbuntu Linuxだとちょっとハマるので、ほぼデフォルトみたいなものですが明示することにします(デフォルトではstart-master.shから起動されるスクリプトでは、ホスト名はhostnameコマンドの結果を使う)。

スレーブの起動。マスタを起動した時と、同じホスト、同じディレクトリで実行しています。

$ sbin/start-slave.sh spark://localhost:7077

引数は、マスターのURLです。

「SPARK_WORKER_INSTANCES」という値を調整しないと、ひとつしかスレーブは起動できないみたいですが…とりあえず、今回はこのままとします。

なお、起動するとlogsというディレクトリができるので、この中にNodeのログが出力されるようです。

http://localhost:8080/」にアクセスすると、Worker(この文章中では、スレーブと書かれていますが…)っぽいのが入っていることがわかります。

Spark Shellで、クラスタにつないでみます。

$ bin/spark-shell --master spark://localhost:7077

接続すると、Web UIにもSpark Shellが現れます。

では、先ほど用意したWord Countのアプリケーションを動かしてみます。

$ bin/spark-submit --class "org.littlewings.spark.WordCount" --master spark://localhost:7077 /path/to/word-count-lucene-analyzer-assembly-0.0.1-SNAPSHOT.jar

とりあえず、結果は普通に得られますが

(spark,24)
(run,13)
(building,9)
(hadoop,9)
(example,8)
(you,8)
(http,7)
(can,6)
(spark.apache.org,6)
(latest,5)

Web UIからも、タスクの情報を見ることができます。標準出力、標準エラー出力も見ることができます。

なお、タスクを実行するとworkというディレクトリができるようなので、この中を見ると標準出力、標準エラー出力、そしてApplication Jarが置かれていることがわかります。

$ ll work/app-20150726214853-0001/0/
合計 10832
drwxrwxr-x 2 xxxxx xxxxx     4096  726 21:48 ./
drwxrwxr-x 3 xxxxx xxxxx     4096  726 21:48 ../
-rw-rw-r-- 1 xxxxx xxxxx     8741  726 21:48 stderr
-rw-rw-r-- 1 xxxxx xxxxx        0  726 21:48 stdout
-rwxrwxr-x 1 xxxxx xxxxx 11068992  726 21:48 word-count-lucene-analyzer-assembly-0.0.1-SNAPSHOT.jar

なるほど…。

マスター、スレーブの終了

ここに書かれてあるスクリプトを実行すると、マスターおよびスレーブ、もしくは全部を終了させることができます。

Cluster Launch Scripts
https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts

今回は、stop-all.shスクリプトを使用します。

$ sbin/stop-all.sh 
xxxxx@localhost's password: 

SSH公開鍵の設定は特にしていないので、パスワードを聞かれますが…。

これで、マスターおよびスレーブが停止できました。