CLOVER🍀

That was when it all began.

InfinispanのLucene Directory実装を使って、インデックスを分散して持つ

久々に、書いてみたくなっただけのエントリ。Infinispanが持つLucene Directory実装を使って、遊びます。

Infinispanはインメモリ・データグリッドと呼ばれる製品の一種で、複数のJava VM間でクラスタを構成し、比較的簡単にデータを共有することができます。このInfinispanには、Apache LuceneのDirectoryを実装した機能があります。

15. Infinispan as a storage for Lucene indexes
http://infinispan.org/docs/7.1.x/user_guide/user_guide.html#_infinispan_as_a_storage_for_lucene_indexes

Hibernate Searchでインデックスの保存先にInfinispanを選んだ時にクラスタ構成時にインデックスを複数Nodeで共有できるのは、Infinispanにこの機能があるからですね。

今回はこのInfinispanのLucene Directoryを使って、こういうものを作って遊んでみます。

  • インデックスに対して、インタラクティブにDocumentを登録できるIndexerを作る
  • インデックスに対して、インタラクティブにQueryを発行できるSearcherを作る
  • これらのプログラムをInfinispanのクラスタとして構成し、Indexerで登録したDocumentがSearcherから見えることを確認する

Infinispanクラスタ上のNodeについては基本的に各々の役割みたいなものは気にしませんが、今回はプログラムの特性上こんな形にします。

なお、インデックスの分散保持であり、分散検索ではありませんのでご注意を。また、更新についても通常のLuceneと同様、IndexWriterは(Infinispanの場合、クラスタ上で)ひとつしかオープンすることができません。

プログラムは、Scalaで書きます。

準備

まずは、sbtの設定。
build.sbt

name := "lucene-distributed-directory"

version := "0.0.1-SNAPSHOT"

organization := "org.littlewings"

scalaVersion := "2.11.6"

scalacOptions ++= Seq("-Xlint", "-unchecked", "-deprecation", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

/*
// sbt上で繰り返しrunする場合は、これらをtrueにする
fork in run := true

connectInput := true
*/

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-lucene-directory" % "7.1.1.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.apache.lucene" % "lucene-queryparser" % "4.10.3",
  "org.apache.lucene" % "lucene-analyzers-kuromoji" % "4.10.3"
)

Infinispanは、7.1.1.Finalを使用します。これに対応するLucenenは、4.10.3となります。Infinispan 7.2になれば、4.10.4になるみたいです。

設定ファイル

クラスタを構成するので、InfinispanとJGroupsの設定ファイルを用意します。

まずは、Infinispanの設定ファイル。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:7.1 http://www.infinispan.org/schemas/infinispan-config-7.1.xsd"
    xmlns="urn:infinispan:config:7.1">
  <jgroups>
    <stack-file name="udp" path="jgroups.xml" />
  </jgroups>

  <cache-container name="cacheManager" shutdown-hook="REGISTER">
    <transport cluster="cluster" stack="udp" />
    <jmx duplicate-domains="true" />

    <replicated-cache name="metadataCache">
      <eviction max-entries="-1" strategy="NONE"/>
      <expiration max-idle="-1"/>
    </replicated-cache>

    <distributed-cache name="dataChunksCache"
                       owners="2"
                       l1-lifespan="300000">
      <eviction max-entries="-1" strategy="NONE"/>
      <expiration max-idle="-1"/>
    </distributed-cache>

    <replicated-cache name="locksCache">
      <eviction max-entries="-1" strategy="NONE"/>
      <expiration max-idle="-1"/>
    </replicated-cache>
  </cache-container>
</infinispan>

Cacheは、3つ定義しており、それぞれ以下の用途と定義となっています。

  • metadataCache … インデックスのメタデータを登録するCache。サイズが小さめのため、Replicated Cache(全Nodeに同じデータがコピーされる)とします
  • dataChunksCache … インデックスの実データを持つCache。今回はそれほど使いませんが、こちらはDistributed Cache(データのオーナー+バックアップ)として分配して保持します
  • locksCache … Luceneのロックに使うCache。容量はほぼ使用しないため、Replicated Cacheとします

dataChunksCacheについては、L1 Cacheも有効にしてみました。

JGroupsの設定ファイルは、こちら。小さめな設定にしています。
src/main/resources/jgroups.xml

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.6.xsd">
  <UDP
      mcast_addr="${jgroups.udp.mcast_addr:228.6.7.8}"
      mcast_port="${jgroups.udp.mcast_port:46655}"
      tos="8"
      ucast_recv_buf_size="150k"
      ucast_send_buf_size="130k"
      mcast_recv_buf_size="150k"
      mcast_send_buf_size="130k"
      max_bundle_size="31k"
      ip_ttl="${jgroups.udp.ip_ttl:0}"
      enable_diagnostics="false"
      bundler_type="sender-sends-with-timer"

      thread_naming_pattern="pl"
      thread_pool.enabled="true"
      thread_pool.min_threads="2"
      thread_pool.max_threads="30"
      thread_pool.keep_alive_time="60000"
      thread_pool.queue_enabled="true"
      thread_pool.queue_max_size="100"
      thread_pool.rejection_policy="Discard"

      oob_thread_pool.enabled="true"
      oob_thread_pool.min_threads="2"
      oob_thread_pool.max_threads="30"
      oob_thread_pool.keep_alive_time="60000"
      oob_thread_pool.queue_enabled="false"
      oob_thread_pool.queue_max_size="100"
      oob_thread_pool.rejection_policy="Discard"

      internal_thread_pool.enabled="true"
      internal_thread_pool.min_threads="2"
      internal_thread_pool.max_threads="4"
      internal_thread_pool.keep_alive_time="60000"
      internal_thread_pool.queue_enabled="true"
      internal_thread_pool.queue_max_size="100"
      internal_thread_pool.rejection_policy="Discard"
      />

  <PING timeout="3000" num_initial_members="3"/>
  <MERGE3/>

  <FD_SOCK/>
  <FD_ALL timeout="15000" interval="3000"/>
  <VERIFY_SUSPECT timeout="1500"/>

  <pbcast.NAKACK2
      xmit_interval="1000"
      xmit_table_num_rows="100"
      xmit_table_msgs_per_row="10000"
      xmit_table_max_compaction_time="10000"
      max_msg_batch_size="100"/>

  <UNICAST3
      xmit_interval="500"
      xmit_table_num_rows="20"
      xmit_table_msgs_per_row="10000"
      xmit_table_max_compaction_time="10000"
      max_msg_batch_size="100"
      conn_expiry_timeout="0"/>

  <pbcast.STABLE stability_delay="500" desired_avg_gossip="5000" max_bytes="1m"/>
  <pbcast.GMS print_local_addr="true" join_timeout="3000" view_bundling="true"/>

  <tom.TOA/> <!-- the TOA is only needed for total order transactions-->

  <UFC max_credits="2m" min_threshold="0.40"/>
  <MFC max_credits="2m" min_threshold="0.40"/>
  <FRAG2 frag_size="30k" />
  <RSVP timeout="60000" resend_interval="500" ack_on_delivery="false" />
</config>

ちょっとしたトレイト・Implicit Class

IndexerとSearcherのために、ちょっとした簡易なトレイトなどを用意しました。

ログというか、コンソール表示用のトレイト。
src/main/scala/org/littlewings/infinispan/lucene/ConsoleLogger.scala

package org.littlewings.infinispan.lucene

trait ConsoleLogger {
  val nodeName: String

  def log(msg: => String): Unit = println(s"$nodeName> $msg")

  def show(msg: => String): Unit = println(msg)
}

どのNodeか分かるように、Node名を取るようにしています。

closeやstopを簡単にするために、InfinispanのLifecycleとJavaに含まれるAutoCloseableに対して、Implicit Class(かつValue Class)を仕込みます。
src/main/scala/org/littlewings/infinispan/lucene/ResourceWrappers.scala

package org.littlewings.infinispan.lucene

import org.infinispan.commons.api.Lifecycle

object ResourceWrappers {
  implicit class LifecycleWrapper[A <: Lifecycle](val underlying: A) extends AnyVal {
    def foreach(f: A => Unit): Unit =
      try {
        f(underlying)
      } finally {
        underlying.stop()
      }
  }

  implicit class AutoClosebleWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(f: A => Unit): Unit =
      try {
        f(underlying)
      } finally {
        underlying.close()
      }
  }
}

これで、LifecycleやAutoCloseableなリソースを、foreachで閉じることができるようになりました。

それでは、メインのプログラムへ。

Indexer

それでは、インデックスにDocumentを登録する方のプログラム。コンソールで受け取った文字列を、そのままLuceneのDocumentとして登録します。DocumentのFieldはひとつとします。

骨格は、こんな感じ。
src/main/scala/org/littlewings/infinispan/lucene/Indexer.scala

package org.littlewings.infinispan.lucene

import scala.io.StdIn

import org.apache.lucene.analysis.ja.JapaneseAnalyzer
import org.apache.lucene.document.{ Document, Field, TextField }
import org.apache.lucene.index.{ IndexWriter, IndexWriterConfig }
import org.apache.lucene.store.Directory
import org.apache.lucene.util.Version
import org.infinispan.Cache
import org.infinispan.lucene.directory.DirectoryBuilder
import org.infinispan.manager.DefaultCacheManager

import org.littlewings.infinispan.lucene.ResourceWrappers._

object Indexer {
  def main(args: Array[String]): Unit = {
    new Indexer("Indexer").start()
  }
}

class Indexer(val nodeName: String) extends ConsoleLogger {
  def start(): Unit = {
    // 後で
  }

  private def indexingWhile(directory: Directory): Unit = {
    // 後で
  }
}

こちらは、割と小さめです。start、indexingWhileメソッドの中は、後述します。

まずは、startメソッド。ここで、Infinispanのクラスタを構成するNode、Cacheを起動してLuceneのDirectoryを作成します。

  def start(): Unit = {
    log("Start Indexer.")

    for {
      manager <- new DefaultCacheManager("infinispan.xml")
      metadataCache <- manager.getCache[Any, Any]("metadataCache")
      dataChunksCache <- manager.getCache[Any, Any]("dataChunksCache")
      locksCache <- manager.getCache[Any, Any]("locksCache")
      directory <- DirectoryBuilder.newDirectoryInstance(metadataCache,
                                                         dataChunksCache,
                                                         locksCache,
                                                         "myIndex")
                                   .chunkSize(30 * 1024 * 1024)  // とりあえず30k
                                   .create
    } indexingWhile(directory)

    log("Stop Indexer.")
  }

ここでは、Infinispanのクラスタ構成要素となるDefaultCacheManagerを作成し(Nodeみたいなもの)、そこから先ほどの設定ファイル(infinispan.xml)で定義したCacheを取得します。

これら3つのCacheを使ってインデックス名を指定しつつ、LuceneのDirectoryをDirectoryBuilderを使って作成します。チャンクサイズは、デフォルトが3kですが少し大きくしておきました。今回のプログラムでは、大した意味はありませんが。

続いて、Documentを登録する部分。

  private def indexingWhile(directory: Directory): Unit = {
    log("add Document. [exit] teminate, this prompt.")

    val analyzer = new JapaneseAnalyzer

    for (writer <- new IndexWriter(directory, new IndexWriterConfig(Version.LUCENE_4_10_3, analyzer))) {
      // なにか作っていないと、クラスタ参加メンバーがDirectoryのopen時に
      // org.apache.lucene.index.IndexNotFoundException: no segments* file found in InfinispanDirectory
      // となってしまうので、空コミット
      writer.commit()

      Iterator
        .continually(StdIn.readLine("addDocument> "))
        .withFilter(line => line != null && !line.isEmpty)
        .takeWhile(_ != "exit")
        .foreach { content =>
          val document = new Document
          document.add(new TextField("content", content, Field.Store.YES))
          writer.addDocument(document)
          writer.commit()
          show(s"[$content] added.")
      }
    }
  }

ここは、普通にLucene Directoryを使ったプログラミングになります。ちょっと微妙なのは、最初に何かDirectory内にデータができるようなことをしておかないと、空のインデックスを開こうとしてSearcherが失敗するため、とりあえずIndexWriterをコミットしています。

あとは、コンソールから入力した文字列をDocumentのフィールドとして登録します。これを繰り返し、「exit」と入力されるとプログラムは終了します。

Searcher

そして、インデックスに対して検索を行う側のプログラム。コンソールからLuceneのQueryParserで解釈可能なクエリを文字列として受け取り、そこからLuceneのQueryを作って結果を表示します。

こちらも、骨格から。
src/main/scala/org/littlewings/infinispan/lucene/Searcher.scala

package org.littlewings.infinispan.lucene

import scala.collection.JavaConverters._
import scala.util.{ Failure, Success, Try }
import scala.io.StdIn

import org.apache.lucene.analysis.ja.JapaneseAnalyzer
import org.apache.lucene.document.{ Document, Field, TextField }
import org.apache.lucene.index.DirectoryReader
import org.apache.lucene.queryparser.classic.QueryParser
import org.apache.lucene.search._
import org.apache.lucene.store.Directory
import org.infinispan.Cache
import org.infinispan.lucene.directory.DirectoryBuilder
import org.infinispan.manager.DefaultCacheManager

import org.littlewings.infinispan.lucene.ResourceWrappers._

object Searcher {
  def main(args: Array[String]): Unit =
    new Searcher(args(0)).start()
}

class Searcher(val nodeName: String) extends ConsoleLogger {
  def start(): Unit = {
    // 後で
  }

  private def queryWhile(directory: Directory): Unit = {
    // 後で
  }

  private def parseQuery(queryParser: QueryParser)(queryString: String): Try[Query] =
    // 後で
    }

  private def search(searcher: IndexSearcher, query: Query, limit: Int): (Int, Array[ScoreDoc]) = {
    // 後で
  }
}

Searcher側は複数起動することを想定しているので、仮のNode名を引数で受け取ることにしました(ただのコンソール表示用です)。

  def main(args: Array[String]): Unit =
    new Searcher(args(0)).start()

こちらも、実際の処理を開始するstartメソッドから。

  def start(): Unit = {
    log("Start Searcher.")

    for {
      manager <- new DefaultCacheManager("infinispan.xml")
      metadataCache <- manager.getCache[Any, Any]("metadataCache")
      dataChunksCache <- manager.getCache[Any, Any]("dataChunksCache")
      locksCache <- manager.getCache[Any, Any]("locksCache")
      directory <- DirectoryBuilder.newDirectoryInstance(metadataCache,
                                                         dataChunksCache,
                                                         locksCache,
                                                         "myIndex")
                                   .chunkSize(30 * 1024 * 1024)  // とりあえず30k
                                   .create
    } queryWhile(directory)

    log("Stop Searcher.")
  }

ここは、インデックスを作成する側とそう変わりません。

検索の中核は、queryWhileというメソッドなのですが、先に部品から。

文字列として入力されたクエリをパースするメソッド。

  private def parseQuery(queryParser: QueryParser)(queryString: String): Try[Query] =
    Try(queryParser.parse(queryString)).recoverWith {
      case e =>
        show(s"Invalid Query[$queryString], Reason: $e.")
        Failure(e)
    }

パースに失敗してもいいように、Tryで包んでいます。

続いて、IndexSearcherとパースされたQueryを使って検索を行うメソッド。こちらは、ヒット件数とScoreDocの配列を戻します。

  private def search(searcher: IndexSearcher, query: Query, limit: Int): (Int, Array[ScoreDoc]) = {
    show(s"  Input Query => [$query].")

    val totalHitCountCollector = new TotalHitCountCollector
    searcher.search(query, totalHitCountCollector)
    val totalHits = totalHitCountCollector.getTotalHits

    val docCollector = TopFieldCollector.create(Sort.RELEVANCE,
      limit,
      true,  // fillFields
      true,  // trackDocScores
      false,  // traxMaxScore
      false)  // docScoreInOrder

    searcher.search(query, docCollector)
    val topDocs = docCollector.topDocs
    val hits = topDocs.scoreDocs

    (totalHits, hits)
  }

これらを使って、インタラクティブにクエリを受け付けるメソッドはこうなりました。

  private def queryWhile(directory: Directory): Unit = {
    log("query while. [exit] teminate, this prompt.")

    val analyzer = new JapaneseAnalyzer
    val searcherManager = new SearcherManager(directory, new SearcherFactory)

    val queryParser = new QueryParser("content", analyzer)
    val limit = 1000

    def parser = parseQuery(queryParser) _

    Iterator
      .continually(StdIn.readLine(s"$nodeName:Query> "))
      .withFilter(line => line != null && !line.isEmpty)
      .takeWhile(_ != "exit")
      .map(parser)
      .withFilter(_.isSuccess)
      .foreach { query =>
        searcherManager.maybeRefreshBlocking()
        val searcher = searcherManager.acquire

        search(searcher, query.get, limit) match {
          case (totalHits, hits) if totalHits > 0 =>
            show(s"  ${totalHits}件ヒットしました")

            hits.foreach { h =>
              val hitDoc = searcher.doc(h.doc)

              show(s"     ScoreDoc[score/id] => [${h.score}/${h.doc}]: Doc => " +
                hitDoc
                .getFields
                .asScala
                .map(_.stringValue)
                .mkString("|"))
            }
          case _ =>
            println("  ヒット件数は0です")
        }

        searcherManager.release(searcher)
      }
  }

Indexerと同じく、こちらも「exit」と入力されると終了します。プログラムがScalaで書かれていることを除いては、普通のLuceneを使ったコードだと思います。

動作確認

それでは、動かしてみましょう。Indexerは、IndexWriterを開きっぱなしなこともあるのと、登録する役割は複数いらないので、ひとつのNodeとします。

Searcher側は、全部で3 Nodeとします。Indexerが登録したDocumentが検索できることと、いきなり3つ全部起動しても面白くないので最後のひとつは別のタイミングで起動することにします。

では、まずIndexerを起動。

$ sbt 'runMain org.littlewings.infinispan.lucene.Indexer'

起動すると、こんな感じで入力待ちになります。

Indexer> add Document. [exit] teminate, this prompt.
addDocument>

続いて、Searcherを起動。

# Searcher Node 1
$ sbt 'runMain org.littlewings.infinispan.lucene.Searcher S1'

# Searcher Node 2
$ sbt 'runMain org.littlewings.infinispan.lucene.Searcher S2'

メンバークラスタに増えたことを検知すると、増えるにしたがってこんな感じのログがコンソールに出力されます。

INFO: ISPN000094: Received new cluster view for channel cluster: [xxxxx-48496|2] (3) [xxxxx-48496, xxxxx-10467, xxxxx-12776]

これで3 Nodeです。

Searcher側は、こんな感じで入力待ちになります。

S2> query while. [exit] teminate, this prompt.
S2:Query>

では、IndexerでDocumentを登録。とりあえず、3つ。

addDocument> あなたとJAVA、今すぐダウンロード!!
[あなたとJAVA、今すぐダウンロード!!] added.
addDocument> 全文検索エンジンApache Lucene
[全文検索エンジンApache Lucene] added.
addDocument> すもももももももものうち
[すもももももももものうち] added.

これを、Searcher側で検索することができます。

S1:Query> *:*
  Input Query => [*:*].
  3件ヒットしました
     ScoreDoc[score/id] => [1.0/0]: Doc => あなたとJAVA、今すぐダウンロード!!
     ScoreDoc[score/id] => [1.0/1]: Doc => 全文検索エンジンApache Lucene
     ScoreDoc[score/id] => [1.0/2]: Doc => すもももももももものうち

S2:Query> java
  Input Query => [content:java].
  1件ヒットしました
     ScoreDoc[score/id] => [0.614891/0]: Doc => あなたとJAVA、今すぐダウンロード!!

どちらのNodeからも、登録したDocumentが見えています。クラスタに参加した時に、データがNode間で共有されたからですね。

さらに、IndexerでDocumentを追加。

addDocument> Java, Scala, Groovy, Clojure
[Java, Scala, Groovy, Clojure] added.

この追加分も、検索できます。

S1:Query> groovy
  Input Query => [content:groovy].
  1件ヒットしました
     ScoreDoc[score/id] => [0.8465736/3]: Doc => Java, Scala, Groovy, Clojure

さらに、Node追加。

$ sbt 'runMain org.littlewings.infinispan.lucene.Searcher S3'

クラスタに、4 Node存在することになります。

INFO: ISPN000094: Received new cluster view for channel cluster: [xxxxx-48496|3] (4) [xxxxx-48496, xxxxx-10467, xxxxx-12776, xxxxx-9514]

この追加されたNodeからも、検索を行うことができます。

S3:Query> Clojure
  Input Query => [content:clojure].
  1件ヒットしました
     ScoreDoc[score/id] => [0.8465736/3]: Doc => Java, Scala, Groovy, Clojure
S3:Query> Java
  Input Query => [content:java].
  2件ヒットしました
     ScoreDoc[score/id] => [0.643841/3]: Doc => Java, Scala, Groovy, Clojure
     ScoreDoc[score/id] => [0.5633609/0]: Doc => あなたとJAVA、今すぐダウンロード!!

OKそうですね!

ということで

InfinispanのLucene Directory実装を使って、Node間でインデックスを共有して遊んでみました。久々にこのコードを書きましたが、やっぱり複数プロセスでクラスタを組む形にするとそこそこ大変…。普段は、UnitTest形式でひとつのJava VMプロセス内で簡易にクラスタを組んで遊んでいることが多いので。

個人的にはScala、Infinispan、Luceneと趣味ド真ん中の内容なので、意外と楽しかったりします。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-lucene-distributed-directory