個人的に、Streamingで読み込む対象の用意?に四苦八苦しているSpark Streamingです(笑)。
そのうちKafkaに手を出してみたいなぁと思いつつも、ここはいったん簡単なものだけ試してみることにしました。
spark-streaming-twitterを使って、Twitterからツイートを読み出してみようと思います。
Linking
http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking
Advanced Sources
http://spark.apache.org/docs/latest/streaming-programming-guide.html#advanced-sources
「spark-streaming-twitter」を使うと、TwitterからStreamingして読み出し続けられる、と。中身は、どうやらTwitter4Jを使っているみたいです。
具体的には、「twitter4j.TwitterStream」です。
というわけで、今回はTwitter上のトレンドからツイートをフィルタリングして、収集対象となったツイートからWord Countしてみたいと思います。
準備
ビルド定義。
build.sbt
name := "spark-streaming-twitter" version := "0.0.1" scalaVersion := "2.11.7" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) libraryDependencies ++= Seq( "org.apache.spark" %% "spark-streaming" % "1.4.1" % "provided", "org.apache.spark" %% "spark-streaming-twitter" % "1.4.1" exclude("org.spark-project.spark", "unused"), "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1", "org.apache.lucene" % "lucene-analyzers-kuromoji" % "5.2.1" )
「spark-streaming-twitter」はもちろん必要ですが、「spark-streaming」も明示的に追加する必要があります。
sbt-assemblyも使います。
project/plugins.sbt
logLevel := Level.Warn addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
build.sbtのSpark関連のライブラリに、こんな感じでexcludeが入っているのですが
"org.apache.spark" %% "spark-streaming-twitter" % "1.4.1" exclude("org.spark-project.spark", "unused"),
これは最初、依存関係定義をこうしていたら
libraryDependencies ++= Seq( "org.apache.spark" %% "spark-streaming" % "1.4.1" % "provided", "org.apache.spark" %% "spark-streaming-twitter" % "1.4.1", "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1", "org.apache.lucene" % "lucene-analyzers-kuromoji" % "5.2.1" )
assembly時にClassが衝突してしまったからです…。
[error] 1 error was encountered during merge [trace] Stack trace suppressed: run last *:assembly for the full output. [error] (*:assembly) deduplicate: different file contents found in the following: [error] /path/to/.ivy2/cache/org.apache.spark/spark-streaming-twitter_2.11/jars/spark-streaming-twitter_2.11-1.4.1.jar:org/apache/spark/unused/UnusedStubClass.class [error] /path/to/.ivy2/cache/org.spark-project.spark/unused/jars/unused-1.0.0.jar:org/apache/spark/unused/UnusedStubClass.class
どうも、「org.apache.spark.unused.UnusedStubClass」というクラスが、「org.spark-project.spark/unused」というアーティファクトと「org.apache.spark/spark-streaming-twitter」の両方に含まれてしまっているようです。
なので、「spark-streaming-twitter」から除去しましたよ、と。
プログラムを書く
それでは、TwitterからStreamingしてみるプログラムを書いてみます。
結果は、このように。
src/main/scala/org/littlewings/spark/TwitterStreaming.scala
package org.littlewings.spark import org.apache.lucene.analysis.ja.JapaneseAnalyzer import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.apache.spark.SparkConf import org.apache.spark.streaming.twitter.TwitterUtils import org.apache.spark.streaming.{Durations, StreamingContext} object TwitterStreaming { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Twitter Streaming") val ssc = new StreamingContext(conf, Durations.minutes(1L)) val filter = if (args.isEmpty) Nil else args.toList val stream = TwitterUtils.createStream(ssc, None, filter) stream .flatMap { status => val text = status.getText val analyzer = new JapaneseAnalyzer val tokenStream = analyzer.tokenStream("", text) val charAttr = tokenStream.addAttribute(classOf[CharTermAttribute]) tokenStream.reset() try { Iterator .continually(tokenStream.incrementToken()) .takeWhile(identity) .map(_ => charAttr.toString) .toVector } finally { tokenStream.end() tokenStream.close() } } .map(word => (word, 1)) .reduceByKey((a, b) => a + b) .saveAsTextFiles("output/tweet") ssc.start() ssc.awaitTermination() } }
Streaming時のクエリ(ツイートの絞り込み)は、起動引数でもらうようにしました。
val filter = if (args.isEmpty) Nil else args.toList val stream = TwitterUtils.createStream(ssc, None, filter)
ここでのクエリは、Twitter4JのFilterQueryクラスに渡されます。
https://github.com/apache/spark/blob/v1.4.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L91
Luceneの形態素解析とWord Countの部分は省略。
今回は、結果をローカルディスクに書き出すようにしました。
.saveAsTextFiles("output/tweet")
ここまでやったら、Streaming開始のコードを書いておしまい。
ssc.start() ssc.awaitTermination()
Twitter4Jを使うので、各種Key等が必要です。今回は、twitter4j.propertiesに書きました。
src/main/resources/twitter4j.properties
oauth.consumerKey=<your-consumer-key> oauth.consumerSecret=<your-consumer-secret> oauth.accessToken=<your-access-token> oauth.accessTokenSecret=<your-access-token-secret>
コードの参考)
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterAlgebirdCMS.scala
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
あとは、assembyして
> assembly
JARファイルを作成。
[info] SHA-1: 8a6f95d20169a836e47d82d3bbac3c8d227208a2 [info] Packaging /path/to/spark-streaming-twitter/target/scala-2.11/spark-streaming-twitter-assembly-0.0.1.jar ... [info] Done packaging. [success] Total time: 5 s, completed 2015/08/19 0:30:25
ここまでで、準備完了です。
実行する
それでは、実行してみましょう。
今回は、「#あなたがリア充になれない理由ベスト10」とします。
それでは、Spark Submitで実行。
$ spark-1.4.1-bin-hadoop2.6/bin/spark-submit --class org.littlewings.spark.TwitterStreaming --master local[4] /path/to/target/scala-2.11/spark-streaming-twitter-assembly-0.0.1.jar '#あなたがリア充になれない理由ベスト10'
走り出したジョブは、ずっと続くので適当にCtrl-Cで止めてください。
結果は、このように。
$ ll output/tweet-1439996100000 合計 44 drwxrwxr-x 2 xxxxx xxxxx 4096 8月 19 23:55 ./ drwxrwxr-x 3 xxxxx xxxxx 4096 8月 19 23:55 ../ -rw-rw-r-- 1 xxxxx xxxxx 8 8月 19 23:55 ._SUCCESS.crc -rw-rw-r-- 1 xxxxx xxxxx 16 8月 19 23:55 .part-00000.crc -rw-rw-r-- 1 xxxxx xxxxx 16 8月 19 23:55 .part-00001.crc -rw-rw-r-- 1 xxxxx xxxxx 16 8月 19 23:55 .part-00002.crc -rw-rw-r-- 1 xxxxx xxxxx 16 8月 19 23:55 .part-00003.crc -rw-r--r-- 1 xxxxx xxxxx 0 8月 19 23:55 _SUCCESS -rw-r--r-- 1 xxxxx xxxxx 826 8月 19 23:55 part-00000 -rw-r--r-- 1 xxxxx xxxxx 798 8月 19 23:55 part-00001 -rw-r--r-- 1 xxxxx xxxxx 782 8月 19 23:55 part-00002 -rw-r--r-- 1 xxxxx xxxxx 975 8月 19 23:55 part-00003
なんか、ファイルができています。
一応、できてるっぽいです。
$ head output/tweet-1439996100000/part-00001 (定例,1) (エルレ,1) (内,1) (モン,1) (uezw,1) (コミケ,4) (エロ,1) (病院,1) (不明,1) (ウェア,1) (科目,1) (中,1) (こっち,1) (接客,1) (9,19) (マン,1) (変,1) (電池,1) (不,1) (早,1) (位,181) (樹,1) (駅,1) (ez,1) (服,1) (1,18) (ワロタ,1) (ペダル,2) (離れる,1) (利,1) (ひる,1) (ガロ,1) (彩,1) (劇場,1) (eater,1) (ゆる,1) (了解,1) (蛍,1) (ど,1) (大喜,1) (5,20) (うん,1) (ダンジョン,1) (アン,1) (拡,1) (よろしく,1) (バット,1) (プレ,4) (理解,1) (祭り,1) (ハジ,1) (りん,1) (なれる,18) (日,2) (デジ,1) (ド,1) (ばら,1) (q,1) (笑,6) (充,21) (ヒードラン,1) (業,1) (ズボン,1) (てる,1) (笑う,1) (何,1) (かご,1) (三,1) (ちゃん,2) (行方,1) (次回,1) (ラオウ,1) (cx,1) (せい,1) (ガチ,2)
Kuromojiにmecab-ipadic-NEologdを適用してWord Countする
オマケとして、Word Countするのに普通のLucene Kuromojiではなくて、mecab-ipadic-NEologdを適用したバージョンで行ってみましょう。
mecab-ipadic-NEologdは、20150817日付の版を使っています。
build.sbtはこのように修正して、オリジナルのLucene Kuromojiを外します。
libraryDependencies ++= Seq( "org.apache.spark" %% "spark-streaming" % "1.4.1" % "provided", "org.apache.spark" %% "spark-streaming-twitter" % "1.4.1" exclude("org.spark-project.spark", "unused"), "org.apache.lucene" % "lucene-analyzers-common" % "5.2.1" // "org.apache.lucene" % "lucene-analyzers-kuromoji" % "5.2.1" )
プロジェクトのlibディレクトリに、mecab-ipadic-NEologdを適用したLucene Kuromojiを放り込みます。
$ mkdir <プロジェクトのディレクトリ(/path/to>/lib $ cp lucene-analyzers-kuromoji-ipadic-neologd-5.2.1-20150817-SNAPSHOT.jar /path/to/lib/
sbt-assemblyでパッケージング。この時に、自分で入れたKuromojiも一緒にJARに入ります。
> assembly
もう1度実行。
$ spark-1.4.1-bin-hadoop2.6/bin/spark-submit --class org.littlewings.spark.TwitterStreaming --master local[4] /path/to/target/scala-2.11/spark-streaming-twitter-assembly-0.0.1.jar '#あなたがリア充になれない理由ベスト10'
結果のうちの、1ファイル。
$ cat output/tweet-1439996580000/part-00002 (最終,1) (要は,1) (紀伊,1) (xxcds,1) (女子力,2) (タケ,1) (同人誌,1) (浮き沈み,1) (チュウ,1) (バイブ,1) (島,1) (フォロワ,2) (笑ってはいけない,1) (別に,2) (グリモア,2) (キュヒョン,1) (やめる,1) (coinen,1) (サッカー選手,1) (キル,2) (限定,1) (ぶっちゃけ,1) (wi,1) (strpy,1) (ありがとう,3) (ゲーム,1) (rt,1) (ちぃ,1) (凶器,1) (71,1) (佐野,1) (バイクの日,1) (少女,2) (誕生,1) (ぼっち,2) (リサとガスパール,1) (ダメ人間,1) (オーバーマスタ,1) (ネイマール,1) (白猫,1) (リフレク,1) (脳内,1) (銀魂,2) (スタバ,1) (社会学,1) (http://,22) (ゆかりん,1) (しょう,2) (風立ちぬ,1) (マック,1) (期間限定,1) (連チャン,1) (表示,5) (同人,1) (高校野球,1) (可能,2) (6,24) (綾子,1) (就活,1) (スクショ,1) (2,24) (クシナダ,1) (拡散,1) (募集中,1) (じゃなくて,1) (猫ちゃん,1) (サクシード,1) (松本山雅,1) (コラボ,1) (z,2) (わからん,1) (dz,1) (ちーちゃん,1) (談話,1) (駐車,1) (35,1) (オタク,1) (一人,3) (ちょ,1) (安倍談話,1) (ゲーセン,1) (タメ,1) (f,1) (酒,1) (選手権,1) (顔面,1) (おはようございます,1) (駐車場,1) (学,1) (分かる,1) (うまるちゃん,1) (kxcahzwzb,1) (東京大会,1) (回,1)
よりナチュラルになった感じが…!