Elasticsearch Advent Calendarで出ていた、こちらのエントリを見て、面白そうだなぁと思いまして。
Elasticsearch、Logstash、Kibana、Kuromojiでタグクラウドを作る - Taste of Tech Topics
これを見てパッと思ったのは、タグクラウドにする単語を「名詞」にカテゴライズされるもので絞り込めたらなぁと。
ただ、KuromojiのPartOfSpeechFilterは「除外する品詞を指定する」ものになるので、これをやろうと思うとちょっと大変です。
なので、ここはちょっと方法を変えて、Elasticsearchに取り込む前に形態素解析することにしました。
※完全に遊んでいるネタなので、ご了承くださいませ
今回のテーマは、以下でやります。
- Elasticsearchにツイートを愚直に取り込む(ツイートの取得はTwitter4JでUserTimelineを、取り込みはElasticsearch Sparkで)
- 取り込んだインデックスをSparkで取り出し、形態素解析して別のインデックスに取り込む
- 形態素解析にはmecab-ipadic-NEologdを適用したKuromoji(http://maven.codelibs.org/org/codelibs/lucene-analyzers-kuromoji-ipadic-neologd/)を使用する
余談
ちなみにこれ、エントリに起こすにあたり完全にやり直したもので、最初はGroovyでTwitter4J+Jestでインデックスに取り込む際に
手元でビルドした最新のLucene Kuromoji+NEologd辞書で遊んでいたものを、SparkやCodeLibsのKuromojiを使うように
置き換えたものです。
では、やってみましょう。
準備
まずは、sbtの定義から。
build.sbt
name := "tweet-to-es-with-reindex-spark" version := "0.0.1-SNAPSHOT" scalaVersion := "2.11.8" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) resolvers += "CodeLibs Repository" at "http://maven.codelibs.org/" libraryDependencies ++= Seq( "org.twitter4j" % "twitter4j-core" % "4.0.5" % Compile, "org.elasticsearch.client" % "rest" % "5.1.1" % Compile, "org.apache.spark" %% "spark-core" % "2.0.2" % Provided exclude("org.scalatest", "scalatest_2.11"), "org.elasticsearch" %% "elasticsearch-spark-20" % "5.1.1" % Compile exclude("org.scalatest", "scalatest_2.11"), "org.codelibs" % "lucene-analyzers-kuromoji-ipadic-neologd" % "6.2.1-20161201" % Compile, "org.scalatest" %% "scalatest" % "3.0.1" % Test )
なぜかElasticsearchのREST Clientが入っていますが、これは再取り込み時にインデックスを削除するためだけに
使っています。コードの実行は、テストコードとして行うのでScalaTestを使用しています(テストそのものを書く
わけではありませんが)。
なお、ElasticsearchおよびKibanaは起動しているものとします。
Twitte4Jを使いますので、各種Keyをご用意ください。
src/test/resources/twitter4j.properties
oauth.consumerKey=<your-consumer-key> oauth.consumerSecret=<your-consumer-secret> oauth.accessToken=<your-access-token> oauth.accessTokenSecret=<your-access-token-secret>
ツイートの取り込み
それでは、まずはツイートの取り込み側から作成します。できあがったコードは、こんな感じ。
src/test/scala/org/littlewings/elasticsearch/spark/TweetToEsSparkSuite.scala
package org.littlewings.elasticsearch.spark import org.apache.spark.{SparkConf, SparkContext} import org.elasticsearch.spark.rdd.EsSpark import org.scalatest.FunSuite import twitter4j.{Paging, TwitterFactory} import scala.collection.JavaConverters._ class TweetToEsSparkSuite extends FunSuite { test("tweet to elasticsearch") { val screenName = "kazuhira_r" // ツイート取得対象の、screen-name val pagingStart = 1 val pagingEnd = Option.empty[Int] // 可能な限り取得。絞る場合は Option(n: Int) val limit = 200 val elasticsearchHost = "localhost" val elasticsearchPort = 9200 val indexName = "tweet" val indexType = s"${screenName}" val conf = new SparkConf() .setAppName("tweet to elasticsearch") .setMaster("local[*]") .set("es.nodes", elasticsearchHost) .set("es.port", elasticsearchPort.toString) .set("es.index.auto.create", "true") .set("es.mapping.id", "id") val sc = new SparkContext(conf) val twitter = TwitterFactory.getSingleton try { Iterator .from(pagingStart) .map(currentPage => new Paging(currentPage, limit)) .map(paging => (paging, twitter.getUserTimeline(screenName, paging))) // ツイートを取得 .takeWhile { case (paging, responseList) => // 取得可能な限り、もしくは指定の範囲繰り返しツイートを取得 (pagingEnd.isEmpty || pagingEnd.map(p => paging.getPage <= p).getOrElse(false)) && (responseList != null && !responseList.isEmpty) } .foreach { case (paging, responseList) => { val tweets = responseList .asScala .map { status => // tweet statusから必要な情報を抽出 Map( "id" -> status.getId, "screen_name" -> screenName, "created_at" -> status.getCreatedAt, "favorite_count" -> status.getFavoriteCount, "retweeted_count" -> status.getRetweetCount, "retweet" -> status.isRetweet.toString, "text" -> status.getText ) } // Elasticsearchに保存 EsSpark.saveToEs(sc.makeRDD(tweets), s"${indexName}/${indexType}") } } } finally { sc.stop() } } }
指定のscreen-nameのユーザーのUserTimelineを取得して
val screenName = "kazuhira_r" // ツイート取得対象の、screen-name //////////////////// try { Iterator .from(pagingStart) .map(currentPage => new Paging(currentPage, limit)) .map(paging => (paging, twitter.getUserTimeline(screenName, paging))) // ツイートを取得 .takeWhile { case (paging, responseList) => // 取得可能な限り、もしくは指定の範囲繰り返しツイートを取得 (pagingEnd.isEmpty || pagingEnd.map(p => paging.getPage <= p).getOrElse(false)) && (responseList != null && !responseList.isEmpty) } //////////////////// .foreach { case (paging, responseList) => { val tweets = responseList .asScala .map { status => // tweet statusから必要な情報を抽出 Map( "id" -> status.getId, "screen_name" -> screenName, "created_at" -> status.getCreatedAt, "favorite_count" -> status.getFavoriteCount, "retweeted_count" -> status.getRetweetCount, "retweet" -> status.isRetweet.toString, "text" -> status.getText ) }
Elasticsearchに取り込みます。
// Elasticsearchに保存 EsSpark.saveToEs(sc.makeRDD(tweets), s"${indexName}/${indexType}")
この時は、とりあえず取得できるツイートはすべて取得してインデックスに放り込んでいます。
インデックスとタイプの名前は、「tweet/[screen-name]」にしました。
val indexName = "tweet" val indexType = s"${screenName}" EsSpark.saveToEs(sc.makeRDD(tweets), s"${indexName}/${indexType}")
このコードを実行すると、指定のscreen-nameのユーザーのタイムラインがElasticsearchに
取り込まれます。
形態素解析しつつ、別のインデックスを作成
では、形態素解析する側。
src/test/scala/org/littlewings/elasticsearch/spark/ReindexingEsSparkSuite.scala
package org.littlewings.elasticsearch.spark import org.apache.http.HttpHost import org.apache.lucene.analysis._ import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.apache.spark.{SparkConf, SparkContext} import org.codelibs.neologd.ipadic.lucene.analysis.ja._ import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute import org.elasticsearch.client.RestClient import org.elasticsearch.spark.rdd.EsSpark import org.scalatest.FunSuite import scala.collection.JavaConverters._ object MorphologicalAnalyzer { // Analyzer val analyzer: Analyzer = new JapaneseAnalyzer // 形態素解析してSeqにして返す def toToken(text: String, filter: (String, AnalyzedTokenAttribute) => Boolean): Seq[String] = { val tokenStream = analyzer.tokenStream("", text) val tokenAttribute = AnalyzedTokenAttribute( charTerm = tokenStream.addAttribute(classOf[CharTermAttribute]), partOfSpeech = tokenStream.addAttribute(classOf[PartOfSpeechAttribute]) // 品詞 ) tokenStream.reset() try { Iterator .continually(tokenStream.incrementToken()) .takeWhile(identity) .flatMap { _ => val token = tokenAttribute.charTerm.toString // 対象とするTokenの絞り込み if (filter(token, tokenAttribute)) Some(token) else None } .toVector } finally { tokenStream.end() tokenStream.close() } } } // TokenStreamの属性を保持 case class AnalyzedTokenAttribute(charTerm: CharTermAttribute, partOfSpeech: PartOfSpeechAttribute) class ReindexingEsSparkSuite extends FunSuite { test("reindexing es to spark") { val screenName = "kazuhira_r" val pagingStart = 1 val pagingEnd = -1 val limit = 200 val elasticsearchHost = "localhost" val elasticsearchPort = 9200 val srcIndexName = "tweet" val destIndexName = "tokenized_tweet" val indexType = s"${screenName}" // 前のインデックスはとりあえず削除 val client = RestClient.builder(new HttpHost(elasticsearchHost, elasticsearchPort)).build try { client.performRequest("DELETE", s"/${destIndexName}", Map.empty[String, String].asJava) } catch { case e: Exception => // ignore, index not found... e.printStackTrace() } finally { client.close() } val conf = new SparkConf() .setAppName("reindexing elasticsearch, with morphological") .setMaster("local[*]") .set("es.nodes", elasticsearchHost) .set("es.port", elasticsearchPort.toString) .set("es.index.auto.create", "true") .set("es.mapping.id", "id") val sc = new SparkContext(conf) try { val rdd = EsSpark.esRDD(sc, s"${srcIndexName}/${indexType}", "?q=*") val analyzedRdd = rdd .values .filter(tweet => !tweet("retweet").toString.toBoolean) // RTは除外 .map(tweet => tweet + // srcのインデックスに入っていたテキストを、形態素解析してツイートに追加 ("tokens" -> MorphologicalAnalyzer.toToken( tweet("text") .asInstanceOf[String] .replaceAll("""https?://\S+""", "") // URL削除 .replaceAll("""@\S+""", ""), // twitter-id削除 (token, analyzedTokenAttribute) => token.size > 4 && // 長さ5文字以上 analyzedTokenAttribute.partOfSpeech.getPartOfSpeech.contains("名詞") // 名詞のみで絞り込み )) ) // Elasticsearchに保存 EsSpark.saveToEs(analyzedRdd, s"${destIndexName}/${indexType}") } finally { sc.stop() } } }
Sparkを使っている関係で、Analyzerはシリアライズしなくて済むようにobjectにまとめました。
object MorphologicalAnalyzer { // Analyzer val analyzer: Analyzer = new JapaneseAnalyzer // 形態素解析してSeqにして返す def toToken(text: String, filter: (String, AnalyzedTokenAttribute) => Boolean): Seq[String] = { val tokenStream = analyzer.tokenStream("", text) val tokenAttribute = AnalyzedTokenAttribute( charTerm = tokenStream.addAttribute(classOf[CharTermAttribute]), partOfSpeech = tokenStream.addAttribute(classOf[PartOfSpeechAttribute]) // 品詞 ) tokenStream.reset() try { Iterator .continually(tokenStream.incrementToken()) .takeWhile(identity) .flatMap { _ => val token = tokenAttribute.charTerm.toString // 対象とするTokenの絞り込み if (filter(token, tokenAttribute)) Some(token) else None } .toVector } finally { tokenStream.end() tokenStream.close() } } } // TokenStreamの属性を保持 case class AnalyzedTokenAttribute(charTerm: CharTermAttribute, partOfSpeech: PartOfSpeechAttribute)
で、Elasticsearch Sparkで最初に取り込んだインデックスからツイートを取り出し、形態素解析して新しいインデックスに
取り込みます。
val rdd = EsSpark.esRDD(sc, s"${srcIndexName}/${indexType}", "?q=*") val analyzedRdd = rdd .values .filter(tweet => !tweet("retweet").toString.toBoolean) // RTは除外 .map(tweet => tweet + // srcのインデックスに入っていたテキストを、形態素解析してツイートに追加 ("tokens" -> MorphologicalAnalyzer.toToken( tweet("text") .asInstanceOf[String] .replaceAll("""https?://\S+""", "") // URL削除 .replaceAll("""@\S+""", ""), // twitter-id削除 (token, analyzedTokenAttribute) => token.size > 4 && // 長さ5文字以上 analyzedTokenAttribute.partOfSpeech.getPartOfSpeech.contains("名詞") // 名詞のみで絞り込み )) ) // Elasticsearchに保存 EsSpark.saveToEs(analyzedRdd, s"${destIndexName}/${indexType}")
URLを削除していたり、長さ5文字以上のツイートを対象にしているところは元エントリと同じですが、
RTは対象外にしたり、PartOfSpeechAttributeを使って「名詞」を含む品詞にのみ絞り込んでいます。
なお、取り込み先のインデックスは、今回は開始時に削除するようにしました。このためだけに、REST Clientを
使っています…。
// 前のインデックスはとりあえず削除 val client = RestClient.builder(new HttpHost(elasticsearchHost, elasticsearchPort)).build try { client.performRequest("DELETE", s"/${destIndexName}", Map.empty[String, String].asJava) } catch { case e: Exception => // ignore, index not found... e.printStackTrace() } finally { client.close() }
取り込み先のインデックスの名前は、「tokenized_tweet」にしました。タイプの名前は同じです。
val srcIndexName = "tweet" val destIndexName = "tokenized_tweet" val indexType = s"${screenName}"
では、こちらを実行して新しいインデックスに形態素解析および絞り込みを行ったツイートを取り込みます。
タグクラウドを作成する
それでは、Kibanaでタグクラウドを見てみましょう。
形態素解析したデータが入っている方のインデックスを、Kibanaで選択します。
あとは、時間の範囲を指定して「Visualize」から「Tag cloud」を選択して、形態素解析したフィールド「tokens」で
タグクラウドを作成します。
なんか、それっぽいのができました。
※自分のツイートで作っているので、内容については突っ込まないでください…
ところで、英単語が全部小文字になっているのと、「プレッシャー」のような単語の「ー」がないのは
LowerCaseFilterやJapaneseKatakanaStemFilterが適用されているからですね。
このあたりが気になるなら、Analyzerをカスタマイズしてもよいでしょう。先ほどはJapaneseAnalyzerをそのまま
使いましたが、次はJapaneseAnalyzerをバラしてJapaneseKatakanaStemFilterとLowerCaseFilterを外してみます。
// Analyzer // val analyzer: Analyzer = new JapaneseAnalyzer val analyzer: Analyzer = new StopwordAnalyzerBase { override def createComponents(fieldName: String): TokenStreamComponents = { val tokenizer = new JapaneseTokenizer(null, true, JapaneseTokenizer.Mode.SEARCH) var tokenStream: TokenStream = new JapaneseBaseFormFilter(tokenizer) tokenStream = new JapanesePartOfSpeechStopFilter(tokenStream, JapaneseAnalyzer.getDefaultStopTags) tokenStream = new CJKWidthFilter(tokenStream) tokenStream = new StopFilter(tokenStream, JapaneseAnalyzer.getDefaultStopSet) // tokenStream = new JapaneseKatakanaStemFilter(tokenStream) // tokenStream = new LowerCaseFilter(tokenStream) new TokenStreamComponents(tokenizer, tokenStream) } }
その他、原型に戻されるのが嫌、などあれば、JapaneseTokenizerだけを使ったAnalyzerを使用するのも
よいかもしれません。
この定義で、再度形態素解析して取り込みなおすと、結果はこのようになります。
表記揺れがあった場合等は重複しちゃうでしょうけど、ちょっと自然っぽくなりましたね。
まとめ
元のエントリと違って、Elasticsearchの外(Spark側)でいろいろやってしまう感じにしてしまいましたが、
まあSparkで遊びたかったということで。
それにしても、Kibana便利ですね。
オマケ
最初に余談で書いていた、Groovyで作ったスクリプトも貼っておきます(CodeLibs Kuromojiを使うようには
しています)。こちらは取り込みと形態素解析を一緒にやってしまいますし、インデックスの削除もやりませんが
簡単に使うならこちら。
使い方は、screen-nameを起動引数に指定。
$ groovy tweet-to-elasticsearch.groovy cero_t
取り込まれ先のインデックス名は、「tweet」です。
tweet-to-elasticsearch.groovy
@Grab('io.searchbox:jest:2.0.4') import io.searchbox.client.JestClientFactory import io.searchbox.client.config.HttpClientConfig import io.searchbox.core.Index @Grab('org.twitter4j:twitter4j-core:4.0.5') import twitter4j.Paging import twitter4j.Status import twitter4j.TwitterFactory @GrabResolver(name = 'CodeLibs Repository', root = 'http://maven.codelibs.org/') @Grab('org.codelibs:lucene-analyzers-kuromoji-ipadic-neologd:6.2.1-20161201') import org.apache.lucene.analysis.tokenattributes.CharTermAttribute import org.codelibs.neologd.ipadic.lucene.analysis.ja.JapaneseAnalyzer import org.codelibs.neologd.ipadic.lucene.analysis.ja.tokenattributes.PartOfSpeechAttribute def screenName = args[0] // screen-nameは起動引数で def currentPage = 1 def pagingStart = 1 def pagingEnd = -1 def limit = 200 def indexName = 'tweet' def indexType = 'tweet_' + screenName.toLowerCase() def elasticsearchUrl = 'http://localhost:9200' def jestClientFactory = new JestClientFactory() jestClientFactory.httpClientConfig = new HttpClientConfig.Builder(elasticsearchUrl).multiThreaded(true).build() def jestClient = jestClientFactory.object def twitter = TwitterFactory.singleton def paging = new Paging(currentPage, limit) def responseList = twitter.getUserTimeline(screenName, paging) def analyzer = new JapaneseAnalyzer() while (responseList) { println("Current page = $currentPage") responseList .grep { !it.retweet } // .grep { !it.text.startsWith('@') } .each { status -> def id = status.id def favoriteCount = status.favoriteCount def retweetCount = status.retweetCount def createdAt = status.createdAt def text = status.text def textForAnalyze = text .replaceAll("https?://\\S+", '') .replaceAll("@\\S+", '') def tokenStream = analyzer.tokenStream('', textForAnalyze) def charTermAttr = tokenStream.addAttribute(CharTermAttribute) def partOfSpeechAttr = tokenStream.addAttribute(PartOfSpeechAttribute) tokens = [] tokenStream.reset() while (tokenStream.incrementToken()) { def partOfSpeech = partOfSpeechAttr.partOfSpeech if (partOfSpeech.contains('名詞')) {// || partOfSpeech.contains('記号')) { def token = charTermAttr.toString() if (!token.contains('http') && !token.contains('https')) { if (token.size() > 4) { tokens << charTermAttr.toString() } } } } tokenStream.end() tokenStream.close() def tweet = [ id: id, screen_name: screenName, favorite_count: favoriteCount, retweet_count: retweetCount, created_at: createdAt, text: text, tokens: tokens ] def index = new Index.Builder(tweet).index(indexName).type(indexType).id("${status.user.screenName}_${id}").build() jestClient.execute(index) } if (pagingEnd == -1 || currentPage <= pagingEnd) { currentPage += 1 paging = new Paging(currentPage, limit) responseList = twitter.getUserTimeline(screenName, paging) } else { break } } println("Read page = $currentPage") jestClient.shutdownClient()