CLOVER🍀

That was when it all began.

はじめてのSpark Streaming

今までは、SparkのStandalone ModeとYARN上で動かすといった動作環境的なものを少し試してきたので、もうちょっとSpark自体で遊んでみようと思いまして。

まずは、Spark Streamingの初歩的なところからやってみたいと思います。

Spark Streaming Programming Guide
http://spark.apache.org/docs/latest/streaming-programming-guide.html

とか言いながら、ちょっと寄り道してみたらめっちゃハマりました…。

Spark Streamingとは?

こちらを見る限り…

Overview
http://spark.apache.org/docs/latest/streaming-programming-guide.html#overview

Kafka、Flume、Twitter、ZeroMQ、Kinesis、そしてTCPソケットなどから継続的にデータを読み込み、map、reduce、joinやwindow関数を使用した後、処理結果をファイルシステムやデータベースに保存したり、ダッシュボードで使ったりする感じみたいですね。機械学習とかも睨んでいるようです。

が、今の自分にはあまり知識のない領域だったりします…。

準備

とりあえず、動かす環境を用意します。今回は、テストコード上でSpark Streamingを動かすことにします。

build.sbtは、このようにしました。
build.sbt

name := "spark-streaming-simple"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.4"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := true

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.4.1" % "test",
  "org.slf4j" % "slf4j-api" % "1.7.10" % "test",
  "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1" % "test",
  "org.scalatest" %% "scalatest" % "2.2.5" % "test"
)

「spark-streaming」というモジュールを利用します。

そして、今回もSparkのREADME.mdをWord Countしようと思いますので、LuceneのAnalyzerが依存関係に引っ付いています。

Spark Streamingを使ったプログラムを書く

実行は、ScalaTestと合わせて行うことにしました。とはいえ、別にテスト自体をしているわけではなく、動かす環境がテストコードだ、というだけなのですが。

雛形は、こんな感じです。
src/test/scala/org/littlewings/spark/SimpleStreamingSpec.scala

package org.littlewings.spark

import java.io.File
import java.net.ServerSocket
import java.nio.charset.StandardCharsets
import java.nio.file
import java.nio.file.attribute.{BasicFileAttributes, FileTime}
import java.nio.file.{FileVisitResult, Files, Paths, SimpleFileVisitor}
import java.time.{LocalDateTime, ZoneOffset}
import java.util.concurrent.TimeUnit

import org.apache.commons.io.FileUtils
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}
import org.scalatest.FunSpec

import scala.io.Source

class SimpleStreamingSpec extends FunSpec {
  describe("SimpleStreamingSpec") {
    FileUtils.deleteDirectory(new File("src/test/resources/output"))

    // ここに、テストを書く!
  }

  protected def withStreaming(f: StreamingContext => Unit): Unit = {
    val conf = new SparkConf().setMaster("local[4]").setAppName("Simple Spark Streaming")
    val sc = new StreamingContext(conf, Durations.seconds(5L))

    try {
      f(sc)

    } finally {
      sc.stop(true)
    }
  }
}

Spark Streamingでは、SparkContextに代わりStreamingContextを使用するようです(SparkContextを内部に持っているようですが)。

StreamingContextに渡している第2引数は、読み込み対象へのアクセス間隔っぽいです。

    val sc = new StreamingContext(conf, Durations.seconds(5L))

今回は、5秒としました。

最後に、StreamingContext#stopを行います。

      sc.stop(true)

引数にtrueを指定することで、内部のSparkContextも停止してくれます。また、第2引数を取るものもあり、こちらはGraceful Shutdown的な感じみたいです。

それでは、これらを使って今回はソケットとローカルファイルシステムからデータを読み取ってWord Countし、結果をローカルファイルに保存してみます。

まずはソケットから。

とりあえず、StreamingContextを作成して

    it("with socket text stream") {
      withStreaming { sc =>

簡単なサーバーを書いてみます。

        val readmeFromGitHub: String = {
          val source = Source.fromURL("https://raw.githubusercontent.com/apache/spark/v1.4.1/README.md", "UTF-8")
          val text = source.mkString
          source.close()
          text
        }

        val thread = new Thread {
          override def run(): Unit = {
            Iterator
              .continually(serverSocket.accept())
              .foreach { socket =>
              val os = socket.getOutputStream
              readmeFromGitHub.getBytes(StandardCharsets.UTF_8).foreach(b => os.write(b.asInstanceOf[Int]))

              // 少し時間調整…
              TimeUnit.SECONDS.sleep(2L)

              os.flush()
              socket.close()
            }
          }
        }
        thread.start()

        // ServerSocketの起動待ち
        TimeUnit.SECONDS.sleep(1L)

このサーバーが返す内容は、SparkのREADME.mdをGitHubより取得したものにしています。

このサーバーに対して、StreamingContext#socketStreamで接続するよう定義します。

        val lines = sc.socketTextStream("localhost", 9999)

Word Countするように処理を書き…

        lines
          .flatMap { line =>
          val analyzer = new StandardAnalyzer
          val tokenStream = analyzer.tokenStream("", line)
          val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute])

          tokenStream.reset()

          try {
            Iterator
              .continually(tokenStream.incrementToken())
              .takeWhile(identity)
              .map(_ => charAttr.toString)
              .toVector
          } finally {
            tokenStream.end()
            tokenStream.close()
          }
        }
          .map(word => (word, 1))
          .reduceByKey((a, b) => a + b)
          .saveAsTextFiles("src/test/resources/output/sendSocket")

StreamingContext#startします。

        sc.start()

ここですぐプログラムを抜けてしまうと、Streamingの処理を何も行わずに抜けてしまうので、ちょっとスリープしておきます。

        // しばらく待ち…
        TimeUnit.SECONDS.sleep(3L)

        serverSocket.close()
      }
    }

最後に、一応ServerSocketも閉じておきますよ、と。

結果は、「src/test/resources/output/sendSocket」で始まるディレクトリ内に、以下のような感じで置かれます。Streaming処理を行った回数だけ、このディレクトリができるんでしょうか…。

$ ll src/test/resources/output/sendSocket-1439125530000/
合計 44
drwxrwxr-x 2 xxxxx xxxxx 4096  89 22:05 ./
drwxrwxr-x 4 xxxxx xxxxx 4096  89 22:05 ../
-rw-rw-r-- 1 xxxxx xxxxx    8  89 22:05 ._SUCCESS.crc
-rw-rw-r-- 1 xxxxx xxxxx   16  89 22:05 .part-00000.crc
-rw-rw-r-- 1 xxxxx xxxxx   12  89 22:05 .part-00001.crc
-rw-rw-r-- 1 xxxxx xxxxx   16  89 22:05 .part-00002.crc
-rw-rw-r-- 1 xxxxx xxxxx   16  89 22:05 .part-00003.crc
-rw-r--r-- 1 xxxxx xxxxx    0  89 22:05 _SUCCESS
-rw-r--r-- 1 xxxxx xxxxx  640  89 22:05 part-00000
-rw-r--r-- 1 xxxxx xxxxx  504  89 22:05 part-00001
-rw-r--r-- 1 xxxxx xxxxx  569  89 22:05 part-00002
-rw-r--r-- 1 xxxxx xxxxx  621  89 22:05 part-00003

中身の一部。

$ head src/test/resources/output/sendSocket-1439125530000/part-00000
(detailed,2)
(stream,1)
(distribution,2)
(automatedtesting,1)
(return,2)
(hive,2)
(its,1)
(guide,2)
(level,2)
(runs,1)

OKそうです。

続いて、ローカルファイルシステムのファイルから読み取ってみます。

      withStreaming { sc =>
        val lines = sc.textFileStream("src/test/resources/input")

        lines
          .flatMap { line =>
          val analyzer = new StandardAnalyzer
          val tokenStream = analyzer.tokenStream("", line)
          val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute])

          tokenStream.reset()

          try {
            Iterator
              .continually(tokenStream.incrementToken())
              .takeWhile(identity)
              .map(_ => charAttr.toString)
              .toVector
          } finally {
            tokenStream.end()
            tokenStream.close()
          }
        }
          .map(word => (word, 1))
          .reduceByKey((a, b) => a + b)
          .saveAsTextFiles("src/test/resources/output/files")

        sc.start()

先ほどのソケットを使った時と違うのは、StreamingContext#textFileStreamになったことですね。

        val lines = sc.textFileStream("src/test/resources/input")

Word Countの部分と、最後にローカルファイルシステムに結果を保存しているところはほぼ同じです。保存先のパスが微妙に違うくらいです。

なお、「src/test/resources/input」ディレクトリ配下には、SparkのREADME.mdを置いています。

$ head src/test/resources/input/README.md 
# Apache Spark

Spark is a fast and general cluster computing system for Big Data. It provides
high-level APIs in Scala, Java, and Python, and an optimized engine that
supports general computation graphs for data analysis. It also supports a
rich set of higher-level tools including Spark SQL for SQL and structured
data processing, MLlib for machine learning, GraphX for graph processing,
and Spark Streaming for stream processing.

<http://spark.apache.org/>

ただ、ここでちょっとハマったのは、読み込むファイルのタイムスタンプですかね…。

        // タイムスタンプ変更のために、StreamingContext開始後少し待つ
        TimeUnit.SECONDS.sleep(3L)

        Files.walkFileTree(Paths.get("src/test/resources/input"), new SimpleFileVisitor[file.Path] {
          override def visitFile(f: file.Path, attr: BasicFileAttributes): FileVisitResult = {
            Files.setLastModifiedTime(f, FileTime.from(LocalDateTime.now().toInstant(ZoneOffset.ofHours(9))))
            FileVisitResult.CONTINUE
          }
        })

        // しばらく待ち…
        TimeUnit.SECONDS.sleep(5L)
      }
    }

どうも、StreamingContextよりも(?)新しいものにしないと、読んでくれないみたいです。
※まあ、そりゃあそうだという気もしますが

で、今回のコードで動かした結果は、このように。

$ ll src/test/resources/output/files-1439126630000/
合計 44
drwxrwxr-x 2 xxxxx xxxxx 4096  89 22:23 ./
drwxrwxr-x 4 xxxxx xxxxx 4096  89 22:23 ../
-rw-rw-r-- 1 xxxxx xxxxx    8  89 22:23 ._SUCCESS.crc
-rw-rw-r-- 1 xxxxx xxxxx   16  89 22:23 .part-00000.crc
-rw-rw-r-- 1 xxxxx xxxxx   12  89 22:23 .part-00001.crc
-rw-rw-r-- 1 xxxxx xxxxx   16  89 22:23 .part-00002.crc
-rw-rw-r-- 1 xxxxx xxxxx   16  89 22:23 .part-00003.crc
-rw-r--r-- 1 xxxxx xxxxx    0  89 22:23 _SUCCESS
-rw-r--r-- 1 xxxxx xxxxx  640  89 22:23 part-00000
-rw-r--r-- 1 xxxxx xxxxx  504  89 22:23 part-00001
-rw-r--r-- 1 xxxxx xxxxx  569  89 22:23 part-00002
-rw-r--r-- 1 xxxxx xxxxx  621  89 22:23 part-00003

中身。

$ head src/test/resources/output/files-1439126630000/part-00000
(detailed,2)
(stream,1)
(distribution,2)
(automatedtesting,1)
(return,2)
(hive,2)
(its,1)
(guide,2)
(level,2)
(runs,1)

こちらも、OKそうな。

やってみて

けっこう予想以上にハマりました。

というか、Streamingみたいなものを、環境がほぼ止まったようなテストコード上でいきなりやろうとしたのがよくなかったのか、結果が安定しなかったりして、かなりてこずりました。

今度やる時は、Kafka、Flumeあたりを置いてやった方がいいのかなーと思いました。どれも使ったことないですけど!

まあ、APIの基礎みたいなものは少しは触れたと思うので、次はもうちょっと前へ進められるでしょう。