CLOVER🍀

That was when it all began.

InfinispanのRemote Queryを試してみる

Infinispan 6.0.0から追加されたHot Rod ClientでのRemote Queryですが、以前はProtocol Buffersが必要だったこともあり、ちょっと敬遠していました。

で、最近になってまずはProtocol Buffersを使ってみたので、今度はRemote Queryを使ってみることにしました。

そもそもRemote Queryとは?というところですが、InfinispanのHot Rod Clientから、クエリを投げるためのモジュールです。背後にはやっぱり、Hiberneate Search&Luceneがいます。Hot Rod自体はJava以外のクライアントの提供も行っているようなので、現在はJavaのみですが今後は対象を広げていきたいみたい?

9.4.10. Querying via the Java Hot Rod client
http://infinispan.org/docs/6.0.x/user_guide/user_guide.html#_querying_via_the_java_hot_rod_client

いや、まあそれはもうハマりましたけど。

なお、サンプルコードとしては、ドキュメントの他に以下を参考にしています。

remote-query: Use JDG remotely through Hotrod
https://github.com/jboss-developer/jboss-jdg-quickstarts/tree/master/remote-query

https://github.com/infinispan/infinispan/blob/6.0.0.Final/client/hotrod-client/src/test/java/org/infinispan/client/hotrod/query/HotRodQueryTest.java

https://github.com/infinispan/protostream/tree/master/sample-domain-implementation

Infinispan Serverの準備

まずは、Infinispan Serverの準備から。普段はEmbedded Cacheばかり使っているので、Serverモジュールはなかなか慣れませんが、WildFlyJBoss ASのInfinispan Subsystemの説明を見ながら設定。

Infinispan Subsystem
https://docs.jboss.org/author/display/WFLY8/Infinispan+Subsystem

とりあえず、今回はこんな形で設定しました。
infinispan-server-6.0.0.Final/standalone/configuration/standalone.xml

                <local-cache name="indexingCache" start="EAGER">
                    <indexing index="LOCAL">
                      <property name="default.directory_provider">ram</property>
                      <property name="lucene_version">LUCENE_36</property>
                    </indexing>
                </local-cache>

インデックスはメモリ上での保持、クラスタリングは使用しません。

あと、ここの記述ですが

                    <indexing index="LOCAL">

indexの値には、「LOCAL」、「ALL」、「NONE」が取れるみたいです。indexLocalOnlyと同じようなものでしょうか?

設定したら、Infinispan Serverを起動しておいてください。

$ infinispan-server-6.0.0.Final/bin/standalone.sh

Hot Rod Client側の実装

では、続いてHot Rod Client側のコードを書いていきます。

まずは、依存関係の定義。
build.sbt

name := "infinispan-remote-query"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.3"

organization := "org.littlewings"

resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked")

{
  val infinispanVersion = "6.0.0.Final"
  libraryDependencies ++= Seq(
    "org.infinispan" % "infinispan-client-hotrod" % infinispanVersion excludeAll(
      ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"),
      ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging")
    ),
    "org.infinispan" % "infinispan-query-dsl" % infinispanVersion,
    //"org.infinispan" % "infinispan-remote-query-client" % infinispanVersion,
    "org.infinispan" % "infinispan-remote-query-server" % infinispanVersion,
    "org.jboss.marshalling" % "jboss-marshalling-river" % "1.3.18.GA",
    "org.jboss.logging" % "jboss-logging" % "3.1.2.GA",
    "net.jcip" % "jcip-annotations" % "1.0",
    "org.jboss.remotingjmx" % "remoting-jmx" % "2.0.0.Final",
    "org.scalatest" %% "scalatest" % "2.0"
  )
}

いろいろあって、けっこうすごいことになりました。「infinispan-remote-query-server」がないと、Remote Queryは使用できません。ClientもServerも要るのね。
*ここで、JBossMavenリポジトリを参照する必要があります
*Remote QueryのClientアーティファクトは、Serverを付けると一緒についてきます

JMXを使用する必要があるのですが、今回はJBossの「remoting-jmx」というモジュールを追加しています。Infinispan 6.0.0 Serverに同梱されているのは、もうちょっと低いバージョンでしたがまあいいかぁと。

また、Infinispanの現時点での最新安定版は6.0.1.Finalなのですが、Serverモジュールは6.0.0.Finalしかダウンロードできないので、今回は6.0.0.Finalとしました。

インデックスに登録するクラス

では、インデックスに登録するクラスを作成します。何の変哲もない、Javaクラス(ここではScalaクラスですが…)を作成します。
src/main/scala/org/littlewings/infinispan/remotequery/Book.scala

package org.littlewings.infinispan.remotequery

import scala.beans.BeanProperty

import java.util.Objects

object Book {
  def apply(isbn: String, title: String, price: Int, summary: String): Book = {
    val book = new Book
    book.isbn = isbn
    book.title = title
    book.price = price
    book.summary = summary
    book
  }
}

class Book {
  @BeanProperty
  var isbn: String = _

  @BeanProperty
  var title: String = _

  @BeanProperty
  var price: Int = _

  @BeanProperty
  var summary: String = _

  override def hashCode: Int =
    Objects.hash(isbn, title, Integer.valueOf(price), summary)

  override def equals(other: Any): Boolean = other match {
    case o: Book =>
      isbn == o.isbn && title == o.title &&
      price == o.price && summary == o.summary
    case _ => false
  }

  override def toString: String =
    s"isbn = $isbn, title = $title, price = $price, summary = $summary"
}

BeanPropertyは、もしかして要らないかも…。

Embedded Cacheの時とは違い、Hibernate Searchのアノテーションは使用しません。

代わりに、Marshallerを実装する必要があります。
src/main/scala/org/littlewings/infinispan/remotequery/BookMarshaller.scala

package org.littlewings.infinispan.remotequery

import java.io.IOException

import org.infinispan.protostream.MessageMarshaller

class BookMarshaller extends MessageMarshaller[Book] {
  override def getTypeName: String =
    "example_book.Book"

  override def getJavaClass: Class[_ <: Book] =
    classOf[Book]

  @throws(classOf[IOException])
  override def readFrom(reader: MessageMarshaller.ProtoStreamReader): Book = {
    val isbn = reader.readString("isbn")
    val title = reader.readString("title")
    val price = reader.readInt("price")
    val summary = reader.readString("summary")

    val book = new Book
    book.isbn = isbn
    book.title = title
    book.price = price
    book.summary = summary
    book
  }

  @throws(classOf[IOException])
  override def writeTo(writer: MessageMarshaller.ProtoStreamWriter, book: Book): Unit = {
    writer.writeString("isbn", book.isbn)
    writer.writeString("title", book.title)
    writer.writeInt("price", book.price)
    writer.writeString("summary", book.summary)
  }
}

ProtoStreamReader/Writerから、インスタンス変数の値を読み出したり、書き出す処理を自分で実装する必要があります。

ところで、このProtoStreamというのは、Protocol Buffersのことだと思いますが、Infinispanに別モジュールが存在しています。

protostream
https://github.com/infinispan/protostream

Remote Queryを使うと、ProtoStreamに対しての依存関係も追加されるので、意識することはないと思います。

なお、ここで作成したMarshallerが定義しているこのメソッドの内容ですが

  override def getTypeName: String =
    "example_book.Book"

この後説明する、Protocol BuffersのIDLが関係してきます。

Protocol BuffersのIDLを書く

先ほどインデックスに登録するBookというクラスを作成しましたが、Remote Queryを使う際にはProtocol BuffersのIDLを書く必要があります。

ただ、Javaクラスを生成する必要はありません(そのためのMarshallerなのだと思いますが)。

で、今回このようなIDLを用意しました。
src/main/resources/book.proto

package example_book;

option java_package = "org.littlewings.infinispan.generated_by_proto";
option java_outer_classname = "BookProtos";

message Book {
    required string isbn = 1;
    required string title = 2;
    required int32 price = 3;
    required string summary = 4;
}

前述のBookMarshallerで

  override def getTypeName: String =
    "example_book.Book"

のような記述がありましたが、これはIDLでのpackageと

package example_book;

メッセージ定義の名前が関係していますね。

message Book {

Javaソースコードの自動生成はしないので、「java_package」とか「java_outer_classname」は関係ないみたいです。

Protocol BuffersのIDLをコンパイルする

Protocol BuffersのIDLからJavaソースコードは生成しませんが、代わりに「.proto」ファイルから以降の処理で使うファイルに変換する必要があります。

$ protoc --descriptor_set_out=src/main/resources/book.protobin src/main/resources/book.proto

これで、「book.proto」を入力として、「book.protobin」ファイルが生成されます。

Remote Queryで本当に必要なのは、このprotobinファイルになります。

Remote Queryを試してみる

ここまでに作成したインデキシング対象のクラスと、Protocol Buffersのファイルを利用してHot Rod ClientでRemote Queryを使用してみます。
src/test/scala/org/littlewings/infinispan/remotequery/RemoteQuerySpec.scala

package org.littlewings.infinispan.remotequery

import javax.management.{MBeanServerFactory, ObjectName}
import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}

import org.infinispan.client.hotrod.{RemoteCache, RemoteCacheManager, Search}
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder
import org.infinispan.client.hotrod.impl.ConfigurationProperties
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller
import org.infinispan.query.dsl.{Query, SortOrder}
import org.infinispan.query.remote.ProtobufMetadataManager
import org.infinispan.commons.util.Util

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class RemoteQuerySpec extends FunSpec {
  val javaee6Book: Book = Book("978-4798124605",
                               "Beginning Java EE 6 GlassFish 3で始めるエンタープライズJava",
                               4410,
                               "エンタープライズJava入門書の決定版!Java EE 6は、大規模な情報システム構築に用いられるエンタープライズ環境向けのプログラミング言語です。")

  val javaee5Book: Book = Book("978-4798120546",
                               "マスタリングJavaEE5 第2版",
                               5670,
                               "EJB3.0、JPA、JSF、Webサービスを完全網羅。新たにJBoss AS、Hibernateにも対応!JavaEE5は、J2EEの高い機能性はそのままに、アプリケーションの開発生産性を高めることを主眼とした、サーバサイドJavaにおけるプラットフォーム、開発、デプロイメントに関する標準仕様です。")

  val jaxrsBook: Book = Book("978-4873114675",
                             "JavaによるRESTfulシステム構築",
                             3360,
                             "Java EE 6でサポートされたJAX-RSの特徴とRESTfulアーキテクチャ原則を使って、Javaでの分散Webサービスを設計開発する方法を学ぶ書籍。")

  val luceneBook: Book = Book("978-4774127804",
                              "Apache Lucene 入門 Java・オープンソース・全文検索システムの構築",
                              3360,
                              "Luceneは全文検索システムを構築するためのJavaのライブラリです。Luceneを使えば,一味違う高機能なWebアプリケーションを作ることができます。")

  val solrBook: Book = Book("978-4774161631",
                            "[改訂新版] Apache Solr入門 オープンソース全文検索エンジン",
                            3780,
                            "最新版Apaceh Solr Ver.4.5.1に対応するため大幅な書き直しと原稿の追加を行い、現在の開発環境に合わせて完全にアップデートしました。Apache Solrは多様なプログラミング言語に対応した全文検索エンジンです。")

  val books: Array[Book] = Array(javaee6Book,
                                 javaee5Book,
                                 jaxrsBook,
                                 luceneBook,
                                 solrBook)

  describe("remote query spec") {
    // ここに、テストを書く!
  }
}

何やら、import文がわらわらと付いている感じですね。

RemoteCacheManagerを何度も起動することになるので、ヘルパーメソッドを作成します。

  private def withCache(fun: RemoteCache[String, Book] => Unit): Unit = {
    val manager =
      new RemoteCacheManager(
        new ConfigurationBuilder()
          .addServer
          .host("localhost")
          .port(11222)
          .marshaller(new ProtoStreamMarshaller)
          .build
      )

    try {
      val cache = manager.getCache[String, Book]("indexingCache")
      fun(cache)
      cache.stop()
    } finally {
      manager.stop()
    }
  }

ここでのポイントは、ProtoStreamMarshallerを設定に追加しているところですね。

          .marshaller(new ProtoStreamMarshaller)

データ登録。

    it("put data") {
      withCache { cache =>
        registerProtofileToServer("/book.protobin")

        val context =
          ProtoStreamMarshaller.getSerializationContext(cache.getRemoteCacheManager)

        context.registerProtofile("/book.protobin")
        context.registerMarshaller(classOf[Book], new BookMarshaller)

        books.foreach(b => cache.put(b.isbn, b))
      }
    }

最初にある

        registerProtofileToServer("/book.protobin")

の部分ですが、これは先ほど作成したIDLから作られたバイナリファイルです。

これを、JMXオペレーションでInfinispan Serverに登録します。

  private def registerProtofileToServer(filePath: String): Unit = {
    val is = getClass.getResourceAsStream(filePath)

    val descriptor = 
      try {
        Util.readStream(is)
      } finally {
        is.close()
      }

    val serviceUrl = new JMXServiceURL(s"service:jmx:remoting-jmx://localhost:9999")
    val jmxConnector = JMXConnectorFactory.connect(serviceUrl)

    try {
      val jmxConnection = jmxConnector.getMBeanServerConnection()

      val jmxDomain = "jboss.infinispan"
      val objectName =
        new ObjectName(s"$jmxDomain:type=RemoteQuery,name=${ObjectName.quote("local")},component=${ProtobufMetadataManager.OBJECT_NAME}")

      jmxConnection.invoke(objectName,
                           "registerProtofile",
                           Array[AnyRef](descriptor),
                           Array(classOf[Array[Byte]].getName))

    } finally {
      jmxConnector.close()
    }
  }

ここで、JMXへの接続URLがこんな感じで書かれていますが、

    val serviceUrl = new JMXServiceURL(s"service:jmx:remoting-jmx://localhost:9999")

これを呼び出すために、sbtに以下の依存関係を追加していました。

    "org.jboss.remotingjmx" % "remoting-jmx" % "2.0.0.Final",

この後、ProtoStreamMarshallerを使って個々のMarshallerを登録していくのですが、先にこの処理をした方がよいみたいです。最初、「ProtoStreamMashaller→JMX登録」の順番でやっていたら、見事にハマりました。

続いて、RemoteCacheManagerとProtoStreamMarshallerを使ってSerializationContextを取得、protobinファイルと先ほど作成したBookクラスのClassクラス、BookMarshallerクラスのインスタンスを登録します。

        val context =
          ProtoStreamMarshaller.getSerializationContext(cache.getRemoteCacheManager)

        context.registerProtofile("/book.protobin")
        context.registerMarshaller(classOf[Book], new BookMarshaller)

こちら側はClientサイドになりますが、ここでもprotobinファイルを登録する必要があります。

データ登録は、まあ普通に。

        books.foreach(b => cache.put(b.isbn, b))

取得も同じことをやるだけですね。

    it("get data") {
      withCache { cache =>
        registerProtofileToServer("/book.protobin")

        val context =
          ProtoStreamMarshaller.getSerializationContext(cache.getRemoteCacheManager)

        context.registerProtofile("/book.protobin")
        context.registerMarshaller(classOf[Book], new BookMarshaller)

        cache.get(solrBook.isbn) should be (solrBook)
      }
    }

では、検索です。

簡単なサンプルとしては、こんな感じです。

    it("query #1") {
      withCache { cache =>
        registerProtofileToServer("/book.protobin")

        val context =
          ProtoStreamMarshaller.getSerializationContext(cache.getRemoteCacheManager)

        context.registerProtofile("/book.protobin")
        context.registerMarshaller(classOf[Book], new BookMarshaller)

        val queryFactory = Search.getQueryFactory(cache)

        val query: Query =
          queryFactory
            .from(classOf[Book])
            .having("isbn")
            .in("978-4798124605")
            .toBuilder
            .build

        val books = query.list[Book]
        books should have size 1
        books.get(0) should be (javaee6Book)
      }
    }

このQueryを組んでいる部分、以前ご紹介したInfinispanのQuery DSLと同じAPIが使用できます。

InfinispanのQuery DSLを試してみる
http://d.hatena.ne.jp/Kazuhira/20131215/1387107464

QueryFactoryを取得するクラスが、org.infinispan.client.hotrod.Searchということくらいですね。

その他のサンプルも。

LIKE。

    it("query #2") {
      withCache { cache =>
        registerProtofileToServer("/book.protobin")

        val context =
          ProtoStreamMarshaller.getSerializationContext(cache.getRemoteCacheManager)

        context.registerProtofile("/book.protobin")
        context.registerMarshaller(classOf[Book], new BookMarshaller)

        val queryFactory = Search.getQueryFactory(cache)

        val query: Query =
          queryFactory
            .from(classOf[Book])
            .having("summary")
            .like("%Lucene%")
            .toBuilder
            .build

        val books = query.list[Book]
        books should have size 1
        books.get(0) should be (luceneBook)
      }
    }

ORDER BYとAND。

    it("query #3") {
      withCache { cache =>
        registerProtofileToServer("/book.protobin")

        val context =
          ProtoStreamMarshaller.getSerializationContext(cache.getRemoteCacheManager)

        context.registerProtofile("/book.protobin")
        context.registerMarshaller(classOf[Book], new BookMarshaller)

        val queryFactory = Search.getQueryFactory(cache)

        val query: Query =
          queryFactory
            .from(classOf[Book])
            .orderBy("price", SortOrder.DESC)
            .having("title")
            .like("%Java%")
            .and
            .having("price")
            .between(3500, 6000)
            .toBuilder
            .build

        val books = query.list[Book]
        books should have size 2
        books should contain theSameElementsInOrderAs Array(javaee5Book, javaee6Book)
      }
    }

けっこう苦労したのですが、動かせてよかったです。

今回作成したコードは、以下にアップしています。

https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-remote-query