CLOVER🍀

That was when it all began.

LuceneのSearcherManagerを使う

Lucene 3.5の時に追加されたという、SearcherManagerを使ってみたいと思います。そういえば、なんか追加されてたなぁという気がしたので。

Lucene 3.5 と Solr 3.5 - 大幅な RAM 削減,SearchManager,ディープページングのサポート
http://www.infoq.com/jp/news/2011/12/lucene-3.5-and-solr-3.5

なぜか、上記記事のタイトルはSearchManagerになってますが。

ここの説明通り、IndexSearcherの共有を簡単にするクラスのようです。SearcherManager#acquireでIndexSearcherを取得し、SeacherManager#releaseでIndexSearcherを返却します。

この時、IndexSearcherが内部的に持っているIndexReaderの参照をインクリメント/デクリメントし、カウンタが0になればIndexReaderを閉じてしまいます。まあ、0になったら閉じてしまうというのは、IndexReaderがやってしまうのですが…。

SearcherManagerを作成するには、以下のいずれかのコンストラクタを使用します。

// Directoryから作成する
public SearcherManager(Directory dir,
                       SearcherFactory searcherFactory)
                throws IOException

// IndexWriterから作成する
public SearcherManager(IndexWriter writer,
                       boolean applyAllDeletes,
                       SearcherFactory searcherFactory)
                throws IOException

SearcherFactoryは、nullを指定するとコンストラクタ中で勝手にSearcherFactoryのインスタンスを作成します。

あとは、検索時には先ほどの

// managerはSearcherManagerのインスタンス
IndexSearcher s = manager.acquire();

でIndexSearcherを取得し、検索が終わったら

manager.release(s);

でSearcherManagerにIndexSearcherを返却します。

この間、インデックスに更新があった場合は、

manager.maybeRefresh();

manager.maybeRefreshBlocking();

を呼び出して、IndexSearcherをリフレッシュします。この後で、IndexSearcherを取り直さないと意味ないですからね?

両者の違いは、1スレッドからしか使っていない場合は差が無いのですが、2スレッド以上で共有していて、なおかつ他のスレッドがすでにリフレッシュを依頼している場合はmaybeRefreshはすぐにリターンしfalseを返却します。maybeRefreshBlockingは、自分が処理を依頼して完了するまで待ち続けます。よって、こちらの戻り値はvoidです。

あと、SearcherManagerのどちらのコンストラクタを使うかで、ちょっと動きが違うみたいですが、まずは使ってみるということで。

build.sbt

name := "lucene-searchermanager"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.3"

organization := "littlewings"

libraryDependencies += "org.apache.lucene" % "lucene-analyzers-kuromoji" % "4.5.0"

Lucene 4.5、今回初めて使います。あと、別にKuromojiを入れなくてもよかったなぁ…。

で、こんなプログラムを書きました。
src/main/scala/LuceneSearcherManager.scala

import scala.collection.JavaConverters._

import org.apache.lucene.analysis.Analyzer
import org.apache.lucene.analysis.ja.JapaneseAnalyzer
import org.apache.lucene.document.{Document, Field, StringField, TextField}
import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig}
import org.apache.lucene.search.{IndexSearcher, Query, MatchAllDocsQuery, SearcherFactory, SearcherManager, Sort}
import org.apache.lucene.search.{TopFieldCollector, TotalHitCountCollector}
import org.apache.lucene.store.{Directory, RAMDirectory}
import org.apache.lucene.util.Version

object LuceneSearcherManager {
  private var currentId: Int = 1

  def main(args: Array[String]): Unit = {
    val luceneVersion = Version.LUCENE_45
    val analyzer = new JapaneseAnalyzer(luceneVersion)

    for {
      directory <- new RAMDirectory
      writer <- new IndexWriter(directory,
                                     new IndexWriterConfig(luceneVersion, analyzer)
                                       .setMaxBufferedDocs(100)
                                       .setRAMBufferSizeMB(100))
    } {
      (1 to 3).foreach(i => registerDocument(writer))
      writer.commit()

      commandWhile(directory, writer)
    }
  }

  private def commandWhile(directory: Directory, writer: IndexWriter): Unit = {
    /** 後で **/
  }

  private def registerDocument(writer: IndexWriter): Unit = {
    val document = new Document
    document.add(new StringField("id", currentId.toString, Field.Store.YES))
    document.add(new StringField("contents", "contents-%s".format(currentId), Field.Store.YES))
    writer.addDocument(document)

    currentId += 1
  }

  implicit class AutoCloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(fun: A => Unit): Unit =
      try {
        fun(underlying)
      } finally {
        underlying.close()
      }
  }
}

最初にRAMDirectoryにDocumentを3つ登録して、それから何かしらのコマンド実行をしようというプログラムです。ここは、キーボード入力からインタラクティブにやります。

まずは、SearcherManagerの作成。

    //val searcherManager = new SearcherManager(writer, true, new SearcherFactory)
    val searcherManager = new SearcherManager(directory, new SearcherFactory)

IndexWriterの方をコメントアウトしていますが、それはまた後で。

続いて、コマンド実行する部分を。

    Iterator
      .continually(readLine("Command> "))
      .withFilter(l => l != null && !l.isEmpty)
      .takeWhile(_ != "exit")
      .foreach {
        case "add" =>
          registerDocument(writer)
          println(s"Ducument added[${currentId - 1}]")
        case "search" =>
          val searcher = searcherManager.acquire

          val query = new MatchAllDocsQuery

          val totalHitCountCollector = new TotalHitCountCollector
          searcher.search(query, totalHitCountCollector)
          println(s"Total Document => ${totalHitCountCollector.getTotalHits}")

          val docCollector = TopFieldCollector.create(Sort.RELEVANCE,
                                                      100,
                                                      true,
                                                      false,
                                                      false,
                                                      false)
          searcher.search(query, docCollector)

          for (h <- docCollector.topDocs.scoreDocs) {
            val hitDoc = searcher.doc(h.doc)
            println(s"Score,ID[${h.score}:${h.doc}]: Doc => " +
                    hitDoc
                      .getFields
                      .asScala
                      .map(_.stringValue)
                      .mkString(" | "))
          }

          searcherManager.release(searcher)
        case "commit" =>
          writer.commit()
          println("IndexWriter committed.")
        case "refresh" =>
          searcherManager.maybeRefresh()
          println("SearcherManager maybeRefreshed.")
        case command => println(s"Unknown Command, [$command]")
      }

「add」でDocumentを追加し、「search」で現在の登録されているDocumentをすべて検索して表示します。「commit」でIndexWriterをコミットし、refreshでSearcherManager#maybeRefreshを呼び出します。

では、動かしてみます。

> run
[info] Running LuceneSearcherManager 
Command> 

検索。

Command> search
Total Document => 3
Score,ID[NaN:0]: Doc => 1 | contents-1
Score,ID[NaN:1]: Doc => 2 | contents-2
Score,ID[NaN:2]: Doc => 3 | contents-3

とりあえず、3つ出力されますね。追加してみます。

Command> add
Ducument added[4]

もう1度検索。

Total Document => 3
Score,ID[NaN:0]: Doc => 1 | contents-1
Score,ID[NaN:1]: Doc => 2 | contents-2
Score,ID[NaN:2]: Doc => 3 | contents-3

追加したDocumentが見えません。では、コミットします。

Command> commit
IndexWriter committed.

もう1度検索。

Command> search
Total Document => 3
Score,ID[NaN:0]: Doc => 1 | contents-1
Score,ID[NaN:1]: Doc => 2 | contents-2
Score,ID[NaN:2]: Doc => 3 | contents-3

それでも見えません。今度は、maybeRefreshを。

Command> refresh
SearcherManager maybeRefreshed.

もう1度。

Total Document => 4
Score,ID[NaN:0]: Doc => 1 | contents-1
Score,ID[NaN:1]: Doc => 2 | contents-2
Score,ID[NaN:2]: Doc => 3 | contents-3
Score,ID[NaN:3]: Doc => 4 | contents-4

今度は見えるようになりました。

今回は、IndexSearcherは検索の度に毎回取得しているのですが、基本的にインデックスを更新したらリフレッシュする必要があります。もちろん、必要に応じて先にコミットした上で、ですが。

当然のことながら、コミットせずにリフレッシュだけしても、あまり意味がありません。IndexWriterに対して大量に書き込みをして、インデックスに反映されてることもあるかも、ですけど。

で、先にコメントアウトしていた、IndexWriterを使ったSearcherManagerの生成。

    val searcherManager = new SearcherManager(writer, true, new SearcherFactory)
    //val searcherManager = new SearcherManager(directory, new SearcherFactory)

こちらに変えると、少し事情が変わります。

実行してみましょう。

> run
[info] Running LuceneSearcherManager

検索、追加。

Command> search
Total Document => 3
Score,ID[NaN:0]: Doc => 1 | contents-1
Score,ID[NaN:1]: Doc => 2 | contents-2
Score,ID[NaN:2]: Doc => 3 | contents-3
Command> add
Ducument added[4]

この後、リフレッシュしてみます。

Command> refresh
SearcherManager maybeRefreshed.

で、検索。

Total Document => 4
Score,ID[NaN:0]: Doc => 1 | contents-1
Score,ID[NaN:1]: Doc => 2 | contents-2
Score,ID[NaN:2]: Doc => 3 | contents-3
Score,ID[NaN:3]: Doc => 4 | contents-4

なんと、コミットせずに変更が反映されましたが…。

これ、IndexWriterを渡した場合は、IndexSearcherが内部的に利用しているIndexReaderは、このIndexWriterから作っていることからこんな動きになっているっぽいですね。内部的に、1度フラッシュしてるみたいです。

こんなところで。

では、今回のソースを。
src/main/scala/LuceneSearcherManager.scala

import scala.collection.JavaConverters._

import org.apache.lucene.analysis.Analyzer
import org.apache.lucene.analysis.ja.JapaneseAnalyzer
import org.apache.lucene.document.{Document, Field, StringField, TextField}
import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig}
import org.apache.lucene.search.{IndexSearcher, Query, MatchAllDocsQuery, SearcherFactory, SearcherManager, Sort}
import org.apache.lucene.search.{TopFieldCollector, TotalHitCountCollector}
import org.apache.lucene.store.{Directory, RAMDirectory}
import org.apache.lucene.util.Version

object LuceneSearcherManager {
  private var currentId: Int = 1

  def main(args: Array[String]): Unit = {
    val luceneVersion = Version.LUCENE_45
    val analyzer = new JapaneseAnalyzer(luceneVersion)

    for {
      directory <- new RAMDirectory
      writer <- new IndexWriter(directory,
                                     new IndexWriterConfig(luceneVersion, analyzer)
                                       .setMaxBufferedDocs(100)
                                       .setRAMBufferSizeMB(100))
    } {
      (1 to 3).foreach(i => registerDocument(writer))
      writer.commit()

      commandWhile(directory, writer)
    }
  }

  private def commandWhile(directory: Directory, writer: IndexWriter): Unit = {
    val searcherManager = new SearcherManager(writer, true, new SearcherFactory)
    //val searcherManager = new SearcherManager(directory, new SearcherFactory)

    Iterator
      .continually(readLine("Command> "))
      .withFilter(l => l != null && !l.isEmpty)
      .takeWhile(_ != "exit")
      .foreach {
        case "add" =>
          registerDocument(writer)
          println(s"Ducument added[${currentId - 1}]")
        case "search" =>
          val searcher = searcherManager.acquire

          val query = new MatchAllDocsQuery

          val totalHitCountCollector = new TotalHitCountCollector
          searcher.search(query, totalHitCountCollector)
          println(s"Total Document => ${totalHitCountCollector.getTotalHits}")

          val docCollector = TopFieldCollector.create(Sort.RELEVANCE,
                                                      100,
                                                      true,
                                                      false,
                                                      false,
                                                      false)
          searcher.search(query, docCollector)

          for (h <- docCollector.topDocs.scoreDocs) {
            val hitDoc = searcher.doc(h.doc)
            println(s"Score,ID[${h.score}:${h.doc}]: Doc => " +
                    hitDoc
                      .getFields
                      .asScala
                      .map(_.stringValue)
                      .mkString(" | "))
          }

          searcherManager.release(searcher)
        case "commit" =>
          writer.commit()
          println("IndexWriter committed.")
        case "refresh" =>
          searcherManager.maybeRefresh()
          println("SearcherManager maybeRefreshed.")
        case command => println(s"Unknown Command, [$command]")
      }
  }

  private def registerDocument(writer: IndexWriter): Unit = {
    val document = new Document
    document.add(new StringField("id", currentId.toString, Field.Store.YES))
    document.add(new StringField("contents", "contents-%s".format(currentId), Field.Store.YES))
    writer.addDocument(document)

    currentId += 1
  }

  implicit class AutoCloseableWrapper[A <: AutoCloseable](val underlying: A) extends AnyVal {
    def foreach(fun: A => Unit): Unit =
      try {
        fun(underlying)
      } finally {
        underlying.close()
      }
  }
}