CLOVER🍀

That was when it all began.

Spark StreamingでTwitterからツイートを読み出す

個人的に、Streamingで読み込む対象の用意?に四苦八苦しているSpark Streamingです(笑)。

そのうちKafkaに手を出してみたいなぁと思いつつも、ここはいったん簡単なものだけ試してみることにしました。

spark-streaming-twitterを使って、Twitterからツイートを読み出してみようと思います。

Linking
http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking

Advanced Sources
http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources

「spark-streaming-twitter」を使うと、TwitterからStreamingして読み出し続けられる、と。中身は、どうやらTwitter4Jを使っているみたいです。

具体的には、「twitter4j.TwitterStream」です。

というわけで、今回はTwitter上のトレンドからツイートをフィルタリングして、収集対象となったツイートからWord Countしてみたいと思います。

準備

ビルド定義。
build.sbt

name := "spark-streaming-twitter"

version := "0.0.1"

scalaVersion := "2.11.7"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.4.1" exclude("org.spark-project.spark", "unused"),
  "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1",
  "org.apache.lucene" % "lucene-analyzers-kuromoji" % "5.2.1"
)

「spark-streaming-twitter」はもちろん必要ですが、「spark-streaming」も明示的に追加する必要があります。

sbt-assemblyも使います。
project/plugins.sbt

logLevel := Level.Warn

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")

build.sbtのSpark関連のライブラリに、こんな感じでexcludeが入っているのですが

  "org.apache.spark" %% "spark-streaming-twitter" % "1.4.1" exclude("org.spark-project.spark", "unused"),

これは最初、依存関係定義をこうしていたら

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.4.1",
  "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1",
  "org.apache.lucene" % "lucene-analyzers-kuromoji" % "5.2.1"
)

assembly時にClassが衝突してしまったからです…。

[error] 1 error was encountered during merge
[trace] Stack trace suppressed: run last *:assembly for the full output.
[error] (*:assembly) deduplicate: different file contents found in the following:
[error] /path/to/.ivy2/cache/org.apache.spark/spark-streaming-twitter_2.11/jars/spark-streaming-twitter_2.11-1.4.1.jar:org/apache/spark/unused/UnusedStubClass.class
[error] /path/to/.ivy2/cache/org.spark-project.spark/unused/jars/unused-1.0.0.jar:org/apache/spark/unused/UnusedStubClass.class

どうも、「org.apache.spark.unused.UnusedStubClass」というクラスが、「org.spark-project.spark/unused」というアーティファクトと「org.apache.spark/spark-streaming-twitter」の両方に含まれてしまっているようです。

なので、「spark-streaming-twitter」から除去しましたよ、と。

あ、Word Countには、LuceneのKuromojiで形態素解析します。

プログラムを書く

それでは、TwitterからStreamingしてみるプログラムを書いてみます。

結果は、このように。
src/main/scala/org/littlewings/spark/TwitterStreaming.scala

package org.littlewings.spark

import org.apache.lucene.analysis.ja.JapaneseAnalyzer
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.spark.SparkConf
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Durations, StreamingContext}

object TwitterStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Twitter Streaming")
    val ssc = new StreamingContext(conf, Durations.minutes(1L))

    val filter = if (args.isEmpty) Nil else args.toList
    val stream = TwitterUtils.createStream(ssc, None, filter)

    stream
      .flatMap { status =>
      val text = status.getText

      val analyzer = new JapaneseAnalyzer
      val tokenStream = analyzer.tokenStream("", text)
      val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute])

      tokenStream.reset()

      try {
        Iterator
          .continually(tokenStream.incrementToken())
          .takeWhile(identity)
          .map(_ => charAttr.toString)
          .toVector
      } finally {
        tokenStream.end()
        tokenStream.close()
      }
    }
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
      .saveAsTextFiles("output/tweet")

    ssc.start()
    ssc.awaitTermination()
  }
}

Streaming時のクエリ(ツイートの絞り込み)は、起動引数でもらうようにしました。

    val filter = if (args.isEmpty) Nil else args.toList
    val stream = TwitterUtils.createStream(ssc, None, filter)

ここでのクエリは、Twitter4JのFilterQueryクラスに渡されます。
https://github.com/apache/spark/blob/v1.4.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L91

Lucene形態素解析とWord Countの部分は省略。

今回は、結果をローカルディスクに書き出すようにしました。

      .saveAsTextFiles("output/tweet")

ここまでやったら、Streaming開始のコードを書いておしまい。

    ssc.start()
    ssc.awaitTermination()

Twitter4Jを使うので、各種Key等が必要です。今回は、twitter4j.propertiesに書きました。
src/main/resources/twitter4j.properties

oauth.consumerKey=<your-consumer-key>
oauth.consumerSecret=<your-consumer-secret>
oauth.accessToken=<your-access-token>
oauth.accessTokenSecret=<your-access-token-secret>

コードの参考)
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala

あとは、assembyして

> assembly

JARファイルを作成。

[info] SHA-1: 8a6f95d20169a836e47d82d3bbac3c8d227208a2
[info] Packaging /path/to/spark-streaming-twitter/target/scala-2.11/spark-streaming-twitter-assembly-0.0.1.jar ...
[info] Done packaging.
[success] Total time: 5 s, completed 2015/08/19 0:30:25

ここまでで、準備完了です。

実行する

それでは、実行してみましょう。

キーワードは、とりあえずトレンドから選んでみますか。

今回は、「#あなたがリア充になれない理由ベスト10」とします。

それでは、Spark Submitで実行。

$ spark-1.4.1-bin-hadoop2.6/bin/spark-submit --class org.littlewings.spark.TwitterStreaming --master local[4] /path/to/target/scala-2.11/spark-streaming-twitter-assembly-0.0.1.jar '#あなたがリア充になれない理由ベスト10'

走り出したジョブは、ずっと続くので適当にCtrl-Cで止めてください。

結果は、このように。

$ ll output/tweet-1439996100000
合計 44
drwxrwxr-x 2 xxxxx xxxxx 4096  819 23:55 ./
drwxrwxr-x 3 xxxxx xxxxx 4096  819 23:55 ../
-rw-rw-r-- 1 xxxxx xxxxx    8  819 23:55 ._SUCCESS.crc
-rw-rw-r-- 1 xxxxx xxxxx   16  819 23:55 .part-00000.crc
-rw-rw-r-- 1 xxxxx xxxxx   16  819 23:55 .part-00001.crc
-rw-rw-r-- 1 xxxxx xxxxx   16  819 23:55 .part-00002.crc
-rw-rw-r-- 1 xxxxx xxxxx   16  819 23:55 .part-00003.crc
-rw-r--r-- 1 xxxxx xxxxx    0  819 23:55 _SUCCESS
-rw-r--r-- 1 xxxxx xxxxx  826  819 23:55 part-00000
-rw-r--r-- 1 xxxxx xxxxx  798  819 23:55 part-00001
-rw-r--r-- 1 xxxxx xxxxx  782  819 23:55 part-00002
-rw-r--r-- 1 xxxxx xxxxx  975  819 23:55 part-00003

なんか、ファイルができています。

一応、できてるっぽいです。

$ head output/tweet-1439996100000/part-00001
(定例,1)
(エルレ,1)
(内,1)
(モン,1)
(uezw,1)
(コミケ,4)
(エロ,1)
(病院,1)
(不明,1)
(ウェア,1)
(科目,1)
(中,1)
(こっち,1)
(接客,1)
(9,19)
(マン,1)
(変,1)
(電池,1)
(不,1)
(早,1)
(位,181)
(樹,1)
(駅,1)
(ez,1)
(服,1)
(1,18)
(ワロタ,1)
(ペダル,2)
(離れる,1)
(利,1)
(ひる,1)
(ガロ,1)
(彩,1)
(劇場,1)
(eater,1)
(ゆる,1)
(了解,1)
(蛍,1)
(ど,1)
(大喜,1)
(5,20)
(うん,1)
(ダンジョン,1)
(アン,1)
(拡,1)
(よろしく,1)
(バット,1)
(プレ,4)
(理解,1)
(祭り,1)
(ハジ,1)
(りん,1)
(なれる,18)
(日,2)
(デジ,1)
(ド,1)
(ばら,1)
(q,1)
(笑,6)
(充,21)
(ヒードラン,1)
(業,1)
(ズボン,1)
(てる,1)
(笑う,1)
(何,1)
(かご,1)
(三,1)
(ちゃん,2)
(行方,1)
(次回,1)
(ラオウ,1)
(cx,1)
(せい,1)
(ガチ,2)

Kuromojiにmecab-ipadic-NEologdを適用してWord Countする

オマケとして、Word Countするのに普通のLucene Kuromojiではなくて、mecab-ipadic-NEologdを適用したバージョンで行ってみましょう。

mecab-ipadic-NEologdは、20150817日付の版を使っています。

build.sbtはこのように修正して、オリジナルのLucene Kuromojiを外します。

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.4.1" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.4.1" exclude("org.spark-project.spark", "unused"),
  "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1"
  // "org.apache.lucene" % "lucene-analyzers-kuromoji" % "5.2.1"
)

プロジェクトのlibディレクトリに、mecab-ipadic-NEologdを適用したLucene Kuromojiを放り込みます。

$ mkdir <プロジェクトのディレクトリ(/path/to>/lib
$ cp lucene-analyzers-kuromoji-ipadic-neologd-5.2.1-20150817-SNAPSHOT.jar /path/to/lib/

sbt-assemblyでパッケージング。この時に、自分で入れたKuromojiも一緒にJARに入ります。

> assembly

もう1度実行。

$ spark-1.4.1-bin-hadoop2.6/bin/spark-submit --class org.littlewings.spark.TwitterStreaming --master local[4] /path/to/target/scala-2.11/spark-streaming-twitter-assembly-0.0.1.jar '#あなたがリア充になれない理由ベスト10'

結果のうちの、1ファイル。

$ cat output/tweet-1439996580000/part-00002 
(最終,1)
(要は,1)
(紀伊,1)
(xxcds,1)
(女子力,2)
(タケ,1)
(同人誌,1)
(浮き沈み,1)
(チュウ,1)
(バイブ,1)
(島,1)
(フォロワ,2)
(笑ってはいけない,1)
(別に,2)
(グリモア,2)
(キュヒョン,1)
(やめる,1)
(coinen,1)
(サッカー選手,1)
(キル,2)
(限定,1)
(ぶっちゃけ,1)
(wi,1)
(strpy,1)
(ありがとう,3)
(ゲーム,1)
(rt,1)
(ちぃ,1)
(凶器,1)
(71,1)
(佐野,1)
(バイクの日,1)
(少女,2)
(誕生,1)
(ぼっち,2)
(リサとガスパール,1)
(ダメ人間,1)
(オーバーマスタ,1)
(ネイマール,1)
(白猫,1)
(リフレク,1)
(脳内,1)
(銀魂,2)
(スタバ,1)
(社会学,1)
(http://,22)
(ゆかりん,1)
(しょう,2)
(風立ちぬ,1)
(マック,1)
(期間限定,1)
(連チャン,1)
(表示,5)
(同人,1)
(高校野球,1)
(可能,2)
(6,24)
(綾子,1)
(就活,1)
(スクショ,1)
(2,24)
(クシナダ,1)
(拡散,1)
(募集中,1)
(じゃなくて,1)
(猫ちゃん,1)
(サクシード,1)
(松本山雅,1)
(コラボ,1)
(z,2)
(わからん,1)
(dz,1)
(ちーちゃん,1)
(談話,1)
(駐車,1)
(35,1)
(オタク,1)
(一人,3)
(ちょ,1)
(安倍談話,1)
(ゲーセン,1)
(タメ,1)
(f,1)
(酒,1)
(選手権,1)
(顔面,1)
(おはようございます,1)
(駐車場,1)
(学,1)
(分かる,1)
(うまるちゃん,1)
(kxcahzwzb,1)
(東京大会,1)
(回,1)

よりナチュラルになった感じが…!