CLOVER🍀

That was when it all began.

Infinispan+Hibernate Searchでクラスタを構成した時の、インデックスの更新について

前回書いたエントリの、タイトルをひっくり返したものです。

Hibernate Search+Infinispanでクラスタを構成した時の、インデックスの更新について
http://d.hatena.ne.jp/Kazuhira/20150807/1438966241

今回は、

  • Infinispanの検索機能を使うにあたって、Hibernate Searchを使う
  • Hibernate Searchが使用するLuceneのインデックスの保存先は、Infinispanとする
  • 今回のシナリオで使うInfinispanのCacheは、すべてクラスタ化されたものとする

という感じでやってみたいと思います。

Infinispanのドキュメントでいくと、ここが該当します。

14. Querying Infinispan
http://infinispan.org/docs/7.2.x/user_guide/user_guide.html#sid-68355061

前回は、Hibernate Searchのインデックス保存先がInfinispanで、かつクラスタ化されていた、というものでした。

結論

で、今回も結論から先に出しますが、こちらの構成の場合はクラスタに参加しているどのCacheからも更新可能です。

前回は、最初に更新したNodeがロックを取りっぱなしになってしまうため、最初のNodeでしか更新できなかったのですが。

準備

今回は、作成したプログラムを順に書いていきます。

まずはビルド定義。
build.sbt

name := "embedded-clustered-update-index"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.7"

organization := "org.littleiwngs"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := true

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-query" % "7.2.3.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.hibernate" % "hibernate-search-engine" % "5.2.0.Final",
  "org.apache.lucene" % "lucene-analyzers-kuromoji" % "4.10.4",
  "org.scalatest" %% "scalatest" % "2.2.5" % "test"
)

hibernate-search-engine」は書かなくてもいい気がするんですけど、sbtが引き込んでくれませんでした…。

Infinispanの設定は、こちら。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:7.2 http://www.infinispan.org/schemas/infinispan-config-7.2.xsd"
        xmlns="urn:infinispan:config:7.2">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container name="indexingCacheManager" shutdown-hook="DONT_REGISTER">
        <transport cluster="cluster" stack="udp"/>
        <jmx duplicate-domains="true"/>

        <distributed-cache name="queryCache">
            <indexing index="LOCAL">
                <property name="default.directory_provider">infinispan</property>
                <property name="default.exclusive_index_use">true</property>
                <property name="default.indexmanager">org.infinispan.query.indexmanager.InfinispanIndexManager</property>
                <property name="default.reader.strategy">shared</property>
                <property name="analyzer">org.apache.lucene.analysis.ja.JapaneseAnalyzer</property>
                <property name="lucene_version">LUCENE_4_10_4</property>
            </indexing>
        </distributed-cache>

        <distributed-cache name="LuceneIndexesData" mode="SYNC" remote-timeout="25000">
            <transaction mode="NONE"/>
            <state-transfer enabled="true" timeout="480000" await-initial-transfer="true"/>
            <indexing index="NONE"/>
            <locking striping="false" acquire-timeout="10000" concurrency-level="500" write-skew="false"/>
            <eviction max-entries="-1" strategy="NONE"/>
            <expiration max-idle="-1"/>
        </distributed-cache>

        <replicated-cache name="LuceneIndexesMetadata" mode="SYNC" remote-timeout="25000">
            <transaction mode="NONE"/>
            <state-transfer enabled="true" timeout="480000" await-initial-transfer="true"/>
            <indexing index="NONE"/>
            <locking striping="false" acquire-timeout="10000" concurrency-level="500" write-skew="false"/>
            <eviction max-entries="-1" strategy="NONE"/>
            <expiration max-idle="-1"/>
        </replicated-cache>

        <replicated-cache name="LuceneIndexesLocking" mode="SYNC" remote-timeout="25000">
            <transaction mode="NONE"/>
            <state-transfer enabled="true" timeout="480000" await-initial-transfer="true"/>
            <indexing index="NONE"/>
            <locking striping="false" acquire-timeout="10000" concurrency-level="500" write-skew="false"/>
            <eviction max-entries="-1" strategy="NONE"/>
            <expiration max-idle="-1"/>
        </replicated-cache>
    </cache-container>
</infinispan>

JGroupsの設定は、端折ります。

インデックスの設定を行っているのは、こちらです。今回は、Distributed Cacheとして用意しました。

        <distributed-cache name="queryCache">
            <indexing index="LOCAL">
                <property name="default.directory_provider">infinispan</property>
                <property name="default.exclusive_index_use">true</property>
                <property name="default.indexmanager">org.infinispan.query.indexmanager.InfinispanIndexManager</property>
                <property name="default.reader.strategy">shared</property>
                <property name="analyzer">org.apache.lucene.analysis.ja.JapaneseAnalyzer</property>
                <property name="lucene_version">LUCENE_4_10_4</property>
            </indexing>
        </distributed-cache>

それ以外は、LuceneのDirectory関係でHibernate Searchが使用するCacheになります。

Entityとテストコード

Hibernate Searchがインデックス化に使用する、Entityクラスを作成します。
src/test/scala/org/littlewings/infinispan/query/Contents.scala

package org.littlewings.infinispan.query

import java.util.Objects

import org.hibernate.search.annotations.{Field, Indexed}

object Contents {
  def apply(id: String, value: String): Contents = {
    val c = new Contents
    c.id = id
    c.value = value
    c
  }
}

@Indexed
@SerialVersionUID(1L)
class Contents extends Serializable {
  @Field
  var id: String = _

  @Field
  var value: String = _

  override def hashCode: Int = Objects.hash(id, value)

  override def equals(o: Any): Boolean = o match {
    case other: Contents => Objects.equals(id, other.id) && Objects.equals(value, other.value)
    case _ => false
  }
}

このパターンで使う場合は、EntityクラスはJPAのEntityである必要はありません。

そして、テストコードの雛形はこちら。
src/test/scala/org/littlewings/infinispan/query/EmbeddedClusteredIndexUpdateSpec.scala

package org.littlewings.infinispan.query

import org.apache.lucene.search.{Sort, SortField}
import org.infinispan.Cache
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.query.{CacheQuery, Search}
import org.scalatest.FunSpec
import org.scalatest.Matchers._

class EmbeddedClusteredIndexUpdateSpec extends FunSpec {
  describe("EmbeddedClusteredIndexUpdateSpec") {
    // ここに、テストを書く!
  }

  protected def withCache[K, V](cacheName: String, numInstances: Int = 1)(f: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))

    try {
      val caches = managers.map(_.getCache[K, V](cacheName))

      f(caches.head)

      caches.foreach(_.stop)
    } finally {
      managers.foreach(_.stop)
    }
  }
}

簡単にクラスタを構成してテストするための、ヘルパーメソッド付きです。

これらを使って、テストコードとともに動作を確認していきます。

まずは、各Cacheから更新するパターンを書いてみましょう。

最初に、Cacheをひとつ作成してデータ登録。

      withCache[String, Contents]("queryCache") { firstCache =>
        val entries1 = List(
          Contents("1", "はじめてのSpring Boot"),
          Contents("2", "高速スケーラブル検索エンジン ElasticSearch Server"),
          Contents("3", "わかりやすいJava EE ウェブシステム入門")
        )

        entries1.foreach(e => firstCache.put(e.id, e))

検索できることを確認します。

        val firstCacheQuery1 = createQuery(firstCache, classOf[Contents], "Spring")

        val firstResult1 = firstCacheQuery1.list()
        firstCacheQuery1.getResultSize should be(1)
        firstResult1.get(0) should be(Contents("1", "はじめてのSpring Boot"))

ここで、createQueryというメソッドの実装は、このようになっています。Infinipanのクエリを作成しますが、その途中でLuceneのクエリをHibernate SearchのAPIを使って組み立てます。

  protected def createQuery(cache: Cache[_, _], entityClass: Class[_], queryWord: String): CacheQuery = {
    val searchManager = Search.getSearchManager(cache)
    val query =
      searchManager
        .buildQueryBuilderForClass(entityClass)
        .get
        .keyword()
        .onField("value")
        .matching(queryWord)
        .createQuery

    searchManager
      .getQuery(query, entityClass)
      .sort(new Sort(new SortField("id", SortField.Type.INT)))
  }

2つ目のCacheを作って、検索してみます。最初のCacheで作成したエントリが、共有されていることがわかります。

        withCache[String, Contents]("queryCache") { secondCache =>
          val secondCacheQuery1 = createQuery(secondCache, classOf[Contents], "ElasticSearch")

          val secondResult1 = secondCacheQuery1.list()
          secondCacheQuery1.getResultSize should be(1)
          secondResult1.get(0) should be(Contents("2", "高速スケーラブル検索エンジン ElasticSearch Server"))

そして、このCacheに対してエントリ追加。

          val entries2 = List(
            Contents("4", "[改訂新版] Apache Solr入門 〜オープンソース全文検索エンジン"),
            Contents("5", "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava (Programmer’s SELECTION)"),
            Contents("6", "Spring3入門 ――Javaフレームワーク・より良い設計とアーキテクチャ")
          )

          entries2.foreach(e => secondCache.put(e.id, e))

Hibernate Search+Infinispanでは、この時にLuceneのインデックスの更新に失敗していましたが、こちらのパターンでは何事もなく終了します。

検索も可能です。

          val secondCacheQuery2 = createQuery(secondCache, classOf[Contents], "Apache Solr")

          val secondResult2 = secondCacheQuery2.list()
          secondCacheQuery2.getResultSize should be(1)
          secondResult2.get(0) should be(Contents("4", "[改訂新版] Apache Solr入門 〜オープンソース全文検索エンジン"))
        }

最初に作成したCacheからも、追加したエントリが見えています。

        val firstCacheQuery2 = createQuery(firstCache, classOf[Contents], "Spring")

        val resultMaster2 = firstCacheQuery2.list()
        firstCacheQuery2.getResultSize should be(2)
        resultMaster2.get(0) should be(Contents("1", "はじめてのSpring Boot"))
        resultMaster2.get(1) should be(Contents("6", "Spring3入門 ――Javaフレームワーク・より良い設計とアーキテクチャ"))
      }

OKそうですね。

MassIndexerを使ってはどうか

Hibernate Search+Infinispanの場合、MassIndexerの場合も最初のNodeしか更新できなかったので、こちらも確認しておくことにします。

InfinispanでMassIndexerを使用して、インデックスをリビルドするために以下のようなメソッドを用意しました。

  protected def rebuildIndex(cache: Cache[_, _]): Unit =
    Search.getSearchManager(cache).getMassIndexer.start()

これを使って、インデックスをリビルドしつつテストしてみます。

エントリを登録して、いきなりリビルド。

    it("rebuild index") {
      withCache[String, Contents]("queryCache") { firstCache =>
        val entries1 = List(
          Contents("1", "はじめてのSpring Boot"),
          Contents("2", "高速スケーラブル検索エンジン ElasticSearch Server"),
          Contents("3", "わかりやすいJava EE ウェブシステム入門")
        )

        entries1.foreach(e => firstCache.put(e.id, e))

        rebuildIndex(firstCache)

まあ、普通に検索できます。

        val firstCacheQuery1 = createQuery(firstCache, classOf[Contents], "Spring")

        val firstResult1 = firstCacheQuery1.list()
        firstCacheQuery1.getResultSize should be(1)
        firstResult1.get(0) should be(Contents("1", "はじめてのSpring Boot"))

2つ目のCacheから検索とエントリの追加。

        withCache[String, Contents]("queryCache") { secondCache =>
          val secondCacheQuery1 = createQuery(secondCache, classOf[Contents], "ElasticSearch")

          val secondResult1 = secondCacheQuery1.list()
          secondCacheQuery1.getResultSize should be(1)
          secondResult1.get(0) should be(Contents("2", "高速スケーラブル検索エンジン ElasticSearch Server"))

          val entries2 = List(
            Contents("4", "[改訂新版] Apache Solr入門 〜オープンソース全文検索エンジン"),
            Contents("5", "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava (Programmer’s SELECTION)"),
            Contents("6", "Spring3入門 ――Javaフレームワーク・より良い設計とアーキテクチャ")
          )

          entries2.foreach(e => secondCache.put(e.id, e))

こちらも、動作します。

2つ目のCacheでリビルド。

          rebuildIndex(secondCache)

検索。

          val secondCacheQuery2 = createQuery(secondCache, classOf[Contents], "Apache Solr")

          val secondResult2 = secondCacheQuery2.list()
          secondCacheQuery2.getResultSize should be(1)
          secondResult2.get(0) should be(Contents("4", "[改訂新版] Apache Solr入門 〜オープンソース全文検索エンジン"))
        }

最初のNodeでも検索。

        val firstCacheQuery2 = createQuery(firstCache, classOf[Contents], "Spring")

        val resultMaster2 = firstCacheQuery2.list()
        firstCacheQuery2.getResultSize should be(2)
        resultMaster2.get(0) should be(Contents("1", "はじめてのSpring Boot"))
        resultMaster2.get(1) should be(Contents("6", "Spring3入門 ――Javaフレームワーク・より良い設計とアーキテクチャ"))

      }

※この後、最初のNodeに対して更新をかけても大丈夫です

OKそうですね。

で、この差は?

とまあ、Infinispanのクエリ機能としてHibernate Searchを使い、さらにインデックスをInfinispanに保存しても、何事もなく各Cacheからインデックスの更新ができてしまいました。

この仕掛けですが、Infinispanの提供するorg.hibernate.search.indexes.spi.IndexManagerの実装が効いているみたいです。

設定としては、Cacheの設定にしていたこの部分ですね。

                <property name="default.indexmanager">org.infinispan.query.indexmanager.InfinispanIndexManager</property>

よって、IndexManagerの設定をコメントアウトしたりすると

                <!-- <property name="default.indexmanager">org.infinispan.query.indexmanager.InfinispanIndexManager</property> -->

Hibernate Search+Infinispanの時のように、最初に更新したNode以外はインデックスの更新ができなくなります。

ERROR: HSEARCH000058: Exception occurred org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: org.infinispan.lucene.locking.BaseLuceneLock@7152bd86
Primary Failure:
	Entity org.littlewings.infinispan.query.Contents  Id S:4  Work Type  org.hibernate.search.backend.UpdateLuceneWork

org.apache.lucene.store.LockObtainFailedException: Lock obtain timed out: org.infinispan.lucene.locking.BaseLuceneLock@7152bd86
	at org.apache.lucene.store.Lock.obtain(Lock.java:89)
	at org.apache.lucene.index.IndexWriter.<init>(IndexWriter.java:755)
	at org.hibernate.search.backend.impl.lucene.IndexWriterHolder.createNewIndexWriter(IndexWriterHolder.java:131)
	at org.hibernate.search.backend.impl.lucene.IndexWriterHolder.getIndexWriter(IndexWriterHolder.java:97)
	at org.hibernate.search.backend.impl.lucene.AbstractWorkspaceImpl.getIndexWriter(AbstractWorkspaceImpl.java:112)
	at org.hibernate.search.backend.impl.lucene.LuceneBackendQueueTask.applyUpdates(LuceneBackendQueueTask.java:81)
	at org.hibernate.search.backend.impl.lucene.LuceneBackendQueueTask.run(LuceneBackendQueueTask.java:47)
	at org.hibernate.search.backend.impl.lucene.SyncWorkProcessor$Consumer.applyChangesets(SyncWorkProcessor.java:145)
	at org.hibernate.search.backend.impl.lucene.SyncWorkProcessor$Consumer.run(SyncWorkProcessor.java:135)
	at java.lang.Thread.run(Thread.java:745)

覚えておきましょう。

ちなみに、先ほど設定していたIndexManagerやDirectoryProviderの設定ですが、(Distributed Cacheの場合は)auto-config=trueと同じ意味だったりします。

        <distributed-cache name="queryCache">
            <indexing index="LOCAL" auto-config="true">
                <property name="analyzer">org.apache.lucene.analysis.ja.JapaneseAnalyzer</property>
                <property name="lucene_version">LUCENE_4_10_4</property>
            </indexing>
        </distributed-cache>

auto-configは、Cacheの種類によって定義が変わります。詳細は、こちらのドキュメントを参照してください。

14.3.2. Automatic configuration
http://infinispan.org/docs/7.2.x/user_guide/user_guide.html#_automatic_configuration

まとめ

というわけで、Infinispan+Hibernate Search…InfinispanのクエリとしてHibernate Searchを使う場合は、IndexManagerの設定さえ気をつければクラスタ構成でもインデックス更新が可能なことがわかりました。

Infinispanの提供するIndexManager、Hibernate Searchで使う場合は、いきなり設定してもダメだったので…使うことを想定してないんでしょうかね…。

とりあえず、またひとつ理解が進みました、と。

今回作成したコードは、こちらに置いています。

https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-clustered-update-index

ちょっと気になること

Infinispanのクエリを使う時、SearchManager#getQueryでCacheQueryを取得するのですが

https://github.com/infinispan/infinispan/blob/7.2.3.Final/query/src/main/java/org/infinispan/query/SearchManager.java#L36

たぶん、これ単一Nodeでの検索な気がします。

ドキュメントには記載がありませんが、SearchManager#getClusteredQueryなるものが増えています。

https://github.com/infinispan/infinispan/blob/7.2.3.Final/query/src/main/java/org/infinispan/query/SearchManager.java#L61

パッと見た感じ、分散検索してくれそうな気もするんですけど、どうでしょう?

https://github.com/infinispan/infinispan/blob/7.2.3.Final/query/src/main/java/org/infinispan/query/clustered/ClusteredCacheQueryImpl.java#L130

そのうち、見てみましょう