CLOVER🍀

That was when it all began.

Infinispan(Server)の検索機能を使ってみる

Embedded Modeの検索機能に続き、Server Moduleの検索機能を使ってみます。Remote Queryというやつですね。

13.4.10. Querying via the Java Hot Rod client
http://infinispan.org/docs/7.0.x/user_guide/user_guide.html#_querying_via_the_java_hot_rod_client

ドキュメント以外に参考にしたのは、JBoss Data GridのQuickstartsです。というか、これがないとムリです。ドキュメントだけだと使い方がわかりません…。

https://github.com/jboss-developer/jboss-jdg-quickstarts/tree/master/remote-query

このRemote Queryで使われる要素は、LuceneHibernate Search、InfinispanのQuery DSL、そしてProtocol Buffersですね。

Hot Rod Clientで、検索を行うためのAPIで、Infinispan 6.0から登場しました。

Infinispan 6.0までは、Protocol BuffersのIDLを書いて、コンパイルしてサーバーJMX操作で登録…とかやっていたのですが、7.0からはサーバー側でIDLのコンパイルが可能になったようです。

今回はその機能も利用してみます。

依存関係の定義

まずは、Remote Queryを使うための依存関係を含めたビルドファイルを定義します。
build.sbt

name := "remote-query"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.4"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-unchecked", "-deprecation", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := true

val infinispanVersion = "7.0.2.Final"
libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-client-hotrod" % infinispanVersion,
  "org.infinispan" % "infinispan-remote-query-client" % infinispanVersion,
  "org.infinispan" % "infinispan-query-dsl" % infinispanVersion,
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.3" % "test"
)

Infinispanの各モジュールへの依存関係は、全部必要です。どれかを指定すれば、他のものも引っ張ってきてくれるというわけではなさそうです。Uber JARsを使えばいいという話もありますが。

Cacheに登録するクラス

それでは、コードを作成していきます。まずは、Cacheに登録する(検索対象となるクラス)を定義します。お題は書籍で。
src/main/scala/org/littlewings/infinispan/query/Book.scala

package org.littlewings.infinispan.query

object Book {
  def apply(isbn: String, title: String, price: Int, summary: String): Book = {
    val book = new Book
    book.isbn = isbn
    book.title = title
    book.price = price
    book.summary = summary
    book
  }
}

class Book {
  var isbn: String = _
  var title: String = _
  var price: Int = _
  var summary: String = _
}

後でMarshallerを書くため、Serializableですらありません。また、Scalaで書いていますがJavaBeansである必要もありません。コンパニオンオブジェクトがあるのは、ただの利便性のためだけです。

Procol BuffersのIDLを書く

上記で作成したBookクラスに対応する、Protocol BuffersのIDLを作成します。
src/main/resources/book.proto

package remote_query;

message Book {
    required string isbn = 1;
    required string title = 2;
    required int32 price = 3;
    required string summary = 4;
}

Marshallerを作成する

BookクラスとIDLをつなぎ合わせるための、Marshallerを作成します。
src/main/scala/org/littlewings/infinispan/query/BookMarshaller.scala

package org.littlewings.infinispan.query

import java.io.IOException

import org.infinispan.protostream.MessageMarshaller

class BookMarshaller extends MessageMarshaller[Book] {
  override def getTypeName: String = "remote_query.Book"

  override def getJavaClass: Class[_ <: Book] = classOf[Book]

  @throws(classOf[IOException])
  override def readFrom(reader: MessageMarshaller.ProtoStreamReader): Book = {
    val isbn = reader.readString("isbn")
    val title = reader.readString("title")
    val price = reader.readInt("price")
    val summary = reader.readString("summary")
    Book(isbn, title, price, summary)
  }

  @throws(classOf[IOException])
  override def writeTo(writer: MessageMarshaller.ProtoStreamWriter, book: Book): Unit = {
    writer.writeString("isbn", book.isbn)
    writer.writeString("title", book.title)
    writer.writeInt("price", book.price)
    writer.writeString("summary", book.summary)
  }
}

IDLの定義に合わせて、Bookの内容をMarshall/Unmarshallするコードを書きます。

getTypeNameメソッドで返却する値は、先のIDLの「パッケージ名.メッセージ名」になります。

  override def getTypeName: String = "remote_query.Book"

getJavaClassメソッドでは、ここではBookクラスのClassクラスを返却すればOKです。

テストコード

それでは、動作確認用のテストコードを書いてみます。
src/test/scala/org/littlewings/infinispan/query/QuerySpec.scala

package org.littlewings.infinispan.query

import scala.io.Source

import org.infinispan.client.hotrod.{ RemoteCacheManager, Search }
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller
import org.infinispan.protostream.FileDescriptorSource
import org.infinispan.query.dsl.{ Query, SortOrder }

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class QuerySpec extends FunSpec {
  private def sourceBooks: Seq[Book] =
    Array(
      Book("978-4798042169",
        "わかりやすいJavaEEウェブシステム入門",
        3456,
        "JavaEE7準拠。ショッピングサイトや業務システムで使われるJavaEE学習書の決定版!"),
      Book("978-4798124605",
        "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava",
        4410,
        "エンタープライズJava入門書の決定版!Java EE 6は、大規模な情報システム構築に用いられるエンタープライズ環境向けのプログラミング言語です。"),
      Book("978-4774127804",
        "Apache Lucene 入門 〜Java・オープンソース・全文検索システムの構築",
        3200,
        "Luceneは全文検索システムを構築するためのJavaのライブラリです。Luceneを使えば,一味違う高機能なWebアプリケーションを作ることができます。"),
      Book("978-4774161631",
        "[改訂新版] Apache Solr入門 オープンソース全文検索エンジン",
        3780,
        "最新版Apaceh Solr Ver.4.5.1に対応するため大幅な書き直しと原稿の追加を行い、現在の開発環境に合わせて完全にアップデートしました。Apache Solrは多様なプログラミング言語に対応した全文検索エンジンです。"),
      Book("978-4048662024",
        "高速スケーラブル検索エンジン ElasticSearch Server",
        3024,
        "Apache Solrを超える全文検索エンジンとして注目を集めるElasticSearch Serverの日本初の解説書です。多くのサンプルを用いた実践的入門書になっています。"),
      Book("978-1933988177",
        "Lucene in Action",
        6301,
        "New edition of top-selling book on the new version of Lucene. the coreopen-source technology behind most full-text search and Intelligent Web applications.")
    )

  describe("Infinispan Remote Query Spec") {
    it("Search") {
      // ここに、テストコードを書く!
    }
  }
}

テストはひとつだけ用意しました。

では、順に見ていきましょう。

まずは、Infinispan Serverに接続するためのRemoteCacheManagerのインスタンスの作成。この時、Configurationのmarshallerの設定にProtoStreamMarshallerを追加します。

      val manager =
        new RemoteCacheManager(
          new ConfigurationBuilder()
            .addServer
            .host("localhost")
            .port(11222)
            .marshaller(new ProtoStreamMarshaller)
            .build
        )

      try {
        // 初期化や検索など
      } finally {
        manager.stop()
      }

ひととおりの処理が終わったら、RemoteCacheManager#stopを呼び出して終了です。

try〜finallyの中を掘り下げます。

とりあえず、メインで操作するCacheを取得しておきます。このCacheの定義は、インデキシングを有効にしたもので、作成内容については後述します。

        val cache = manager.getCache[String, Book]("indexingCache")

次に、Procol Buffers用のメタデータ用のCacheを取得し、ここに先ほど作成したIDLの内容を放り込みます。

        // ここはServerが起動している間、1回だけでよさそう
        val metaCache = manager.getCache[String, String]("___protobuf_metadata")
        metaCache.put("/book.proto",
          Source.fromInputStream(getClass.getResourceAsStream("/book.proto"), "UTF-8").mkString)
        val errors = metaCache.get(".errors")
        if (errors != null) {
          throw new IllegalStateException("Some files contain errors: " + errors)
        }

この部分ですが、ドキュメントに記載がありません。参考として記載したQuickstartsを見て、必要なことを知りました。なお、「.erros」というキーでエラー情報が取得できるようですが、ここにどんな値が入るのかはわかってないです…。

ちなみに、Infinispan 6.0の頃は、ここでコンパイル済みのProtol Buffersの定義をJMXで放り込んでいました。

このオペレーションは、Infinispan Serverに対して1度(複数のHot Rod Clientからつなぐ場合は、誰かひとり)実行すればいいみたいですが、毎回やってもいいのではないかと思うのですが…どうなんでしょう。

次に、SerializationContextというものを取得し、作成したMarshallerのインスタンスを登録します。

        // ここは、接続ごとに毎回必要
        val context = ProtoStreamMarshaller.getSerializationContext(manager)
        context.registerProtoFiles(FileDescriptorSource.fromResources("/book.proto"))
        context.registerMarshaller(new BookMarshaller)

ここでも、IDLのパスが必要です…。かつ、接続毎にこのオペレーションは実行する必要がありそうです。

ここまでで、下準備は完成です。

あとは、データを登録して

        sourceBooks.foreach(b => cache.put(b.isbn, b))

SearchクラスよりQueryFactoryを取得します。このQueryFactoryを使用して、クエリを組み立てて実行します。

        val queryFactory = Search.getQueryFactory(cache)
        val query: Query =
          queryFactory
            .from(classOf[Book])
            .having("title")
            .like("%Java%")
            .and
            .having("title")
            .like("%全文検索%")
            .toBuilder
            .orderBy("price", SortOrder.ASC)
            .build

        query.getResultSize should be (1)

        val books = query.list.asInstanceOf[java.util.List[Book]]

        books should have size 1
        books.get(0).title should be ("Apache Lucene 入門 〜Java・オープンソース・全文検索システムの構築")

これでOKです。この後、finallyでRemoteCacheManager#stopにつながる、と読んでください。

クエリを組み立てるためのAPIは、Embedded Modeと同じQuery DSLのものを使用します。

Cacheの設定

残すは、Infinispan ServerのCacheの設定ですね。ここでは、インデキシング用のCacheを新規に作成するものとします。

Infinispan Serverを起動します。

$ infinispan-server-7.0.2.Final/bin/standalone.sh

管理CLIで接続します。

$ infinispan-server-7.0.2.Final/bin/jboss-cli.sh -c

インデキシングを有効にしたCacheを登録します。名前は、「indexingCache」です。

[standalone@localhost:9990 /] /subsystem=infinispan/cache-container=local/local-cache=indexingCache:add(start=EAGER, indexing=LOCAL, indexing-properties={default.directory_provider=ram, lucene_version=LUCENE_4_10_2})
{"outcome" => "success"}

ここでは、local-cacheでインデックスの保存先は、RAM(メモリ)とします。

また、start属性はEAGERにするのがポイントで、これを指定しておかないとうまくいきません。

起動中のログに、以下のような内容が表れればOKです。

23:32:40,298 INFO  [org.hibernate.search.backend.impl.lucene.BatchSyncProcessor] (MSC service thread 1-1) HSEARCH000230: Starting sync consumer thread for index 'org.infinispan.query.remote.indexing.ProtobufValueWrapper'

以前に使った時も、「EAGERにしなくてはいけない」というのにハマったはずなのですが、また踏んでしまいました…。

なお、ここがうまくいっていない(EAGERにしていなかった)場合は、こんな例外がスローされます。

org.infinispan.client.hotrod.exceptions.HotRodClientException: org.hibernate.search.exception.SearchException: Can't build query for type org.infinispan.query.remote.indexing.ProtobufValueWrapper which is neither indexed nor has any indexed sub-types.

こんな感じで、Hot Rod ClientでRemote Queryが使えますよ、と。それにしても、これに毎度ハマっているような気がします…。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-query