CLOVER🍀

That was when it all began.

ElasticsearchとKibanaでタグクラウドを作って遊ぶ(Elasticsearch Spark利用)

Elasticsearch Advent Calendarで出ていた、こちらのエントリを見て、面白そうだなぁと思いまして。

Elasticsearch、Logstash、Kibana、Kuromojiでタグクラウドを作る - Taste of Tech Topics

これを見てパッと思ったのは、タグクラウドにする単語を「名詞」にカテゴライズされるもので絞り込めたらなぁと。

ただ、KuromojiのPartOfSpeechFilterは「除外する品詞を指定する」ものになるので、これをやろうと思うとちょっと大変です。

なので、ここはちょっと方法を変えて、Elasticsearchに取り込む前に形態素解析することにしました。
※完全に遊んでいるネタなので、ご了承くださいませ

今回のテーマは、以下でやります。

余談

ちなみにこれ、エントリに起こすにあたり完全にやり直したもので、最初はGroovyでTwitter4J+Jestでインデックスに取り込む際に
手元でビルドした最新のLucene Kuromoji+NEologd辞書で遊んでいたものを、SparkやCodeLibsのKuromojiを使うように
置き換えたものです。

では、やってみましょう。

準備

まずは、sbtの定義から。
build.sbt

name := "tweet-to-es-with-reindex-spark"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.8"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

resolvers += "CodeLibs Repository" at "http://maven.codelibs.org/"

libraryDependencies ++= Seq(
  "org.twitter4j" % "twitter4j-core" % "4.0.5" % Compile,
  "org.elasticsearch.client" % "rest" % "5.1.1" % Compile,
  "org.apache.spark" %% "spark-core" % "2.0.2" % Provided exclude("org.scalatest", "scalatest_2.11"),
  "org.elasticsearch" %% "elasticsearch-spark-20" % "5.1.1" % Compile exclude("org.scalatest", "scalatest_2.11"),
  "org.codelibs" % "lucene-analyzers-kuromoji-ipadic-neologd" % "6.2.1-20161201" % Compile,
  "org.scalatest" %% "scalatest" % "3.0.1" % Test
)

なぜかElasticsearchのREST Clientが入っていますが、これは再取り込み時にインデックスを削除するためだけに
使っています。コードの実行は、テストコードとして行うのでScalaTestを使用しています(テストそのものを書く
わけではありませんが)。

なお、ElasticsearchおよびKibanaは起動しているものとします。

Twitte4Jを使いますので、各種Keyをご用意ください。
src/test/resources/twitter4j.properties

oauth.consumerKey=<your-consumer-key>
oauth.consumerSecret=<your-consumer-secret>
oauth.accessToken=<your-access-token>
oauth.accessTokenSecret=<your-access-token-secret>

ツイートの取り込み

それでは、まずはツイートの取り込み側から作成します。できあがったコードは、こんな感じ。
src/test/scala/org/littlewings/elasticsearch/spark/TweetToEsSparkSuite.scala

package org.littlewings.elasticsearch.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark
import org.scalatest.FunSuite
import twitter4j.{Paging, TwitterFactory}

import scala.collection.JavaConverters._

class TweetToEsSparkSuite extends FunSuite {
  test("tweet to elasticsearch") {
    val screenName = "kazuhira_r"  // ツイート取得対象の、screen-name

    val pagingStart = 1
    val pagingEnd = Option.empty[Int] // 可能な限り取得。絞る場合は Option(n: Int)
    val limit = 200

    val elasticsearchHost = "localhost"
    val elasticsearchPort = 9200
    val indexName = "tweet"
    val indexType = s"${screenName}"

    val conf =
      new SparkConf()
        .setAppName("tweet to elasticsearch")
        .setMaster("local[*]")
        .set("es.nodes", elasticsearchHost)
        .set("es.port", elasticsearchPort.toString)
        .set("es.index.auto.create", "true")
        .set("es.mapping.id", "id")

    val sc = new SparkContext(conf)

    val twitter = TwitterFactory.getSingleton

    try {
      Iterator
        .from(pagingStart)
        .map(currentPage => new Paging(currentPage, limit))
        .map(paging => (paging, twitter.getUserTimeline(screenName, paging)))  // ツイートを取得
        .takeWhile { case (paging, responseList) =>
          // 取得可能な限り、もしくは指定の範囲繰り返しツイートを取得
          (pagingEnd.isEmpty || pagingEnd.map(p => paging.getPage <= p).getOrElse(false)) &&
            (responseList != null && !responseList.isEmpty)
        }
        .foreach { case (paging, responseList) => {
          val tweets =
            responseList
              .asScala
              .map { status =>
                // tweet statusから必要な情報を抽出
                Map(
                  "id" -> status.getId,
                  "screen_name" -> screenName,
                  "created_at" -> status.getCreatedAt,
                  "favorite_count" -> status.getFavoriteCount,
                  "retweeted_count" -> status.getRetweetCount,
                  "retweet" -> status.isRetweet.toString,
                  "text" -> status.getText
                )
              }

          // Elasticsearchに保存
          EsSpark.saveToEs(sc.makeRDD(tweets), s"${indexName}/${indexType}")
        }
        }
    } finally {
      sc.stop()
    }
  }
}

指定のscreen-nameのユーザーのUserTimelineを取得して

    val screenName = "kazuhira_r"  // ツイート取得対象の、screen-name

////////////////////

    try {
      Iterator
        .from(pagingStart)
        .map(currentPage => new Paging(currentPage, limit))
        .map(paging => (paging, twitter.getUserTimeline(screenName, paging)))  // ツイートを取得
        .takeWhile { case (paging, responseList) =>
          // 取得可能な限り、もしくは指定の範囲繰り返しツイートを取得
          (pagingEnd.isEmpty || pagingEnd.map(p => paging.getPage <= p).getOrElse(false)) &&
            (responseList != null && !responseList.isEmpty)
        }

////////////////////
        .foreach { case (paging, responseList) => {
          val tweets =
            responseList
              .asScala
              .map { status =>
                // tweet statusから必要な情報を抽出
                Map(
                  "id" -> status.getId,
                  "screen_name" -> screenName,
                  "created_at" -> status.getCreatedAt,
                  "favorite_count" -> status.getFavoriteCount,
                  "retweeted_count" -> status.getRetweetCount,
                  "retweet" -> status.isRetweet.toString,
                  "text" -> status.getText
                )
              }

Elasticsearchに取り込みます。

          // Elasticsearchに保存
          EsSpark.saveToEs(sc.makeRDD(tweets), s"${indexName}/${indexType}")

この時は、とりあえず取得できるツイートはすべて取得してインデックスに放り込んでいます。

インデックスとタイプの名前は、「tweet/[screen-name]」にしました。

    val indexName = "tweet"
    val indexType = s"${screenName}"

          EsSpark.saveToEs(sc.makeRDD(tweets), s"${indexName}/${indexType}")

このコードを実行すると、指定のscreen-nameのユーザーのタイムラインがElasticsearchに
取り込まれます。

形態素解析しつつ、別のインデックスを作成

では、形態素解析する側。
src/test/scala/org/littlewings/elasticsearch/spark/ReindexingEsSparkSuite.scala

package org.littlewings.elasticsearch.spark

import org.apache.http.HttpHost
import org.apache.lucene.analysis._
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.apache.spark.{SparkConf, SparkContext}
import org.codelibs.neologd.ipadic.lucene.analysis.ja._
import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute
import org.elasticsearch.client.RestClient
import org.elasticsearch.spark.rdd.EsSpark
import org.scalatest.FunSuite

import scala.collection.JavaConverters._

object MorphologicalAnalyzer {
  // Analyzer
  val analyzer: Analyzer = new JapaneseAnalyzer

  // 形態素解析してSeqにして返す
  def toToken(text: String, filter: (String, AnalyzedTokenAttribute) => Boolean): Seq[String] = {
    val tokenStream = analyzer.tokenStream("", text)

    val tokenAttribute = AnalyzedTokenAttribute(
      charTerm = tokenStream.addAttribute(classOf[CharTermAttribute]),
      partOfSpeech = tokenStream.addAttribute(classOf[PartOfSpeechAttribute])  // 品詞
    )

    tokenStream.reset()

    try {
      Iterator
        .continually(tokenStream.incrementToken())
        .takeWhile(identity)
        .flatMap { _ =>
            val token = tokenAttribute.charTerm.toString

            // 対象とするTokenの絞り込み
            if (filter(token, tokenAttribute)) Some(token)
            else None
        }
        .toVector
    } finally {
      tokenStream.end()
      tokenStream.close()
    }
  }
}

// TokenStreamの属性を保持
case class AnalyzedTokenAttribute(charTerm: CharTermAttribute, partOfSpeech: PartOfSpeechAttribute)

class ReindexingEsSparkSuite extends FunSuite {
  test("reindexing es to spark") {
    val screenName = "kazuhira_r"

    val pagingStart = 1
    val pagingEnd = -1
    val limit = 200

    val elasticsearchHost = "localhost"
    val elasticsearchPort = 9200

    val srcIndexName = "tweet"
    val destIndexName = "tokenized_tweet"
    val indexType = s"${screenName}"

    // 前のインデックスはとりあえず削除
    val client = RestClient.builder(new HttpHost(elasticsearchHost, elasticsearchPort)).build
    try {
      client.performRequest("DELETE", s"/${destIndexName}", Map.empty[String, String].asJava)
    } catch {
      case e: Exception =>
        // ignore, index not found...
        e.printStackTrace()
    } finally {
      client.close()
    }

    val conf =
      new SparkConf()
        .setAppName("reindexing elasticsearch, with morphological")
        .setMaster("local[*]")
        .set("es.nodes", elasticsearchHost)
        .set("es.port", elasticsearchPort.toString)
        .set("es.index.auto.create", "true")
        .set("es.mapping.id", "id")

    val sc = new SparkContext(conf)

    try {
      val rdd = EsSpark.esRDD(sc, s"${srcIndexName}/${indexType}", "?q=*")

      val analyzedRdd =
        rdd
          .values
          .filter(tweet => !tweet("retweet").toString.toBoolean) // RTは除外
          .map(tweet => tweet +
          // srcのインデックスに入っていたテキストを、形態素解析してツイートに追加
          ("tokens" -> MorphologicalAnalyzer.toToken(
            tweet("text")
              .asInstanceOf[String]
              .replaceAll("""https?://\S+""", "") // URL削除
              .replaceAll("""@\S+""", ""), // twitter-id削除
            (token, analyzedTokenAttribute) =>
              token.size > 4 && // 長さ5文字以上
                analyzedTokenAttribute.partOfSpeech.getPartOfSpeech.contains("名詞") // 名詞のみで絞り込み
          ))
        )

      // Elasticsearchに保存
      EsSpark.saveToEs(analyzedRdd, s"${destIndexName}/${indexType}")

    } finally {
      sc.stop()
    }
  }
}

Sparkを使っている関係で、Analyzerはシリアライズしなくて済むようにobjectにまとめました。

object MorphologicalAnalyzer {
  // Analyzer
  val analyzer: Analyzer = new JapaneseAnalyzer

  // 形態素解析してSeqにして返す
  def toToken(text: String, filter: (String, AnalyzedTokenAttribute) => Boolean): Seq[String] = {
    val tokenStream = analyzer.tokenStream("", text)

    val tokenAttribute = AnalyzedTokenAttribute(
      charTerm = tokenStream.addAttribute(classOf[CharTermAttribute]),
      partOfSpeech = tokenStream.addAttribute(classOf[PartOfSpeechAttribute])  // 品詞
    )

    tokenStream.reset()

    try {
      Iterator
        .continually(tokenStream.incrementToken())
        .takeWhile(identity)
        .flatMap { _ =>
            val token = tokenAttribute.charTerm.toString

            // 対象とするTokenの絞り込み
            if (filter(token, tokenAttribute)) Some(token)
            else None
        }
        .toVector
    } finally {
      tokenStream.end()
      tokenStream.close()
    }
  }
}

// TokenStreamの属性を保持
case class AnalyzedTokenAttribute(charTerm: CharTermAttribute, partOfSpeech: PartOfSpeechAttribute)

で、Elasticsearch Sparkで最初に取り込んだインデックスからツイートを取り出し、形態素解析して新しいインデックスに
取り込みます。

      val rdd = EsSpark.esRDD(sc, s"${srcIndexName}/${indexType}", "?q=*")

      val analyzedRdd =
        rdd
          .values
          .filter(tweet => !tweet("retweet").toString.toBoolean) // RTは除外
          .map(tweet => tweet +
          // srcのインデックスに入っていたテキストを、形態素解析してツイートに追加
          ("tokens" -> MorphologicalAnalyzer.toToken(
            tweet("text")
              .asInstanceOf[String]
              .replaceAll("""https?://\S+""", "") // URL削除
              .replaceAll("""@\S+""", ""), // twitter-id削除
            (token, analyzedTokenAttribute) =>
              token.size > 4 && // 長さ5文字以上
                analyzedTokenAttribute.partOfSpeech.getPartOfSpeech.contains("名詞") // 名詞のみで絞り込み
          ))
        )

      // Elasticsearchに保存
      EsSpark.saveToEs(analyzedRdd, s"${destIndexName}/${indexType}")

URLを削除していたり、長さ5文字以上のツイートを対象にしているところは元エントリと同じですが、
RTは対象外にしたり、PartOfSpeechAttributeを使って「名詞」を含む品詞にのみ絞り込んでいます。

なお、取り込み先のインデックスは、今回は開始時に削除するようにしました。このためだけに、REST Clientを
使っています…。

    // 前のインデックスはとりあえず削除
    val client = RestClient.builder(new HttpHost(elasticsearchHost, elasticsearchPort)).build
    try {
      client.performRequest("DELETE", s"/${destIndexName}", Map.empty[String, String].asJava)
    } catch {
      case e: Exception =>
        // ignore, index not found...
        e.printStackTrace()
    } finally {
      client.close()
    }

取り込み先のインデックスの名前は、「tokenized_tweet」にしました。タイプの名前は同じです。

    val srcIndexName = "tweet"
    val destIndexName = "tokenized_tweet"
    val indexType = s"${screenName}"

では、こちらを実行して新しいインデックスに形態素解析および絞り込みを行ったツイートを取り込みます。

タグクラウドを作成する

それでは、Kibanaでタグクラウドを見てみましょう。

形態素解析したデータが入っている方のインデックスを、Kibanaで選択します。

あとは、時間の範囲を指定して「Visualize」から「Tag cloud」を選択して、形態素解析したフィールド「tokens」で
タグクラウドを作成します。

なんか、それっぽいのができました。
※自分のツイートで作っているので、内容については突っ込まないでください…

ところで、英単語が全部小文字になっているのと、「プレッシャー」のような単語の「ー」がないのは
LowerCaseFilterやJapaneseKatakanaStemFilterが適用されているからですね。

このあたりが気になるなら、Analyzerをカスタマイズしてもよいでしょう。先ほどはJapaneseAnalyzerをそのまま
使いましたが、次はJapaneseAnalyzerをバラしてJapaneseKatakanaStemFilterとLowerCaseFilterを外してみます。

  // Analyzer
  // val analyzer: Analyzer = new JapaneseAnalyzer
  val analyzer: Analyzer = new StopwordAnalyzerBase {
    override def createComponents(fieldName: String): TokenStreamComponents = {
      val tokenizer = new JapaneseTokenizer(null, true, JapaneseTokenizer.Mode.SEARCH)
      var tokenStream: TokenStream = new JapaneseBaseFormFilter(tokenizer)
      tokenStream = new JapanesePartOfSpeechStopFilter(tokenStream, JapaneseAnalyzer.getDefaultStopTags)
      tokenStream = new CJKWidthFilter(tokenStream)
      tokenStream = new StopFilter(tokenStream, JapaneseAnalyzer.getDefaultStopSet)
      // tokenStream = new JapaneseKatakanaStemFilter(tokenStream)
      // tokenStream = new LowerCaseFilter(tokenStream)
      new TokenStreamComponents(tokenizer, tokenStream)
    }
  }

その他、原型に戻されるのが嫌、などあれば、JapaneseTokenizerだけを使ったAnalyzerを使用するのも
よいかもしれません。

この定義で、再度形態素解析して取り込みなおすと、結果はこのようになります。

表記揺れがあった場合等は重複しちゃうでしょうけど、ちょっと自然っぽくなりましたね。

まとめ

元のエントリと違って、Elasticsearchの外(Spark側)でいろいろやってしまう感じにしてしまいましたが、
まあSparkで遊びたかったということで。

それにしても、Kibana便利ですね。

オマケ

最初に余談で書いていた、Groovyで作ったスクリプトも貼っておきます(CodeLibs Kuromojiを使うようには
しています)。こちらは取り込みと形態素解析を一緒にやってしまいますし、インデックスの削除もやりませんが
簡単に使うならこちら。

使い方は、screen-nameを起動引数に指定。

$ groovy tweet-to-elasticsearch.groovy cero_t

取り込まれ先のインデックス名は、「tweet」です。
tweet-to-elasticsearch.groovy

@Grab('io.searchbox:jest:2.0.4')
import io.searchbox.client.JestClientFactory
import io.searchbox.client.config.HttpClientConfig
import io.searchbox.core.Index

@Grab('org.twitter4j:twitter4j-core:4.0.5')
import twitter4j.Paging
import twitter4j.Status
import twitter4j.TwitterFactory

@GrabResolver(name = 'CodeLibs Repository', root = 'http://maven.codelibs.org/')
@Grab('org.codelibs:lucene-analyzers-kuromoji-ipadic-neologd:6.2.1-20161201')
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseAnalyzer
import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute

def screenName = args[0]  // screen-nameは起動引数で
def currentPage = 1
def pagingStart = 1
def pagingEnd = -1
def limit = 200

def indexName = 'tweet'
def indexType = 'tweet_' + screenName.toLowerCase()
def elasticsearchUrl = 'http://localhost:9200'

def jestClientFactory = new JestClientFactory()
jestClientFactory.httpClientConfig =
  new HttpClientConfig.Builder(elasticsearchUrl).multiThreaded(true).build()
def jestClient = jestClientFactory.object

def twitter = TwitterFactory.singleton
def paging = new Paging(currentPage, limit)
def responseList = twitter.getUserTimeline(screenName, paging)

def analyzer = new JapaneseAnalyzer()

while (responseList) {
  println("Current page = $currentPage")

  responseList
    .grep { !it.retweet }
  //    .grep { !it.text.startsWith('@') }
    .each { status ->
      def id = status.id
      def favoriteCount = status.favoriteCount
      def retweetCount = status.retweetCount
      def createdAt = status.createdAt
      def text = status.text

      def textForAnalyze =
        text
        .replaceAll("https?://\\S+", '')
        .replaceAll("@\\S+", '')

      def tokenStream = analyzer.tokenStream('', textForAnalyze)
      def charTermAttr = tokenStream.addAttribute(CharTermAttribute)
      def partOfSpeechAttr = tokenStream.addAttribute(PartOfSpeechAttribute)

      tokens = []
      tokenStream.reset()

      while (tokenStream.incrementToken()) {
        def partOfSpeech = partOfSpeechAttr.partOfSpeech

        if (partOfSpeech.contains('名詞')) {// || partOfSpeech.contains('記号')) {
          def token = charTermAttr.toString()

          if (!token.contains('http') && !token.contains('https')) {
            if (token.size() > 4) {
              tokens << charTermAttr.toString()
            }
          }
        }
      }

      tokenStream.end()
      tokenStream.close()

      def tweet =
        [
          id: id,
          screen_name: screenName,
          favorite_count: favoriteCount,
          retweet_count: retweetCount,
          created_at: createdAt,
          text: text,
          tokens: tokens
        ]

      def index = new Index.Builder(tweet).index(indexName).type(indexType).id("${status.user.screenName}_${id}").build()
      jestClient.execute(index)
  }

  if (pagingEnd == -1 || currentPage <= pagingEnd) {
    currentPage += 1
    paging = new Paging(currentPage, limit)
    responseList = twitter.getUserTimeline(screenName, paging)
  } else {
    break
  }
}

println("Read page = $currentPage")


jestClient.shutdownClient()