前回SparkでHello World的なことをやりましたが、今回はSpark Standalone Modeを試してみることにします。
Spark Standalone Mode
https://spark.apache.org/docs/latest/spark-standalone.html
が、そもそもこれってSparkでとりうるDeployment Modeの一種らしく、クラスタについての用語を見ておく必要がありそうです。
Cluster Modeの用語を読む
Cluster Mode Overview
https://spark.apache.org/docs/latest/cluster-overview.html
ここでのGlossaryという項目に着目すると、それぞれこんな感じ…?
用語 | 意味 |
---|---|
Application | ユーザーがビルドした、Spark上に載せられるもの。クラスタ上のプログラムとExecutorで構成される |
Application jar | ユーザーが作ったSparkアプリケーションを含むJARファイル。「uber jar」形式(依存関係も含めてひとまとめにしたJARファイル)で作成することになり、HadoopおよびSparkのライブラリは含めないこと(実行時に追加される) |
Driver program | mainメソッドを持ち、SparkContextを作成するアプリケーション(要はユーザーが作ったRDD変換プログラムのこと) |
Cluster manager | クラスタ上のリソースを管理するための外部サービス(Standalone Manager、Mesos、YARN) |
Deploy mode | Driverの実行場所を区別するもの。「クラスタ」モードでは、フレームワークはクラスタ内のDriverを起動する。「クライアント」モードでは、Submitした人がクラスタ外のDriverを起動する |
Worker node | クラスタ内で、アプリケーション・コードを実行できる任意のNode |
Executor | Workker Node上で、アプリケーションのために起動されたプロセス。タスクの実行中は、メモリまたはディスクにデータを保持する。各アプリケーションは、自分のExecutor(s)を持つ |
Task | ひとつのExecutorに送信される、作業の単位 |
Job | Sparkのアクション(save、collectなど)への応答として生成される、複数のタスクから成る並列計算。この単語をDriverのログで見ることができるだろう |
Stage | 各ジョブは、互いに依存のあるステージと呼ばれる小さなタスクのセットに分割される(MapReduceでのmapとreduceのステージに似ている)。この単語を、Driverのログで見ることができるだろう |
参考)
Apache Sparkのご紹介 (後半:技術トピック)
http://www.slideshare.net/hadoopxnttdata/apache-spark/14
なんかちょっと怪しい気もしますけど、雰囲気は…まあ…。
サンプルアプリケーション
まずはSparkで動かすアプリケーションがなければ話が進まないので、プログラムを用意します。といっても、前回利用したプログラムをちょっといじったものですけど。
※先ほどの用語でいくと、Driverですね
src/main/scala/org/littlewings/spark/WordCount.scala
package org.littlewings.spark import org.apache.lucene.analysis.standard.StandardAnalyzer import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { val file = args.toList.headOption.getOrElse("README.md") println(s"Input File => $file") val conf = new SparkConf().setAppName("Word Count Application") val sc = new SparkContext(conf) val textFile = sc.textFile(file) println(s"partitioner = ${textFile.partitioner}") println(s"partitions = ${textFile.partitions.size}") implicit val ordering = TulpleOrdering textFile .flatMap { line => val analyzer = new StandardAnalyzer val tokenStream = analyzer.tokenStream("", line) val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute]) tokenStream.reset() try { Iterator .continually(tokenStream.incrementToken()) .takeWhile(identity) .map(_ => charAttr.toString) .toVector } finally { tokenStream.end() tokenStream.close() } } .map(word => (word, 1)) .reduceByKey((a, b) => a + b) .top(10) .foreach(println) } } object TulpleOrdering extends Ordering[(String, Int)] { override def compare(x: (String, Int), y: (String, Int)): Int = x._2 - y._2 }
ザ・Word Countです。
ビルド定義と、プラグインの定義はこのように。
build.sbt
name := "word-count-lucene-analyzer" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.4" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.4.1" % "provided", "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1" )
project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
クラスタを立てる
それでは、以下を参考にSpark Standalone Modeでクラスタを立ててみます。
Starting a Cluster Manually
https://spark.apache.org/docs/latest/spark-standalone.html#starting-a-cluster-manually
Cluster Launch Scripts
https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
とりあえず、今回は「spark-1.4.1-bin-hadoop2.6」で動かすとして…
$ cd spark-1.4.1-bin-hadoop2.6
マスターの起動。
$ sbin/start-master.sh -h localhost -p 7077
オプション-hと-pで、それぞれリッスンするホスト名とポートを指定しているのですが、これがUbuntu Linuxだとちょっとハマるので、ほぼデフォルトみたいなものですが明示することにします(デフォルトではstart-master.shから起動されるスクリプトでは、ホスト名はhostnameコマンドの結果を使う)。
スレーブの起動。マスタを起動した時と、同じホスト、同じディレクトリで実行しています。
$ sbin/start-slave.sh spark://localhost:7077
引数は、マスターのURLです。
「SPARK_WORKER_INSTANCES」という値を調整しないと、ひとつしかスレーブは起動できないみたいですが…とりあえず、今回はこのままとします。
なお、起動するとlogsというディレクトリができるので、この中にNodeのログが出力されるようです。
「http://localhost:8080/」にアクセスすると、Worker(この文章中では、スレーブと書かれていますが…)っぽいのが入っていることがわかります。
Spark Shellで、クラスタにつないでみます。
$ bin/spark-shell --master spark://localhost:7077
接続すると、Web UIにもSpark Shellが現れます。
では、先ほど用意したWord Countのアプリケーションを動かしてみます。
$ bin/spark-submit --class "org.littlewings.spark.WordCount" --master spark://localhost:7077 /path/to/word-count-lucene-analyzer-assembly-0.0.1-SNAPSHOT.jar
とりあえず、結果は普通に得られますが
(spark,24) (run,13) (building,9) (hadoop,9) (example,8) (you,8) (http,7) (can,6) (spark.apache.org,6) (latest,5)
Web UIからも、タスクの情報を見ることができます。標準出力、標準エラー出力も見ることができます。
なお、タスクを実行するとworkというディレクトリができるようなので、この中を見ると標準出力、標準エラー出力、そしてApplication Jarが置かれていることがわかります。
$ ll work/app-20150726214853-0001/0/ 合計 10832 drwxrwxr-x 2 xxxxx xxxxx 4096 7月 26 21:48 ./ drwxrwxr-x 3 xxxxx xxxxx 4096 7月 26 21:48 ../ -rw-rw-r-- 1 xxxxx xxxxx 8741 7月 26 21:48 stderr -rw-rw-r-- 1 xxxxx xxxxx 0 7月 26 21:48 stdout -rwxrwxr-x 1 xxxxx xxxxx 11068992 7月 26 21:48 word-count-lucene-analyzer-assembly-0.0.1-SNAPSHOT.jar
なるほど…。
マスター、スレーブの終了
ここに書かれてあるスクリプトを実行すると、マスターおよびスレーブ、もしくは全部を終了させることができます。
Cluster Launch Scripts
https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts
今回は、stop-all.shスクリプトを使用します。
$ sbin/stop-all.sh xxxxx@localhost's password:
SSH公開鍵の設定は特にしていないので、パスワードを聞かれますが…。
これで、マスターおよびスレーブが停止できました。