CLOVER🍀

That was when it all began.

ElasticsearchのApache Sparkサポート機能で遊ぶ

Elasticsearchに、Apache Spark向けのライブラリがあることは知っていたのですが、長らく手をつけていないままだったので、1度試してみることにしました。

Apache Spark support | Elasticsearch for Apache Hadoop [2.3] | Elastic

こちらを使うことで、Apache Sparkが提供するAPIをElasticsearchで使うことができるようになるみたいですね。内部的には、elasticsearch-hadoopに依存している模様。

日本語記事もあるようです。

楽しい可視化 : elasticsearchとSpark Streamingの出会い | NTTデータ先端技術株式会社

で、何をするかですが、まあ…Spark StreamingとTwitterですかね。今回は、以下のテーマでやってみることにしました。

  • Spark StreamingでTwitterからストリームでの取得を「#nowplaying」ハッシュタグで行い、Elasticsearchに書き込む。この時、ハッシュタグのみを抽出して、Arrayなフィールドとして保存します
  • 保存した結果を、Kibanaでハッシュタグの数を参照
  • Spark RDDでElasticsearchからデータを取得し、ハッシュタグの数をカウントし、そのうち上位5つを取り出す
  • Spark DataFrame APIとSpark SQLで、RDDと同じことを行う

Elasticsearchに登録するデータは、こういう構成とします。

id - Long
user_name - String
screen_name - String
tweet_text - String
hashtags - Array[String]
created_at - Timestamp

Stringなフィールドは、いずれもAnalyzeしません。インデックスの定義自体は、後述します。で、これに対して読み書きすると。

では、始めてみましょう。

準備

まずは、環境準備します。

ElasticsearchとKibanaは、初期状態で起動しているものとします。

アプリケーションのビルド定義は、以下のようにしました。
build.sbt

name := "spark-streaming-twitter-to-es"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.8"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := false

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.6.1" % "provided",
  "org.elasticsearch" %% "elasticsearch-spark" % "2.3.2",
  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
)

アプリケーション自体は、テストコード上で動かすものとします。

インデックステンプレートは以下のようにして
non-analyze-index-template.json

{
  "template": "spark*",
  "settings": {
    "index": {
      "number_of_shards": 1,
      "number_of_replicas": 0
    }
  },
  "mappings": {
    "_default_": {
      "_all": { "enabled": false },
      "_source": { "enabled": true },
      "dynamic_templates": [ {
          "string_template": {
            "mapping": {
              "index": "not_analyzed",
              "type": "string",
              "store": "yes"
            },
            "match_mapping_type": "string",
            "match": "*"
          }
      } ]
    } 
  }
}

Elasticsearchに登録。

$ curl -XPUT http://localhost:9200/_template/spark -d @non-analyze-index-template.json

あと、Spark StreamingのTwitterを使う機能は裏でTwitter4jを使っているので、twitter4j.propertiesも用意しておきます。
src/test/resources/twitter4j.properties

oauth.consumerKey=<your consumer-key>
oauth.consumerSecret=<your consumer-secret>
oauth.accessToken=<your access-token>
oauth.accessTokenSecret=<your access-token-secret>

それでは、ここからコードを書いていきます。

Spark StreamingでTwitterからデータを取り込み、Elasticsearchに放り込む

このあたりを見ながら設定を行い、コードを書いていきましょう。

Configuration

Writing data to Elasticsearch

で、できあがったコードはこちら。
src/test/scala/org/littlewings/elasticsearch/sparkstreaming/TwitterStreamingToEsSpec.scala

package org.littlewings.elasticsearch.sparkstreaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.twitter.TwitterUtils
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.elasticsearch.spark.rdd.EsSpark
import org.scalatest.{FunSpec, Matchers}

class TwitterStreamingToEsSpec extends FunSpec with Matchers {
  describe("Spark Streaming Twitter") {
    it("write to Elasticsearch") {
      val conf =
        new SparkConf()
          .setAppName("Spark Streaming Twitter to Elasticsearch")
          .setMaster("local[*]")
          .set("es.nodes", "localhost")
          .set("es.port", "9200")
          .set("es.index.auto.create", "true")
          .set("es.mapping.id", "id")

      val ssc = new StreamingContext(conf, Durations.seconds(10))

      val filters = Array("#nowplaying")
      val stream = TwitterUtils.createStream(ssc, None, filters)

      stream
        .map { status =>
          val text = status.getText

          Map(
            "id" -> status.getId,
            "user_name" -> status.getUser.getName,
            "screen_name" -> status.getUser.getScreenName,
            "tweet_text" -> text,
            "hashtags" -> text.split("[  \\r\\n]+").filter(_.startsWith("#")).filter(_.size > 1).map(_.toLowerCase).distinct,
            "created_at" -> status.getCreatedAt
          )
        }
        .foreachRDD(rdd => EsSpark.saveToEs(rdd, "spark/twitter_streaming"))

      ssc.start()
      ssc.awaitTermination()
    }
  }
}

最初に、SparkConfで設定をしています。

      val conf =
        new SparkConf()
          .setAppName("Spark Streaming Twitter to Elasticsearch")
          .setMaster("local[*]")
          .set("es.nodes", "localhost")
          .set("es.port", "9200")
          .set("es.index.auto.create", "true")
          .set("es.mapping.id", "id")

この中で、ElasticsearchとSparkの連携に関する部分を設定できます。今回は、接続先の明示やインデックスの自動作成、ドキュメントのidへのマッピング(今回はidフィールドを利用)を設定しています。

設定については、こちらを参照してください。

Configuration | Elasticsearch for Apache Hadoop [2.3] | Elastic

そして、10秒おきに「#nowplaying」ハッシュタグが含まれるツイートを取得して

      val ssc = new StreamingContext(conf, Durations.seconds(10))

      val filters = Array("#nowplaying")
      val stream = TwitterUtils.createStream(ssc, None, filters)

Mapに変換します。この時、ハッシュタグのみツイート本体とは別に抽出して、シーケンスとして保存します。単純化のために、ハッシュタグはすべて小文字に変換しています。

        .map { status =>
          val text = status.getText

          Map(
            "id" -> status.getId,
            "user_name" -> status.getUser.getName,
            "screen_name" -> status.getUser.getScreenName,
            "tweet_text" -> text,
            "hashtags" -> text.split("[  \\r\\n]+").filter(_.startsWith("#")).filter(_.size > 1).map(_.toLowerCase).distinct,
            "created_at" -> status.getCreatedAt
          )

最後に、Elasticsearchに保存します。今回のElasticsearchへの保存先は、「spark/twitter_streaming」です。

        .foreachRDD(rdd => EsSpark.saveToEs(rdd, "spark/twitter_streaming"))

Implicit Conversionを使ってもよかったのですが、今回はEsSparkを直接使いました。

Implicit Conversionを使う場合は、こういう感じになるみたいですね。

import org.elasticsearch.spark._

...

rdd.saveToEs("spark/twitter_streaming")

これで、アプリケーションを起動すると、Twitterからツイートを取得してElasticsearchに保存するバッチ処理が延々と続きます。適当なところで止めましょう。

結果をKibanaで確認すると、こんな感じになりました。ハッシュタグのTermの個数でカウントしています。

#nowplayingハッシュタグが1番多いのはもっともですが、その他のハッシュタグもカウントされています。
2番目に多いのは、#listenliveのようです。この時点で、86個ですね。

では、これと同じ集計をSparkのAPIでやってみましょう。

のちに、このお題を選んだことを後悔しましたけれど。

Spark RDD(基本的なAPI)を使った集計

では、最初にRDDを使って集計してみたいと思います。

ドキュメントは、こちらを参考にします。

Reading data from Elasticsearch

ドキュメントどおりにImplicit Conversionを使ってもよいのですが、今回は裏にいるEsSparkを直接使いました。

できあがったコードは、こんな感じになりました。
src/test/scala/org/littlewings/elasticsearch/sparkstreaming/ReadFromEsSpec.scala

package org.littlewings.elasticsearch.sparkstreaming

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark
import org.scalatest.{FunSpec, Matchers}

class ReadFromEsSpec extends FunSpec with Matchers {
  describe("Read From Elasticsearch") {
    it("hashtags top 5") {
      val conf =
        new SparkConf()
          .setAppName("read data from Elasticsearch")
          .setMaster("local[*]")
          .set("es.nodes", "localhost")
          .set("es.port", "9200")

      val sc = new SparkContext(conf)
      val rdd = EsSpark.esRDD(sc, "spark/twitter_streaming", "?q=*")

      val top5Hashtags =
        rdd
          .values
          .map(_.get("hashtags").asInstanceOf[Option[Seq[String]]])
          .map(_.getOrElse(Seq.empty))
          .filter(_.nonEmpty)
          .flatMap(hs => hs.map(h => (h -> 1)))
          .foldByKey(0)((acc, cur) => acc + cur)
          .sortBy(_._2, false)
          .take(5)

      top5Hashtags should have size (5)
      top5Hashtags(0) should be(("#nowplaying" -> 741))
      top5Hashtags(1) should be(("#listenlive" -> 86))
      top5Hashtags(2) should be(("#nowplaying:" -> 19))
      top5Hashtags(3) should be(("#radio" -> 18))
      top5Hashtags(4) should be(("#music" -> 18))

      sc.stop()
    }
  }
}

SparkConfの部分は、Streamingの時と同じなので割愛します。

SparkContext生成後、EsSparkを使ってRDDを生成します。この時、「インデックス/タイプ」の指定と、クエリが必要です。今回は「?q=*」なので全件対象です。

      val sc = new SparkContext(conf)
      val rdd = EsSpark.esRDD(sc, "spark/twitter_streaming", "?q=*")

最初、クエリの「?」の部分を付けるのを忘れていて、ちょっとハマりました…。

その後は、インデックスから取得したエントリ(の値の部分のみ)からStreamingで保存した「hashtags」フィールドをSeqとして抽出し、個々のハッシュタグにexplodeで分解、ハッシュタグと個数(初期値なので1)に変換してからfoldByKeyで集計します。

      val top5Hashtags =
        rdd
          .values
          .map(_.get("hashtags").asInstanceOf[Option[Seq[String]]])
          .map(_.getOrElse(Seq.empty))
          .filter(_.nonEmpty)
          .flatMap(hs => hs.map(h => (h -> 1)))
          .foldByKey(0)((acc, cur) => acc + cur)
          .sortBy(_._2, false)
          .take(5)

ソート後、上位5件を取得して完成です。

結果。Kibanaで見た時と、同じですね。#listenliveハッシュタグが86個あることになっています。

      top5Hashtags should have size (5)
      top5Hashtags(0) should be(("#nowplaying" -> 741))
      top5Hashtags(1) should be(("#listenlive" -> 86))
      top5Hashtags(2) should be(("#nowplaying:" -> 19))
      top5Hashtags(3) should be(("#radio" -> 18))
      top5Hashtags(4) should be(("#music" -> 18))

Spark DataFrame API & Spark SQL

最後に、DataFrame APIとSpark SQLを使って、同じことをやってみます。

Spark SQL support

hashtagsをexplodeするのに、とてもとてもハマりました…。

なにせ、初DataFrame API、Spark SQLです…。

ここは、両方一気に進めてしまいます。まず、テストコード全体の雛形から。

src/test/scala/org/littlewings/elasticsearch/sparkstreaming/DataFrameAndSqlFromEsSpec.scala 
package org.littlewings.elasticsearch.sparkstreaming

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{FunSpec, Matchers}

class DataFrameAndSqlFromEsSpec extends FunSpec with Matchers {
  describe("Spark DataFrame and SQL with Elasticsearch") {
    // ここに、テストを書く!
  }
}
DataFrame API

最初に、DataFrame APIを使ったコードを載せます。

    it("DataFrame: hashtags top 5") {
      val conf =
        new SparkConf()
          .setAppName("DataFrame from Elasticsearch")
          .setMaster("local[*]")
          .set("es.nodes", "localhost")
          .set("es.port", "9200")

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)

      import sqlContext.implicits._

      val schema = StructType(Array(
        StructField("id", LongType, false),
        StructField("user_name", StringType, false),
        StructField("screen_name", StringType, false),
        StructField("tweet_text", StringType, false),
        StructField("hashtags", ArrayType(StringType), true),
        StructField("created_at", TimestampType, false)
      ))

      val tweetDf =
        sqlContext
          .read
          .format("es")
          .schema(schema)
          .load("spark/twitter_streaming")
          .cache()

      val top5Hashtags =
        tweetDf
          .explode("hashtags", "hashtag")((hashtags: Seq[String]) => hashtags)
          .groupBy('hashtag)
          .count()
          .orderBy('count.desc)
          .take(5)
          .map(row => (row.getString(0), row.getLong(1)))

      top5Hashtags should have size (5)
      top5Hashtags(0) should be(("#nowplaying" -> 741))
      top5Hashtags(1) should be(("#listenlive" -> 86))
      top5Hashtags(2) should be(("#nowplaying:" -> 19))
      top5Hashtags(3) should be(("#radio" -> 18))
      top5Hashtags(4) should be(("#music" -> 18))

      sc.stop()
    }

SQLContextを生成して、importを行うところまでは通常のSpark SQLの使い方と同じみたいです。

スキーマ定義は必須ではないようですが、今回の使い方だとhashtagsフィールドをStringだと推測されてしまうようで、以下のような例外が飛ぶようになっていたので明示することにしました。

Caused by: scala.MatchError: Buffer(#np, #nowplaying) (of class scala.collection.convert.Wrappers$JListWrapper)

定義したスキーマ

      val schema = StructType(Array(
        StructField("id", LongType, false),
        StructField("user_name", StringType, false),
        StructField("screen_name", StringType, false),
        StructField("tweet_text", StringType, false),
        StructField("hashtags", ArrayType(StringType), true),
        StructField("created_at", TimestampType, false)
      ))

こちらを使って、DataFrameを取得します。

      val tweetDf =
        sqlContext
          .read
          .format("es")
          .schema(schema)
          .load("spark/twitter_streaming")
          .cache()

format("es")で、Elasticsearchからデータを取得する意味となります。loadでは、インデックス名とタイプ名を指定します。

あとは、取得したデータからハッシュタグの集計を行います。

      val top5Hashtags =
        tweetDf
          .explode("hashtags", "hashtag")((hashtags: Seq[String]) => hashtags)
          .groupBy('hashtag)
          .count()
          .orderBy('count.desc)
          .take(5)
          .map(row => (row.getString(0), row.getLong(1)))

hashtagsがArrayなフィールドのため、explodeして1度平たく持つようにして、そこからgroupByするようにしています。

結果は、RDDを直接使っていた場合と同じです。

参考)
Xinh's Tech Blog: Reading JSON data in Spark DataFrames
Solved: Explode function in Data Frames - Cloudera Community

Spark SQL

続いて、Spark SQLSQL文を書く形でコードを書いてみます。

いきなり結果から。できあがったコードは、こちらです。

    it("SQL: hashtags top 10") {
      val conf =
        new SparkConf()
          .setAppName("DataFrame from Elasticsearch")
          .setMaster("local[*]")
          .set("es.nodes", "localhost")
          .set("es.port", "9200")

      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)

      sqlContext.sql(
        """|CREATE TEMPORARY TABLE twitter_streaming
          |USING es
          |OPTIONS (resource 'spark/twitter_streaming', es.read.field.as.array.include 'hashtags')""".stripMargin
      )

      val twitterDf =
        sqlContext.sql(
          """|SELECT h.hashtag, COUNT(h.hashtag)
            |FROM (SELECT explode(hashtags) AS hashtag FROM twitter_streaming) AS h
            |GROUP BY h.hashtag
            |ORDER BY COUNT(h.hashtag) DESC
            |LIMIT 5""".stripMargin)

      val top5Hashtags =
        twitterDf
          .map(row => (row.getString(0) -> row.getLong(1)))
          .collect()

      top5Hashtags should have size (5)
      top5Hashtags(0) should be(("#nowplaying" -> 741))
      top5Hashtags(1) should be(("#listenlive" -> 86))
      top5Hashtags(2) should be(("#nowplaying:" -> 19))
      top5Hashtags(3) should be(("#radio" -> 18))
      top5Hashtags(4) should be(("#music" -> 18))

      sc.stop()
    }

CREATE TEMPORARY TABLEするようなのですが、(RDDからDataFrameを作らない場合は?)DataFrame APIの時と違ってスキーマが指定できないようなので「es.read.field.as.array.include」でhashtagsフィールドを強制的にArrayとして扱うようにしました。

      sqlContext.sql(
        """|CREATE TEMPORARY TABLE twitter_streaming
          |USING es
          |OPTIONS (resource 'spark/twitter_streaming', es.read.field.as.array.include 'hashtags')""".stripMargin
      )

あとは、SQLを書いて集計です。

      val twitterDf =
        sqlContext.sql(
          """|SELECT h.hashtag, COUNT(h.hashtag)
            |FROM (SELECT explode(hashtags) AS hashtag FROM twitter_streaming) AS h
            |GROUP BY h.hashtag
            |ORDER BY COUNT(h.hashtag) DESC
            |LIMIT 5""".stripMargin)

explodeをどうやって扱うかすごく困りましたが、サブクエリにすればいいことにちょっと気づき、その形で実装。

結果としては、RDDやDataFrame APIと同じ結果になって一安心…。

参考)
sql - Does SparkSQL support subquery? - Stack Overflow

まとめ

はじめてElasticsearchのApache Spark向けの機能を使ってみましたが、最初にお題をAggregation的なものに置いてしまったのでだいぶてこずりましたが、なんとか通せました。

スキーマの扱いとかは、ちょっとした注意ポイントになる感じでしょうかね。あとは、DataFrame APIやSpark SQLもはじめてだったので、それなりに大変でした…。

とはいえ、慣れればそれなりに使えそうなので、Apache Spark自体と合わせてちょっとずつ習熟していきたいですね。