今までは、SparkのStandalone ModeとYARN上で動かすといった動作環境的なものを少し試してきたので、もうちょっとSpark自体で遊んでみようと思いまして。
まずは、Spark Streamingの初歩的なところからやってみたいと思います。
Spark Streaming Programming Guide
http://spark.apache.org/docs/latest/streaming-programming-guide.html
とか言いながら、ちょっと寄り道してみたらめっちゃハマりました…。
Spark Streamingとは?
こちらを見る限り…
Overview
http://spark.apache.org/docs/latest/streaming-programming-guide.html#overview
Kafka、Flume、Twitter、ZeroMQ、Kinesis、そしてTCPソケットなどから継続的にデータを読み込み、map、reduce、joinやwindow関数を使用した後、処理結果をファイルシステムやデータベースに保存したり、ダッシュボードで使ったりする感じみたいですね。機械学習とかも睨んでいるようです。
が、今の自分にはあまり知識のない領域だったりします…。
準備
とりあえず、動かす環境を用意します。今回は、テストコード上でSpark Streamingを動かすことにします。
build.sbtは、このようにしました。
build.sbt
name := "spark-streaming-simple" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.4" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) fork in Test := true parallelExecution in Test := false libraryDependencies ++= Seq( "org.apache.spark" %% "spark-streaming" % "1.4.1" % "test", "org.slf4j" % "slf4j-api" % "1.7.10" % "test", "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1" % "test", "org.scalatest" %% "scalatest" % "2.2.5" % "test" )
「spark-streaming」というモジュールを利用します。
そして、今回もSparkのREADME.mdをWord Countしようと思いますので、LuceneのAnalyzerが依存関係に引っ付いています。
Spark Streamingを使ったプログラムを書く
実行は、ScalaTestと合わせて行うことにしました。とはいえ、別にテスト自体をしているわけではなく、動かす環境がテストコードだ、というだけなのですが。
雛形は、こんな感じです。
src/test/scala/org/littlewings/spark/SimpleStreamingSpec.scala
package org.littlewings.spark import java.io.File import java.net.ServerSocket import java.nio.charset.StandardCharsets import java.nio.file import java.nio.file.attribute.{BasicFileAttributes, FileTime} import java.nio.file.{FileVisitResult, Files, Paths, SimpleFileVisitor} import java.time.{LocalDateTime, ZoneOffset} import java.util.concurrent.TimeUnit import org.apache.commons.io.FileUtils import org.apache.lucene.analysis.standard.StandardAnalyzer import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.apache.spark.SparkConf import org.apache.spark.streaming.{Duration, Durations, StreamingContext} import org.scalatest.FunSpec import scala.io.Source class SimpleStreamingSpec extends FunSpec { describe("SimpleStreamingSpec") { FileUtils.deleteDirectory(new File("src/test/resources/output")) // ここに、テストを書く! } protected def withStreaming(f: StreamingContext => Unit): Unit = { val conf = new SparkConf().setMaster("local[4]").setAppName("Simple Spark Streaming") val sc = new StreamingContext(conf, Durations.seconds(5L)) try { f(sc) } finally { sc.stop(true) } } }
Spark Streamingでは、SparkContextに代わりStreamingContextを使用するようです(SparkContextを内部に持っているようですが)。
StreamingContextに渡している第2引数は、読み込み対象へのアクセス間隔っぽいです。
val sc = new StreamingContext(conf, Durations.seconds(5L))
今回は、5秒としました。
最後に、StreamingContext#stopを行います。
sc.stop(true)
引数にtrueを指定することで、内部のSparkContextも停止してくれます。また、第2引数を取るものもあり、こちらはGraceful Shutdown的な感じみたいです。
それでは、これらを使って今回はソケットとローカルファイルシステムからデータを読み取ってWord Countし、結果をローカルファイルに保存してみます。
まずはソケットから。
とりあえず、StreamingContextを作成して
it("with socket text stream") {
withStreaming { sc =>
簡単なサーバーを書いてみます。
val readmeFromGitHub: String = { val source = Source.fromURL("https://raw.githubusercontent.com/apache/spark/v1.4.1/README.md", "UTF-8") val text = source.mkString source.close() text } val thread = new Thread { override def run(): Unit = { Iterator .continually(serverSocket.accept()) .foreach { socket => val os = socket.getOutputStream readmeFromGitHub.getBytes(StandardCharsets.UTF_8).foreach(b => os.write(b.asInstanceOf[Int])) // 少し時間調整… TimeUnit.SECONDS.sleep(2L) os.flush() socket.close() } } } thread.start() // ServerSocketの起動待ち TimeUnit.SECONDS.sleep(1L)
このサーバーが返す内容は、SparkのREADME.mdをGitHubより取得したものにしています。
このサーバーに対して、StreamingContext#socketStreamで接続するよう定義します。
val lines = sc.socketTextStream("localhost", 9999)
Word Countするように処理を書き…
lines .flatMap { line => val analyzer = new StandardAnalyzer val tokenStream = analyzer.tokenStream("", line) val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute]) tokenStream.reset() try { Iterator .continually(tokenStream.incrementToken()) .takeWhile(identity) .map(_ => charAttr.toString) .toVector } finally { tokenStream.end() tokenStream.close() } } .map(word => (word, 1)) .reduceByKey((a, b) => a + b) .saveAsTextFiles("src/test/resources/output/sendSocket")
StreamingContext#startします。
sc.start()
ここですぐプログラムを抜けてしまうと、Streamingの処理を何も行わずに抜けてしまうので、ちょっとスリープしておきます。
// しばらく待ち… TimeUnit.SECONDS.sleep(3L) serverSocket.close() } }
最後に、一応ServerSocketも閉じておきますよ、と。
結果は、「src/test/resources/output/sendSocket」で始まるディレクトリ内に、以下のような感じで置かれます。Streaming処理を行った回数だけ、このディレクトリができるんでしょうか…。
$ ll src/test/resources/output/sendSocket-1439125530000/ 合計 44 drwxrwxr-x 2 xxxxx xxxxx 4096 8月 9 22:05 ./ drwxrwxr-x 4 xxxxx xxxxx 4096 8月 9 22:05 ../ -rw-rw-r-- 1 xxxxx xxxxx 8 8月 9 22:05 ._SUCCESS.crc -rw-rw-r-- 1 xxxxx xxxxx 16 8月 9 22:05 .part-00000.crc -rw-rw-r-- 1 xxxxx xxxxx 12 8月 9 22:05 .part-00001.crc -rw-rw-r-- 1 xxxxx xxxxx 16 8月 9 22:05 .part-00002.crc -rw-rw-r-- 1 xxxxx xxxxx 16 8月 9 22:05 .part-00003.crc -rw-r--r-- 1 xxxxx xxxxx 0 8月 9 22:05 _SUCCESS -rw-r--r-- 1 xxxxx xxxxx 640 8月 9 22:05 part-00000 -rw-r--r-- 1 xxxxx xxxxx 504 8月 9 22:05 part-00001 -rw-r--r-- 1 xxxxx xxxxx 569 8月 9 22:05 part-00002 -rw-r--r-- 1 xxxxx xxxxx 621 8月 9 22:05 part-00003
中身の一部。
$ head src/test/resources/output/sendSocket-1439125530000/part-00000 (detailed,2) (stream,1) (distribution,2) (automatedtesting,1) (return,2) (hive,2) (its,1) (guide,2) (level,2) (runs,1)
OKそうです。
続いて、ローカルファイルシステムのファイルから読み取ってみます。
withStreaming { sc => val lines = sc.textFileStream("src/test/resources/input") lines .flatMap { line => val analyzer = new StandardAnalyzer val tokenStream = analyzer.tokenStream("", line) val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute]) tokenStream.reset() try { Iterator .continually(tokenStream.incrementToken()) .takeWhile(identity) .map(_ => charAttr.toString) .toVector } finally { tokenStream.end() tokenStream.close() } } .map(word => (word, 1)) .reduceByKey((a, b) => a + b) .saveAsTextFiles("src/test/resources/output/files") sc.start()
先ほどのソケットを使った時と違うのは、StreamingContext#textFileStreamになったことですね。
val lines = sc.textFileStream("src/test/resources/input")
Word Countの部分と、最後にローカルファイルシステムに結果を保存しているところはほぼ同じです。保存先のパスが微妙に違うくらいです。
なお、「src/test/resources/input」ディレクトリ配下には、SparkのREADME.mdを置いています。
$ head src/test/resources/input/README.md # Apache Spark Spark is a fast and general cluster computing system for Big Data. It provides high-level APIs in Scala, Java, and Python, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming for stream processing. <http://spark.apache.org/>
ただ、ここでちょっとハマったのは、読み込むファイルのタイムスタンプですかね…。
// タイムスタンプ変更のために、StreamingContext開始後少し待つ TimeUnit.SECONDS.sleep(3L) Files.walkFileTree(Paths.get("src/test/resources/input"), new SimpleFileVisitor[file.Path] { override def visitFile(f: file.Path, attr: BasicFileAttributes): FileVisitResult = { Files.setLastModifiedTime(f, FileTime.from(LocalDateTime.now().toInstant(ZoneOffset.ofHours(9)))) FileVisitResult.CONTINUE } }) // しばらく待ち… TimeUnit.SECONDS.sleep(5L) } }
どうも、StreamingContextよりも(?)新しいものにしないと、読んでくれないみたいです。
※まあ、そりゃあそうだという気もしますが
で、今回のコードで動かした結果は、このように。
$ ll src/test/resources/output/files-1439126630000/ 合計 44 drwxrwxr-x 2 xxxxx xxxxx 4096 8月 9 22:23 ./ drwxrwxr-x 4 xxxxx xxxxx 4096 8月 9 22:23 ../ -rw-rw-r-- 1 xxxxx xxxxx 8 8月 9 22:23 ._SUCCESS.crc -rw-rw-r-- 1 xxxxx xxxxx 16 8月 9 22:23 .part-00000.crc -rw-rw-r-- 1 xxxxx xxxxx 12 8月 9 22:23 .part-00001.crc -rw-rw-r-- 1 xxxxx xxxxx 16 8月 9 22:23 .part-00002.crc -rw-rw-r-- 1 xxxxx xxxxx 16 8月 9 22:23 .part-00003.crc -rw-r--r-- 1 xxxxx xxxxx 0 8月 9 22:23 _SUCCESS -rw-r--r-- 1 xxxxx xxxxx 640 8月 9 22:23 part-00000 -rw-r--r-- 1 xxxxx xxxxx 504 8月 9 22:23 part-00001 -rw-r--r-- 1 xxxxx xxxxx 569 8月 9 22:23 part-00002 -rw-r--r-- 1 xxxxx xxxxx 621 8月 9 22:23 part-00003
中身。
$ head src/test/resources/output/files-1439126630000/part-00000 (detailed,2) (stream,1) (distribution,2) (automatedtesting,1) (return,2) (hive,2) (its,1) (guide,2) (level,2) (runs,1)
こちらも、OKそうな。
やってみて
けっこう予想以上にハマりました。
というか、Streamingみたいなものを、環境がほぼ止まったようなテストコード上でいきなりやろうとしたのがよくなかったのか、結果が安定しなかったりして、かなりてこずりました。
今度やる時は、Kafka、Flumeあたりを置いてやった方がいいのかなーと思いました。どれも使ったことないですけど!
まあ、APIの基礎みたいなものは少しは触れたと思うので、次はもうちょっと前へ進められるでしょう。