CLOVER🍀

That was when it all began.

Apache SparkをScalaTestで動かしたい

このところ、SparkをStandalone ModeやYARNで動かしていましたが、もうちょっと機能的な感覚をつかみたいと思いまして。

で、毎度Spark Submitしてもいいのですが、テストコードで動かせないかなぁ、と…。

調べた感じ、やれないこともなさそうな雰囲気…。

Testing Spark Streaming Applications
http://eng.tapjoy.com/blog-list/testing-spark-streaming-applications

https://spark-summit.org/2014/wp-content/uploads/2014/06/Testing-Spark-Best-Practices-Anupama-Shetty-Neil-Marshall.pdf

ちょっと、やってみましょう!

追記
ドキュメントをよーく見ると、ここにヒントが書いてありました…。

Unit Testing
http://spark.apache.org/docs/latest/programming-guide.html#unit-testing

準備

まずは、ビルド定義。
build.sbt

name := "spark-simple-test"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.4.1" % "test",
  "org.slf4j" % "slf4j-api" % "1.7.10" % "test",
  "org.scalatest" %% "scalatest" % "2.2.5" % "test"
)

今回はCDHではなくApacheのものを使います。「spark-core」のスコープは、「test」です。

依存関係にSLF4Jが入っているのですが、どうもこれがないとエラーになるみたいです。

A needed class was not found. This could be due to an error in your runpath. Missing class: org/slf4j/LoggerFactory
java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory
	at org.apache.commons.logging.impl.SLF4JLogFactory.getInstance(SLF4JLogFactory.java:155)

はて?という気もしますが。

テストを書く

先ほどの依存関係の定義にも書いていますが、テストコードの作成にはScalaTestを使うことにしました。

テストコードの外観は、こんな感じ。
src/test/scala/org/littlewings/spark/SparkSimpleSpec.scala

package org.littlewings.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSpec
import org.scalatest.Matchers._

class SparkSimpleSpec extends FunSpec {
  describe("Spark Simple Test") {
    // ここに、テストを書く!
  }
}

このクラスに、SparkContextを生成するメソッドを追加します。結果は、このように。

  protected def withSpark(f: (SparkContext => Unit)): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Test")
    val sc = new SparkContext(conf)

    try {
      f(sc)
    } finally {
      sc.stop()
    }
  }

引数として SparkContext => Unit な関数を取り、SparkContext生成の上で関数を呼び出すようにしました。後始末的には、最後にSparkContext#stopを呼び出しておけばよいみたいです。

また、SparkConf#setMasterしておくことも必要です。

    val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Test")

ポイントは、このあたりでしょうか。

参考にしたサイトでは、BeforeAndAfterトレイトを使っているものが多かったのですが、自分はこういうスタイルにしました。

今回書いてみた、簡単なテストコード。

    it("sum") {
      withSpark(sc => {
        val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
        val result = rdd.map(_ * 2).fold(0)(_ + _)

        result should be(30)
      })
    }

    it("max") {
      withSpark(sc => {
        val rdd = sc.parallelize(Array(1, 5, 3, 16, 10, 4, 15, 1))
        rdd.max should be(16)
      })
    }

    it("values") {
      withSpark(sc => {
        val rdd = sc.parallelize(Map("1" -> "Java", "2" -> "Scala", "3" -> "Groovy").toVector)
        rdd.values.collect should be(Array("Java", "Scala", "Groovy"))
      })
    }

HDFSはおろか、ローカルファイルすら使っていませんが…きちんとパスしてくれたので、OKそうでございます。

bin/spark-submitは?

と、ここまでやると個人的にはspark-submitスクリプトが何をしているのかがちょっと気になります。何か大事な初期化処理とか抜かしていないでしょうかと…。

spark-submitスクリプトは、内部でspark-classスクリプトを呼び出すようです。
https://github.com/apache/spark/blob/v1.4.1/bin/spark-submit#L37

exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

spark-classスクリプトでは、上述のSparkSubmitクラスが起動する前に、1度Javaプログラムが起動するようになっています。
https://github.com/apache/spark/blob/v1.4.1/bin/spark-class#L100

CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <("$RUNNER" -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@")

このクラス、どうも次に起動するmainメソッドを持ったクラスに渡す起動引数を作るためのものっぽいです。

若干、SparkSubmitクラスは特別扱いされているようですが。
https://github.com/apache/spark/blob/v1.4.1/launcher/src/main/java/org/apache/spark/launcher/Main.java#L60

で、ここでできあがったコマンドがexecされます。
https://github.com/apache/spark/blob/v1.4.1/bin/spark-class#L105

 exec "${CMD[@]}"

例えば、以下のような起動コマンドでspark-submitスクリプトを実行した場合

$ bin/spark-submit --class "org.littlewings.spark.WordCount" --master local /path/to/target/scala-2.10/word-count-lucene-analyzer-assembly-0.0.1-SNAPSHOT.jar 

execには、以下のような結果が渡されます。

/usr/lib/jvm/java-8-oracle/bin/java -cp /path/to/spark-1.4.1-bin-hadoop2.6/conf/:/path/to/spark-1.4.1-bin-hadoop2.6/lib/spark-assembly-1.4.1-hadoop2.6.0.jar:/path/to/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/path/to/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/path/to/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --master local --class org.littlewings.spark.WordCount /path/to/target/scala-2.10/word-count-lucene-analyzer-assembly-0.0.1-SNAPSHOT.jar

ここから起動されるSparSubmitクラスですが、起動引数に応じて環境をいろいろ設定して、最後にDriver Programのmainメソッドを実行する、みたいな流れのようです。

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L118
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L216
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L666

実行環境がYARNとかだったりする場合は、mainメソッドを持ったプログラムは「org.apache.spark.deploy.yarn.Client」とかになったりするようですが。

とりあえず、テストコード上でSpark動かせそうなので、これでいいかな…。