CLOVER🍀

That was when it all began.

Apache Bahirが提供する、Apache Spark向けのコネクター(DStream for Twitter)を試す

これは、なにをしたくて書いたもの?

  • ちょっとした用事で、Apache SparkのTwitter Streamingを使おうかと思ったものの、なくなっていることに気づく
  • どうやらApache Spark 2.0.0で、他のコネクターと一緒になくなったらしい
  • Apache Bahirというプロジェクトに移っているようなので、そちらを試してみようと

Apache Bahir?

Apache Bahirという、分散分析プラットフォーム向けのエクステンションを提供するプロジェクトがあるそうです。

Apache Bahir

Apache Bahir、トップレベルプロジェクトへ - ビッグデータ処理を強化 | マイナビニュース

中身を覗いてみると、現時点でApache SparkとApache Flinkのエクステンション(コネクター)があります。

Apache Spark 2.0.0で削除されたSpark StreamingのTwitter、Akka、MQTT、ZeroMQなどのコネクター
こちらに移っているようです。

Spark Release 2.0.0 | Apache Spark

というわけで、今回はApache Bahirから提供されている、Apache Spark向けのTwitterネクターを使ってみます。

とりあえず、動かしてみることが目標です。

環境

今回の環境は、こちら。

$ java -version
openjdk version "1.8.0_191"
OpenJDK Runtime Environment (build 1.8.0_191-8u191-b12-2ubuntu0.18.04.1-b12)
OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)


> sbtVersion
[info] 1.2.8

準備

sbtでの依存関係の定義。

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.3.2" % Provided,
  "org.apache.bahir" %% "spark-streaming-twitter" % "2.3.2" % Compile,
  "org.scalatest" %% "scalatest" % "3.0.5" % Test
)

現時点でApache Bahirで提供されているApache Spark向けのコネクターの最新版は、Apache Spark 2.3.2向けに
提供されているものなので、こちらを使います。なお、Apache Spark自体の最新版は2.4.0です。

なので、利用するScalaのバージョンも2.11です(Apache Spark 2.4.0で、Scala 2.12が使えそうです)。

「spark-streaming」は、「spark-streaming-twitter」からはprovidedで指定してあるので、自分で使う時には
providedで依存関係に追加します。

ScalaTestを使うのは、テストコード側で起動すると、providedなスコープのものも起動時にクラスパスに入るようなので
それを利用するためです。

サンプルコード

Twitterから「spark」の入ったツイートを、Streamingで取得して表示し続けるだけのサンプル。 src/test/scala/org/littlewings/spark/bahir/TwitterStreamingSpec.scala

package org.littlewings.spark.bahir

import org.apache.spark.SparkConf
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Minutes, StreamingContext}
import org.scalatest.FunSuite

class TwitterStreamingSpec extends FunSuite {
  test("use Bahir Twitter Streaming") {
    val conf =
      new SparkConf().setMaster("local[*]").setAppName("tweet-streaming")
    val ssc = new StreamingContext(conf, Minutes(1L))

    val stream = TwitterUtils.createStream(ssc, None, Array("spark"))

    stream
      .foreachRDD { rdd =>
        rdd.foreach { status =>
          println(s"[${status.getCreatedAt} / ${status.getId} / ${status.getText}")
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}

twitter4j.propertiesは、省略します。

非常にあっさりですが、ふつうに使えますね。
※というか、アーティファクトのグループIDはApache Sparkとは異なるものの、パッケージ名はApache Sparkのままですね

とりあえずApache Bahirのドキュメントを読もうとしたものの、ほとんど情報は載っていませんでした…。

Spark Streaming Twitter

Apache Bahir 2.3.2の時点でも、これだけのコネクターがあります。Apache Spark本体からなくなってしまったものなどで、
Apache Bahir側から提供されているものがあれば、使ってみるとよいかもしれません。

  • Apache Spark extensions

    • Spark data source for Apache CouchDB/Cloudant
    • Spark Structured Streaming data source for Akka
    • Spark Structured Streaming data source for MQTT
    • Spark DStream connector for Apache CouchDB/Cloudant
    • Spark DStream connector for Akka
    • Spark DStream connector for Google Cloud Pub/Sub
    • Spark DStream connector for PubNub
    • Spark DStream connector for MQTT
    • Spark DStream connector for Twitter
    • Spark DStream connector for ZeroMQ
  • Apache Flink extensions

    • Flink streaming connector for ActiveMQ
    • Flink streaming connector for Akka
    • Flink streaming connector for Flume
    • Flink streaming connector for InfluxDB
    • Flink streaming connector for Kudu
    • Flink streaming connector for Redis
    • Flink streaming connector for Netty