CLOVER🍀

That was when it all began.

InfinispanのContinuous Query(over Hot Rod)を試す

Infinispanに8.0.0.Finalから搭載されているContinuous Queryですが、Hot Rod(Client/Server Mode)でも使うことができます。

EmbeddedでContinuous Queryは試したことがありましたが、Hot Rodではまだ試していなかったので、今回使ってみることにしました。

Continuous Queryとは?

平たく言うとListenerなのですが、イベント通知時の条件をQueryとして登録しておき、そのQueryにマッチする/しないが切り替わる時にイベントを受信することができます。
※現在のInfinispanのContinous Queryでは、Queryにマッチし続けるような更新については検知しないのですが、9.0になると可能になりそうな感じです

準備と利用するための前提

まずは、ビルド定義から。
build.sbt

name := "remote-continuous-query"

organization := "org.littlewings"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.8"

updateOptions := updateOptions.value.withCachedResolution(true)

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-client-hotrod" % "8.2.4.Final" % Compile,
  "org.infinispan" % "infinispan-query-dsl" % "8.2.4.Final" % Compile,
  "org.infinispan" % "infinispan-remote-query-client" % "8.2.4.Final" % Compile,
  "org.infinispan" % "infinispan-server-hotrod" % "8.2.4.Final" % Test,
  "org.infinispan" % "infinispan-remote-query-server" % "8.2.4.Final" % Test,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.scalatest" %% "scalatest" % "3.0.0" % Test
)

Hot RodでのContinuous Queryを使うのに必要なモジュールは、こちらです。

  "org.infinispan" % "infinispan-client-hotrod" % "8.2.4.Final" % Compile,
  "org.infinispan" % "infinispan-query-dsl" % "8.2.4.Final" % Compile,
  "org.infinispan" % "infinispan-remote-query-client" % "8.2.4.Final" % Compile,

Server系のものも入っていますが、こちらはテスト時にHot Rod ServerをEmbeddedに起動するために使うので、Continuous Queryとはあまり関係ないです。なお、EmbeddedにHot Rod Serverを使う場合は、infinispan-remote-query-serverを入れておかないとContinuous Queryが使えません。

  "org.infinispan" % "infinispan-server-hotrod" % "8.2.4.Final" % Test,
  "org.infinispan" % "infinispan-remote-query-server" % "8.2.4.Final" % Test,

また、Hot RodでContinuous Queryを使うための注意点として、Protocol Buffersの利用が前提となります。なので、登録するEntityクラスに対してのProtocol BuffersのIDLが必要です。

InfinispanにはEntityからアノテーションでProtocol BufferesのIDLを生成する機能があるので、今回はそちらを利用しました。

というか、Hot RodのQuery系でProtocol Buffersが必要なのいつもコロッと忘れていて、その度に踏んでいるのですが…ええ。

さて、あとはコードを書いてきましょう。

Entity

今回のお題は、書籍とします。@ProtoMessageとか@ProtoFieldとか付いているのが、Protocol BufferesのIDLを生成するためのInfinispanのアノテーションです。
src/test/scala/org/littlewings/infinispan/remotecq/Book.scala

package org.littlewings.infinispan.remotecq

import org.infinispan.protostream.annotations.{ProtoField, ProtoMessage}
import org.infinispan.protostream.descriptors.Type

object Book {
  def apply(isbn: String, title: String, price: Int): Book = {
    val book = new Book

    book.isbn = isbn
    book.title = title
    book.price = price

    book
  }
}

@ProtoMessage(name = "Book")
class Book extends Serializable {
  private[remotecq] var isbn: String = _
  private[remotecq] var title: String = _
  private[remotecq] var price: Int = _

  @ProtoField(number = 1, name = "isbn", required = true, `type` = Type.STRING)
  def getIsbn: String = isbn

  def setIsbn(isbn: String): Unit = this.isbn = isbn

  @ProtoField(number = 2, name = "title", required = true, `type` = Type.STRING)
  def getTitle: String = title

  def setTitle(title: String): Unit = this.title = title

  @ProtoField(number = 3, name = "price", required = true, defaultValue = "0", `type` = Type.INT32)
  def getPrice: Int = price

  def setPrice(price: Int): Unit = this.price = price
}

テストコードの雛形

テストコード全体の雛形を作成します。Hot Rod ServerをEmbeddedに起動するメソッド、RemoteCacheを使うためのメソッド、そしてProtocol BufferesのIDLを生成してRemoteCacheに登録するところまでを共通的に用意します。
src/test/scala/org/littlewings/infinispan/remotecq/RemoteContinuousQuerySpec.scala

package org.littlewings.infinispan.remotecq

import java.util.concurrent.TimeUnit

import org.infinispan.client.hotrod.configuration.ConfigurationBuilder
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller
import org.infinispan.client.hotrod.{RemoteCache, RemoteCacheManager, Search}
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.protostream.annotations.ProtoSchemaBuilder
import org.infinispan.query.dsl.Query
import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants
import org.infinispan.server.hotrod.HotRodServer
import org.infinispan.server.hotrod.configuration.HotRodServerConfigurationBuilder
import org.scalatest.{FunSpec, Matchers}

class RemoteContinuousQuerySpec extends FunSpec with Matchers {
  describe("Remote Continuous Query Spec") {
    // ここに、テストを書く!
  }

  protected def registerProtocolBufIdl[K, V](cache: RemoteCache[K, V], clazz: Class[_]): Unit = {
    val manager = cache.getRemoteCacheManager

    val context = ProtoStreamMarshaller.getSerializationContext(manager)
    val protoSchemaBuilder = new ProtoSchemaBuilder
    val idl =
      protoSchemaBuilder
        .fileName(clazz.getName)
        .addClass(clazz)
        .build(context)

    val metaCache = manager.getCache[String, String](ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME)
    metaCache.put(clazz.getName + ".proto", idl)
  }

  protected def withRemoteCache[K, V](cacheName: String)(fun: RemoteCache[K, V] => Unit): Unit = {
    val manager = new RemoteCacheManager(
      new ConfigurationBuilder()
        .addServer()
        .host("localhost")
        .port(11222)
        .marshaller(new ProtoStreamMarshaller)
        .build
    )

    try {
      fun(manager.getCache[K, V](cacheName))
    } finally {
      manager.stop()
    }
  }

  protected def withRemoteCacheServer(fun: => Unit): Unit = {
    val embeddedCacheManager = new DefaultCacheManager("infinispan.xml")

    val host = "localhost"
    val port = 11222

    val hotRodServer = new HotRodServer

    try {
      hotRodServer
        .start(
          new HotRodServerConfigurationBuilder()
            .host(host)
            .port(port)
            .build,
          embeddedCacheManager)

      fun
    } finally {
      hotRodServer.stop
      embeddedCacheManager.stop()
    }
  }
}

Hot Rod Serverの起動は、本質的でないので端折ります。

RemoteCacheは、まずRemoteCacheManagerを作成する際に、ProtoStreamMarshallerをMarshallerとして登録しておく必要があります。

    val manager = new RemoteCacheManager(
      new ConfigurationBuilder()
        .addServer()
        .host("localhost")
        .port(11222)
        .marshaller(new ProtoStreamMarshaller)
        .build
    )

そして、EntityのアノテーションをもとにProtocol BuffersのIDLを生成し、RemoteCacheに登録するためのコードは以下の通りです。ここも、そういうものだくらいに流してもらえればいいかなと思います。

  protected def registerProtocolBufIdl[K, V](cache: RemoteCache[K, V], clazz: Class[_]): Unit = {
    val manager = cache.getRemoteCacheManager

    val context = ProtoStreamMarshaller.getSerializationContext(manager)
    val protoSchemaBuilder = new ProtoSchemaBuilder
    val idl =
      protoSchemaBuilder
        .fileName(clazz.getName)
        .addClass(clazz)
        .build(context)

    val metaCache = manager.getCache[String, String](ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME)
    metaCache.put(clazz.getName + ".proto", idl)
  }

ちなみに、このコードを使う時のInfinispanの設定は以下のようにしました(EmbeddedなHot Rod Serverが使います)。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:8.2 http://www.infinispan.org/schemas/infinispan-config-8.2.xsd"
        xmlns="urn:infinispan:config:8.2">
    <cache-container default-cache="hotRodBasicCache">
        <jmx duplicate-domains="true"/>

        <local-cache name="hotRodBasicCache">
            <data-container key-equivalence="org.infinispan.commons.equivalence.AnyServerEquivalence"
                            value-equivalence="org.infinispan.commons.equivalence.AnyServerEquivalence"/>
        </local-cache>

        <local-cache name="namedCache"/>
    </cache-container>
</infinispan>

Continuous Queryに対するListenerを作る

では、Continous Queryで、Queryにマッチした時のイベントを受け取るListenerを作成します。

ContinuousQueryListenerというインターフェースを実装したクラスを作成しますが、このインターフェースはEmbeddedとHot Rod共通です。
src/test/scala/org/littlewings/infinispan/remotecq/BookContinousQueryListener.scala

package org.littlewings.infinispan.remotecq

import org.infinispan.query.api.continuous.ContinuousQueryListener

class BookContinousQueryListener extends ContinuousQueryListener[String, Book] {
  var joiningBooks: Vector[Book] = Vector.empty
  var leavingBooks: Vector[String] = Vector.empty

  override def resultJoining(key: String, value: Book): Unit = {
    joiningBooks = joiningBooks :+ value
  }

  override def resultLeaving(key: String): Unit =
    leavingBooks = leavingBooks :+ key
}

メソッドとしては、Queryにマッチするようになった時に呼び出されキーと値を受け取るresultJoiningと、Queryにマッチしたものがマッチしなくなった時に呼び出されるresultLeavingを実装することになります。

今回は、受け取ったキーや値を保存しておくようにしました(テストで確認します)。

使ってみる

それでは、テストを書きつつContinuous Queryを使ってみます。

追加

まずは追加時。

    it("add Entry") {
      withRemoteCacheServer {
        withRemoteCache[String, Book]("namedCache") { cache =>
          registerProtocolBufIdl(cache, classOf[Book])

          val books = Array(
            Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947),
            Book("978-1785285332", "Getting Started With Hazelcast - Second Edition", 3848),
            Book("978-1783988181", "Mastering Redis", 6172)
          )

          val continousQuery = Search.getContinuousQuery(cache)

          val query =
            Search
              .getQueryFactory(cache)
              .from(classOf[Book])
              .having("price")
              .gte(4000)
              .toBuilder[Query]
              .build

          val listener = new BookContinousQueryListener
          continousQuery.addContinuousQueryListener(query, listener)

          books.foreach(b => cache.put(b.isbn, b))

          TimeUnit.SECONDS.sleep(1L)

          listener.joiningBooks.map(_.isbn) should be(books.filter(_.price > 4000).map(_.isbn))
        }
      }
    }

最初の3行で、EmbeddedなHot Rod Serverの起動、RemoteCacheManagerおよびRemoteCacheの構築、Protocol BuffersのIDLの生成とRemoteCacheへの登録を行います。

Continuous Queryの登録は、SearchからのContinuousQueryの取得、Queryの構築とContinuousQueryに対するQueryとListenerの追加で完了します。

          val continousQuery = Search.getContinuousQuery(cache)

          val query =
            Search
              .getQueryFactory(cache)
              .from(classOf[Book])
              .having("price")
              .gte(4000)
              .toBuilder[Query]
              .build

          val listener = new BookContinousQueryListener
          continousQuery.addContinuousQueryListener(query, listener)

この例では、priceが4,000円以上のものが対象となります。

では、書籍を登録してみます。対象はこちら。

          val books = Array(
            Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947),
            Book("978-1785285332", "Getting Started With Hazelcast - Second Edition", 3848),
            Book("978-1783988181", "Mastering Redis", 6172)
          )

登録したら、Listenerに非同期に通知されるので、確認してみます。

          books.foreach(b => cache.put(b.isbn, b))

          TimeUnit.SECONDS.sleep(1L)

          listener.joiningBooks.map(_.isbn) should be(books.filter(_.price > 4000).map(_.isbn))

4000円以上の本だけが通知されていることが確認できました。

削除

削除。

    it("delete Entry") {
      withRemoteCacheServer {
        withRemoteCache[String, Book]("namedCache") { cache =>
          registerProtocolBufIdl(cache, classOf[Book])

          val books = Array(
            Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947),
            Book("978-1785285332", "Getting Started With Hazelcast - Second Edition", 3848),
            Book("978-1783988181", "Mastering Redis", 6172)
          )

          val continousQuery = Search.getContinuousQuery(cache)

          val query =
            Search
              .getQueryFactory(cache)
              .from(classOf[Book])
              .having("price")
              .gte(4000)
              .toBuilder[Query]
              .build

          val listener = new BookContinousQueryListener
          continousQuery.addContinuousQueryListener(query, listener)

          books.foreach(b => cache.put(b.isbn, b))
          books.foreach(b => cache.remove(b.isbn))

          TimeUnit.SECONDS.sleep(1L)

          listener.joiningBooks.map(_.isbn) should be(books.filter(_.price > 4000).map(_.isbn))
          listener.leavingBooks should be(books.filter(_.price > 4000).map(_.isbn))
        }
      }
    }

追加後に削除するわけですが、ここではもともとQueryにマッチしていた書籍が削除された場合に、Listenerに通知されます。

          listener.leavingBooks should be(books.filter(_.price > 4000).map(_.isbn))
Entityを更新してQueryにマッチさせるようにする

今度は、最初に登録していたEntityを更新して、Queryにマッチするように更新した場合の挙動を見てみましょう。

ソースコードは、こちら。

    it("join Entry") {
      withRemoteCacheServer {
        withRemoteCache[String, Book]("namedCache") { cache =>
          registerProtocolBufIdl(cache, classOf[Book])

          val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947)

          val continousQuery = Search.getContinuousQuery(cache)

          val query =
            Search
              .getQueryFactory(cache)
              .from(classOf[Book])
              .having("price")
              .gte(5000)
              .toBuilder[Query]
              .build

          val listener = new BookContinousQueryListener
          continousQuery.addContinuousQueryListener(query, listener)

          cache.put(book.isbn, book)

          TimeUnit.SECONDS.sleep(1L)

          listener.joiningBooks should be(empty)

          val bookUpdated = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5100)

          cache.put(bookUpdated.isbn, bookUpdated)

          TimeUnit.SECONDS.sleep(1L)

          listener.joiningBooks.map(_.isbn) should be(Array(bookUpdated.isbn))
        }
      }
    }

今回は、書籍を1冊に絞って、Queryの条件を5,000円以上にしてみました。

          val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947)

          val continousQuery = Search.getContinuousQuery(cache)

          val query =
            Search
              .getQueryFactory(cache)
              .from(classOf[Book])
              .having("price")
              .gte(5000)
              .toBuilder[Query]
              .build

この条件だと、最初の登録時にはQueryの条件を下回っているのでListenerには通知されませんが、

          cache.put(book.isbn, book)

          TimeUnit.SECONDS.sleep(1L)

          listener.joiningBooks should be(empty)

Queryにマッチするように更新されると、通知されるようになります。

          val bookUpdated = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5100)

          cache.put(bookUpdated.isbn, bookUpdated)

          TimeUnit.SECONDS.sleep(1L)

          listener.joiningBooks.map(_.isbn) should be(Array(bookUpdated.isbn))
Entityを更新してQueryにマッチさせないようにする

最後は、もともとQueryにマッチしていたEntityを、更新してQueryにマッチしないようにした場合の挙動を見てみましょう。

ソースコードは、こちら。

    it("leave Entry") {
      withRemoteCacheServer {
        withRemoteCache[String, Book]("namedCache") { cache =>
          registerProtocolBufIdl(cache, classOf[Book])

          val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947)

          val continousQuery = Search.getContinuousQuery(cache)

          val query =
            Search
              .getQueryFactory(cache)
              .from(classOf[Book])
              .having("price")
              .gte(4000)
              .toBuilder[Query]
              .build

          val listener = new BookContinousQueryListener
          continousQuery.addContinuousQueryListener(query, listener)

          cache.put(book.isbn, book)

          TimeUnit.SECONDS.sleep(1L)

          listener.joiningBooks.map(_.isbn) should be(Array(book.isbn))

          val bookUpdated = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 3900)

          cache.put(bookUpdated.isbn, bookUpdated)

          TimeUnit.SECONDS.sleep(1L)

          listener.leavingBooks should be(Array(bookUpdated.isbn))
        }
      }
    }

本は1冊で、Queryの条件は4,000円以上。

          val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947)

          val continousQuery = Search.getContinuousQuery(cache)

          val query =
            Search
              .getQueryFactory(cache)
              .from(classOf[Book])
              .having("price")
              .gte(4000)
              .toBuilder[Query]
              .build

これを登録すると、joiningの方に通知されます。

          cache.put(book.isbn, book)

          TimeUnit.SECONDS.sleep(1L)

          listener.joiningBooks.map(_.isbn) should be(Array(book.isbn))

今度は、値段を下げて更新。

          val bookUpdated = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 3900)

          cache.put(bookUpdated.isbn, bookUpdated)

          TimeUnit.SECONDS.sleep(1L)

          listener.leavingBooks should be(Array(bookUpdated.isbn))

leavingの方に通知されることがわかります。

だいたい、こんなところですね。

Continuous Query(over Hot Rod)の舞台裏

少し、Hot RodのContinuous Queryの裏を見てみました。

やっぱり、Queryを使うにはProtocol Buffersが必要なようでContinuous Queryを追加すると、裏で以下のようなFilterConverterが追加されるように動作します。
https://github.com/infinispan/infinispan/blob/8.2.4.Final/remote-query/remote-query-server/src/main/java/org/infinispan/query/remote/impl/filter/JPAContinuousQueryProtobufCacheEventFilterConverter.java

このFilterConverterはFactoryによって生成されるのですが、
https://github.com/infinispan/infinispan/blob/8.2.4.Final/remote-query/remote-query-server/src/main/java/org/infinispan/query/remote/impl/filter/JPAContinuousQueryProtobufCacheEventFilterConverterFactory.java

このFactoryは、Hot Rod Serverの起動時にService Providerの仕組みで登録されます。
https://github.com/infinispan/infinispan/blob/8.2.4.Final/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala#L120

これで、Listenerに通知するイベントがフィルタリングされるというわけですね。

フィルタリングを行い、実際にListenerを呼び出すかどうかは、通常のListenerの仕組みによって行われます。
https://github.com/infinispan/infinispan/blob/8.2.4.Final/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java#L1194

ところで、ユーザーはContinousQueryListenerインターフェースの実装を作成する必要がありますが、ContinuosQuery自体はInfinispanのListenerではありません。

ContinuousQueryListenerのインスタンスをContinuousQueryに登録した時に、裏でContinuous Queryが提供する(Hot Rodの)Listenerにラップされます。
https://github.com/infinispan/infinispan/blob/8.2.4.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/event/ContinuousQueryImpl.java#L87

Hot RodのListenerを登録したという情報は、Server側へ送信されClientListenerRegistryによってEmbeddedなListenerに代替されてイベント受信を実現します。
https://github.com/infinispan/infinispan/blob/8.2.4.Final/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ClientListenerRegistry.scala

あとは、EmbeddedなListenerが受信したイベントを介して、Hot RodのListenerにも通知されるという仕組みです。

最後の方は、Hot RodのListener自体の仕掛けでしたが、こういった感じで実現されているようです、と。

まとめ

Hot RodでのContinuous Queryを試してみました。

Protocol Buffersが必要なところは(個人的にはコロッと忘れて)ハマりましたが、そのくらいであとはそれほど困らずに使えました。

オフィシャルのInfinispan 8.0のドキュメントにはContinuous Queryについての記載はないのですが、Infinispan 9.0ではドキュメントが作られているので、こちらで見てみるとよいかもです。

Continuous Queryの裏を追っていく過程で、ClientListenerの仕組みなども見れて個人的には勉強になりました。

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-continuous-query