このところ、SparkをStandalone ModeやYARNで動かしていましたが、もうちょっと機能的な感覚をつかみたいと思いまして。
で、毎度Spark Submitしてもいいのですが、テストコードで動かせないかなぁ、と…。
調べた感じ、やれないこともなさそうな雰囲気…。
Testing Spark Streaming Applications
http://eng.tapjoy.com/blog-list/testing-spark-streaming-applications
ちょっと、やってみましょう!
追記)
ドキュメントをよーく見ると、ここにヒントが書いてありました…。
Unit Testing
http://spark.apache.org/docs/latest/programming-guide.html#unit-testing
準備
まずは、ビルド定義。
build.sbt
name := "spark-simple-test" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.4" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "1.4.1" % "test", "org.slf4j" % "slf4j-api" % "1.7.10" % "test", "org.scalatest" %% "scalatest" % "2.2.5" % "test" )
今回はCDHではなくApacheのものを使います。「spark-core」のスコープは、「test」です。
依存関係にSLF4Jが入っているのですが、どうもこれがないとエラーになるみたいです。
A needed class was not found. This could be due to an error in your runpath. Missing class: org/slf4j/LoggerFactory java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:155)
はて?という気もしますが。
テストを書く
先ほどの依存関係の定義にも書いていますが、テストコードの作成にはScalaTestを使うことにしました。
テストコードの外観は、こんな感じ。
src/test/scala/org/littlewings/spark/SparkSimpleSpec.scala
package org.littlewings.spark import org.apache.spark.{SparkConf, SparkContext} import org.scalatest.FunSpec import org.scalatest.Matchers._ class SparkSimpleSpec extends FunSpec { describe("Spark Simple Test") { // ここに、テストを書く! } }
このクラスに、SparkContextを生成するメソッドを追加します。結果は、このように。
protected def withSpark(f: (SparkContext => Unit)): Unit = { val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Test") val sc = new SparkContext(conf) try { f(sc) } finally { sc.stop() } }
引数として SparkContext => Unit な関数を取り、SparkContext生成の上で関数を呼び出すようにしました。後始末的には、最後にSparkContext#stopを呼び出しておけばよいみたいです。
また、SparkConf#setMasterしておくことも必要です。
val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Test")
ポイントは、このあたりでしょうか。
参考にしたサイトでは、BeforeAndAfterトレイトを使っているものが多かったのですが、自分はこういうスタイルにしました。
今回書いてみた、簡単なテストコード。
it("sum") { withSpark(sc => { val rdd = sc.parallelize(Array(1, 2, 3, 4, 5)) val result = rdd.map(_ * 2).fold(0)(_ + _) result should be(30) }) } it("max") { withSpark(sc => { val rdd = sc.parallelize(Array(1, 5, 3, 16, 10, 4, 15, 1)) rdd.max should be(16) }) } it("values") { withSpark(sc => { val rdd = sc.parallelize(Map("1" -> "Java", "2" -> "Scala", "3" -> "Groovy").toVector) rdd.values.collect should be(Array("Java", "Scala", "Groovy")) }) }
HDFSはおろか、ローカルファイルすら使っていませんが…きちんとパスしてくれたので、OKそうでございます。
bin/spark-submitは?
と、ここまでやると個人的にはspark-submitスクリプトが何をしているのかがちょっと気になります。何か大事な初期化処理とか抜かしていないでしょうかと…。
spark-submitスクリプトは、内部でspark-classスクリプトを呼び出すようです。
https://github.com/apache/spark/blob/v1.4.1/bin/spark-submit#L37
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
spark-classスクリプトでは、上述のSparkSubmitクラスが起動する前に、1度Javaプログラムが起動するようになっています。
https://github.com/apache/spark/blob/v1.4.1/bin/spark-class#L100
CMD=() while IFS= read -d '' -r ARG; do CMD+=("$ARG") done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")
このクラス、どうも次に起動するmainメソッドを持ったクラスに渡す起動引数を作るためのものっぽいです。
若干、SparkSubmitクラスは特別扱いされているようですが。
https://github.com/apache/spark/blob/v1.4.1/launcher/src/main/java/org/apache/spark/launcher/Main.java#L60
で、ここでできあがったコマンドがexecされます。
https://github.com/apache/spark/blob/v1.4.1/bin/spark-class#L105
exec "${CMD[@]}"
例えば、以下のような起動コマンドでspark-submitスクリプトを実行した場合
$ bin/spark-submit --class "org.littlewings.spark.WordCount" --master local /path/to/target/scala-2.10/word-count-lucene-analyzer-assembly-0.0.1-SNAPSHOT.jar
execには、以下のような結果が渡されます。
/usr/lib/jvm/java-8-oracle/bin/java -cp /path/to/spark-1.4.1-bin-hadoop2.6/conf/:/path/to/spark-1.4.1-bin-hadoop2.6/lib/spark-assembly-1.4.1-hadoop2.6.0.jar:/path/to/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/path/to/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/path/to/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master local --class org.littlewings.spark.WordCount /path/to/target/scala-2.10/word-count-lucene-analyzer-assembly-0.0.1-SNAPSHOT.jar
ここから起動されるSparSubmitクラスですが、起動引数に応じて環境をいろいろ設定して、最後にDriver Programのmainメソッドを実行する、みたいな流れのようです。
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L118
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L216
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L666
実行環境がYARNとかだったりする場合は、mainメソッドを持ったプログラムは「org.apache.spark.deploy.yarn.Client」とかになったりするようですが。
とりあえず、テストコード上でSpark動かせそうなので、これでいいかな…。