CLOVER🍀

That was when it all began.

UpdateをサポートしたInfinispanのContinuous Queryを試す

Infinispan 8から搭載された機能として、Continuous Queryがあります。これ自体は、前に試していました。

InfinispanのContinuous Queryを試す - CLOVER

ただこの時はJoin/Leaveはサポートしていたものの、Updateは無視されるようになっていました。Infinispan 9から
Updateも拾うようになっているので、改めて試してみたいと思います。

ドキュメントは、こちらです。

Continuous Query

Continuous Queryとは?

Continuous Queryとは、先ほどに載せたエントリにも書いていますが、早い話がイベントの通知条件をQueryで
絞り込めるLisnterです。

Infinispanの内部的には、Continuous Queryを使うと内部的にCluster Listenerを生成しCacheに登録するのですが、
その際にInfinispanのFilterの仕組みでイベントが発生したエントリに対してフィルタリングを行い、条件に合致
した場合は通知を行います。

この仕組みを抽象化したのが、Continuous Queryです。

お題

で、今回のお題は書籍をテーマに、次のようなQueryでイベントを絞り込むことを考えます。
※都合上、その他の条件をもろもろ付けますが、基本は価格で絞り込むと見てもらえると…

select ... from book b where b.price >= 3000 and b.price <= 5000 (and more...)

要するに、価格が3,000円から5,000円の間に入るエントリの登録、更新、またこの範囲から出て行くような更新、削除を
すると、Continuous QueryのListenerに対してイベントが通知されます。

今回は、これでいってみましょう。

そもそものContinuous Queryの発火条件

ドキュメントに記載があります。

1. If the query on both the old and new values evaluate false, then the event is suppressed.
2. If the query on the old value evaluates false and on the new value evaluates true, then a Join event is sent.
3. If the query on both the old and new values evaluate true, then an Update event is sent.
4. If the query on the old value evaluates true and on the new value evaluates false, then a Leave event is sent.
5. If the query on the old value evaluates true and the entry is removed or expired, then a Leave event is sent.

http://infinispan.org/docs/9.0.x/user_guide/user_guide.html#continuous_query_execution

Queryが真となる条件にエントリが初めてなった時が「Join」、Join後に更新して、その結果も真となる場合は「Update」、
そしてJoin後に更新によりQueryが偽となる場合や削除される場合、そして有効期限切れした場合は「Leave」
となります。

こちらは押さえておきましょう。

ところで、そもそもなんで「Update」はこれまで対象外にしていたのに、ここでできるようにしたんでしょう…?デザイン時には、
「それはaggregationでやることだ」って書いてたはずなのに…?

infinispan-designs/Continuous-query-design-and-indexless-queries.asciidoc at master · infinispan/infinispan-designs · GitHub

まあ、個人的にはできた方がいいと思うので、いいんですけどね…。

準備

前置きが長くなりました。では、準備を進めていきましょう。

sbt上の依存関係は、以下とします。

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-query" % "9.0.0.Final" % Compile,
  "net.jcip" % "jcip-annotations" % "1.0" % Provided,
  "org.scalatest" %% "scalatest" % "3.0.1" % Test
)

Continuous Queryを使うには、「infinispan-query」があればOKです。あとはテスト用にScalaTest、Scalaを使っている
都合上jcip-annotationsを使用しています。

Infinispanの設定は、次のとおりとします。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:9.0 http://www.infinispan.org/schemas/infinispan-config-9.0.xsd"
        xmlns="urn:infinispan:config:9.0">
    <jgroups>
        <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/>
    </jgroups>
    <cache-container>
        <jmx duplicate-domains="true"/>
        <transport cluster="test-cluster" stack="udp"/>

        <distributed-cache name="bookCache"/>

        <distributed-cache name="indexedBookCache">
            <indexing index="LOCAL" auto-config="true">
                <indexed-entities>
                    <indexed-entity>org.littlewings.infinispan.continuousquery.IndexedBook</indexed-entity>
                </indexed-entities>
                <property name="lucene_version">LUCENE_CURRENT</property>
            </indexing>
        </distributed-cache>
    </cache-container>
</infinispan>

インデックスを有効にしたCacheとそうでないCacheを、それぞれDistributed Cacheで定義。インデックスを有効にした方のCacheに定義してある
Entityクラスについては、後述します。

テストコードの雛形は、こんな感じで。
src/test/scala/org/littlewings/infinispan/continuousquery/ContinuousQuerySpec.scala

package org.littlewings.infinispan.continuousquery

import org.infinispan.Cache
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.objectfilter.ParsingException
import org.infinispan.query.Search
import org.scalatest.{FunSuite, Matchers}

class ContinuousQuerySpec extends FunSuite with Matchers {
  // ここに、テストコードを書く
  def withCache[K, V](cacheName: String, numInstances: Int = 1)(fun: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))
    managers.foreach(_.getCache[K, V](cacheName))

    try {
      val cache = managers(0).getCache[K, V](cacheName)
      fun(cache)
      cache.stop()
    } finally {
      managers.foreach(_.stop())
    }
  }
}

簡易的にクラスタを構成するための、簡易メソッド付き。

Continuous Queryを使う

それでは、Continuous Queryを使っていきます。

まずはCacheに格納するためのEntityがないと話が始まらないので、こちらを作成。
src/main/scala/org/littlewings/infinispan/continuousquery/Book.scala

package org.littlewings.infinispan.continuousquery

import scala.beans.BeanProperty

object Book {
  def apply(isbn: String, title: String, price: Int): Book = {
    val book = new Book
    book.isbn = isbn
    book.title = title
    book.price = price
    book
  }
}

@SerialVersionUID(1L)
class Book extends Serializable {
  @BeanProperty
  var isbn: String = _

  @BeanProperty
  var title: String = _

  @BeanProperty
  var price: Int = _

  def newBookChangePrice(newPrice: Int): Book = {
    val book = new Book
    book.isbn = isbn
    book.title = title
    book.price = newPrice
    book
  }
}

なんの変哲もないSerializableなクラスですが、あとで価格をコロコロと変えるので、そこだけ変更した新しいインスタンスを返すための
メソッドを用意しています。

続いて、Continuous Queryを使った際にイベントを受け取るクラスを作成します。ContinuousQueryListenerインターフェースを実装し、受け取りたい
イベントに応じたメソッド(result〜)をオーバーライドします。

デフォルトでは、どのメソッドもなにもしないように実装されています。
https://github.com/infinispan/infinispan/blob/9.0.0.Final/query-dsl/src/main/java/org/infinispan/query/api/continuous/ContinuousQueryListener.java

今回は、イベントを呼び出された回数をカウントしつつ、コンソールに出力する実装にしました。
src/main/scala/org/littlewings/infinispan/continuousquery/MyContinuousQueryListener.scala

package org.littlewings.infinispan.continuousquery

import org.infinispan.query.api.continuous.ContinuousQueryListener

class MyContinuousQueryListener extends ContinuousQueryListener[String, Book] {
  protected[continuousquery] var joined: Int = 0
  protected[continuousquery] var updated: Int = 0
  protected[continuousquery] var leaved: Int = 0

  override def resultJoining(key: String, value: Book): Unit = {
    synchronized(joined += 1)
    println(s"joined, key = ${key}, value = ${value.title}:${value.price}")
  }

  override def resultUpdated(key: String, value: Book): Unit = {
    synchronized(updated += 1)
    println(s"update, key = ${key}, value = ${value.title}:${value.price}")
  }

  override def resultLeaving(key: String): Unit = {
    synchronized(leaved += 1)
    println(s"leaved, key = ${key}")
  }

  def allCount: (Int, Int, Int) = (joined, updated, leaved)
}

ContinuousQueryListenerはパラメーター化されているので、キーの型とQueryの結果に応じた型を指定します。ここでは、StringとBookとします。

class MyContinuousQueryListener extends ContinuousQueryListener[String, Book] {

イベントごとに呼び出された回数を、一括で取得するメソッドも用意しています。テストコードでは、こちらで確認します。

では、テストコードで確認していきましょう。

  test("continuous query, simple") {
    withCache[String, Book]("bookCache", 3) { cache =>
      val continuousQuery = Search.getContinuousQuery(cache)
      val continuousQueryListener = new MyContinuousQueryListener

      val query =
        Search
          .getQueryFactory(cache)
          .create(
            s"""|from ${classOf[Book].getName} b
                |where b.title like '%Infinispan%'
                |and b.price >= :priceLower
                |and b.price <= :priceUpper""".stripMargin)
      query.setParameter("priceLower", 3000)
      query.setParameter("priceUpper", 5000)

      continuousQuery.addContinuousQueryListener(query, continuousQueryListener)

      val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 2000)

      // あとで
    }
  }

Search#getContinuousQueryで、ContinuousQueryのインスタンスを取得できます。また、先ほど作成したListenerはインスタンス化しておきます。

      val continuousQuery = Search.getContinuousQuery(cache)
      val continuousQueryListener = new MyContinuousQueryListener

で、Continuous Queryを使うにはQueryが必要になるのですが、Queryを登録する方法はいくつかあります。

まずは、Queryを先に作ってContinuous Queryに登録する方法をとってみます。

      val query =
        Search
          .getQueryFactory(cache)
          .create(
            s"""|from ${classOf[Book].getName} b
                |where b.title like '%Infinispan%'
                |and b.price >= :priceLower
                |and b.price <= :priceUpper""".stripMargin)
      query.setParameter("priceLower", 3000)
      query.setParameter("priceUpper", 5000)

      continuousQuery.addContinuousQueryListener(query, continuousQueryListener)

Query DSL APIのQueryが得られればいいので、今回はIckle Queryを使用しました。このQueryを、先ほど作成したListenerとともに
Continuous Queryに登録します。
※Queryに、しれっとタイトルだけを絞り込む条件も追加しています…

あとはデータを登録していきます。

      val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 2000)

まずは、価格が2,000円のエントリなので、Cacheに登録してもなにも起こりません。

      // none
      cache.put(book.isbn, book)
      continuousQueryListener.allCount should be((0, 0, 0))

価格を3,000円に上げて登録すると、resultJoininingが呼び出されることになります。

      // join1
      val book3000 = book.newBookChangePrice(3000)
      cache.put(book3000.isbn, book3000)
      continuousQueryListener.allCount should be((1, 0, 0))

Listenerからのコンソールの出力結果。

joined, key = 978-1782169970, value = Infinispan Data Grid Platform Definitive Guide:3000

3,000円から5,000円の範囲にいる間での更新をすると、resultUpdatedが呼び出されます。

      // update
      val book4500 = book3000.newBookChangePrice(4500)
      cache.put(book4500.isbn, book4500)
      continuousQueryListener.allCount should be((1, 1, 0))

Listener側。

update, key = 978-1782169970, value = Infinispan Data Grid Platform Definitive Guide:4500

3,000円から5,000円の範囲をはみ出るような更新の仕方をすると、resultLeavingが呼び出されます。

      // leave1
      val book5001 = book4500.newBookChangePrice(5001)
      cache.put(book5001.isbn, book5001)
      continuousQueryListener.allCount should be((1, 1, 1))

Listener側。

leaved, key = 978-1782169970

また、いったんエントリを削除して

      cache.remove(book.isbn)
      continuousQueryListener.allCount should be((1, 1, 1))

いきなりQueryの条件が真になるエントリを放り込むとresultJoiningが呼び出されますし、removeすればresultLeavingが呼び出されます。

      // join2
      cache.put(book3000.isbn, book3000)
      continuousQueryListener.allCount should be((2, 1, 1))

      // leave2
      cache.remove(book3000.isbn)
      continuousQueryListener.allCount should be((2, 1, 2))

Listener側。

joined, key = 978-1782169970, value = Infinispan Data Grid Platform Definitive Guide:3000
leaved, key = 978-1782169970

で、さっきから価格のみ見ていましたが、いちおうタイトルの条件も効いていることを確認するために、Queryの価格の条件にはマッチするものの、
タイトルではマッチしない書籍のデータも入れてみます。

      val ignoredBook = Book("978-1785285332", "Getting Started With Hazelcast", 3904)
      cache.put(ignoredBook.isbn, ignoredBook)
      continuousQueryListener.allCount should be((2, 1, 2))

カウントが変化しませんでしたね。イベントが発火していません。

OKそうですね。

なお、Listenerといえば、登録したNodeでしかイベントを受け取れないと思いきや、Continuous Queryで使われているListenerの実体は
Cluster Listenerとなっていて、どのNodeで発生したイベントでもQueryの条件にマッチさえすれば受け取れるようになっています。
https://github.com/infinispan/infinispan/blob/9.0.0.Final/query/src/main/java/org/infinispan/query/continuous/impl/ContinuousQueryImpl.java#L90-L91

Projectionも試す

Continous Queryのドキュメントを読んでいると、パフォーマンス上の注意事項が書かれてあるのですが、メモリの節約のためにできれば取得する
プロパティは絞った方がいいよ、みたいなことが書かれています。

Notes on performance of Continuous Queries

要するに、Projectionが使えそうな感じなので、こちらも確認してみます。

Projectionを使った場合は、Queryの結果がObjectの配列になります。なので、Listenerはこのようにしました。
src/main/scala/org/littlewings/infinispan/continuousquery/MyProjectionSupportContinuousQueryListener.scala

package org.littlewings.infinispan.continuousquery

import org.infinispan.query.api.continuous.ContinuousQueryListener

class MyProjectionSupportContinuousQueryListener extends ContinuousQueryListener[String, Array[AnyRef]] {
  protected[continuousquery] var joined: Int = 0
  protected[continuousquery] var updated: Int = 0
  protected[continuousquery] var leaved: Int = 0

  override def resultJoining(key: String, value: Array[AnyRef]): Unit = {
    synchronized(joined += 1)
    println(s"joined, key = ${key}, value = ${value.mkString(",")}")
  }

  override def resultUpdated(key: String, value: Array[AnyRef]): Unit = {
    synchronized(updated += 1)
    println(s"update, key = ${key}, value = ${value.mkString(",")}")
  }

  override def resultLeaving(key: String): Unit = {
    synchronized(leaved += 1)
    println(s"leaved, key = ${key}")
  }

  def allCount: (Int, Int, Int) = (joined, updated, leaved)
}

使い方自体は、Projectionを使おうが使うまいがそう変わりません。

  test("continuous query, projection") {
    withCache[String, Book]("bookCache", 3) { cache =>
      val continuousQuery = Search.getContinuousQuery(cache)
      val continuousQueryListener = new MyProjectionSupportContinuousQueryListener

      val queryParameters = new java.util.HashMap[String, AnyRef]
      queryParameters.put("priceLower", Integer.valueOf(3000))
      queryParameters.put("priceUpper", Integer.valueOf(5000))

      continuousQuery
        .addContinuousQueryListener(
          s"""|select b.title, b.price
              |from ${classOf[Book].getName} b
              |where b.price >= :priceLower
              |and b.price <= :priceUpper""".stripMargin,
          queryParameters,
          continuousQueryListener)

      val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 2000)

      // none
      cache.put(book.isbn, book)
      continuousQueryListener.allCount should be((0, 0, 0))

      // join
      val book3000 = book.newBookChangePrice(3000)
      cache.put(book3000.isbn, book3000)
      continuousQueryListener.allCount should be((1, 0, 0))

      // remove
      cache.remove(book3000.isbn)
      continuousQueryListener.allCount should be((1, 0, 1))
    }
  }

Query自体は、select句を書くことになります。

          s"""|select b.title, b.price
              |from ${classOf[Book].getName} b
              |where b.price >= :priceLower
              |and b.price <= :priceUpper""".stripMargin,

また、先ほどの例ではQueryを構築後にContinuous Queryに登録していましたが、今回はQueryを文字列としてバインドするパラメーターも
合わせてContinuousQueryに渡すAPIを使用してみました。
※バインドするパラメーターが不要な場合は、QueryStringとContinuousQueryListenerだけを渡せるメソッドもあります

      val queryParameters = new java.util.HashMap[String, AnyRef]
      queryParameters.put("priceLower", Integer.valueOf(3000))
      queryParameters.put("priceUpper", Integer.valueOf(5000))

      continuousQuery
        .addContinuousQueryListener(
          s"""|select b.title, b.price
              |from ${classOf[Book].getName} b
              |where b.price >= :priceLower
              |and b.price <= :priceUpper""".stripMargin,
          queryParameters,
          continuousQueryListener)

このAPIの場合は、ContinuousQuery側でパースが行われます。

今回は、登録と削除を行ったので、コンソール上の出力はこんな感じになります。

joined, key = 978-1782169970, value = Infinispan Data Grid Platform Definitive Guide,3000
leaved, key = 978-1782169970

Continuous QueryとFull Text Index

ここまで、Continuous Queryに対して、特にインデックス設定を行っていないCacheおよびEntityに対して実行してきました。

では、インデックスを有効にしたEntity、Cacheの場合はどうなるか?ですが、使えるとも言えますし、使えないとも言えます。

まあ、言ってしまうと全文検索での演算子を使った条件は使用できません、ということになります。

たとえばこんなEntityと
src/main/scala/org/littlewings/infinispan/continuousquery/IndexedBook.scala

package org.littlewings.infinispan.continuousquery

import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.hibernate.search.annotations._

import scala.beans.BeanProperty

object IndexedBook {
  def apply(isbn: String, title: String, price: Int): IndexedBook = {
    val book = new IndexedBook
    book.isbn = isbn
    book.title = title
    book.price = price
    book
  }
}

@Indexed
@Analyzer(impl = classOf[StandardAnalyzer])
@SerialVersionUID(1L)
class IndexedBook extends Serializable {
  @Field(analyze = Analyze.NO)
  @BeanProperty
  var isbn: String = _

  @Field
  @BeanProperty
  var title: String = _

  @Field
  @BeanProperty
  var price: Int = _

  def newBookChangePrice(newPrice: Int): IndexedBook = {
    val book = new IndexedBook
    book.isbn = isbn
    book.title = title
    book.price = newPrice
    book
  }
}

対応するContinuousQueryListenerを用意して
src/main/scala/org/littlewings/infinispan/continuousquery/MyIndexedContinuousQueryListener.scala

package org.littlewings.infinispan.continuousquery

import org.infinispan.query.api.continuous.ContinuousQueryListener

class MyIndexedContinuousQueryListener extends ContinuousQueryListener[String, IndexedBook] {
  protected[continuousquery] var joined: Int = 0
  protected[continuousquery] var updated: Int = 0
  protected[continuousquery] var leaved: Int = 0

  override def resultJoining(key: String, value: IndexedBook): Unit = {
    synchronized(joined += 1)
    println(s"joined, key = ${key}, value = ${value.title}:${value.price}")
  }

  override def resultUpdated(key: String, value: IndexedBook): Unit = {
    synchronized(updated += 1)
    println(s"update, key = ${key}, value = ${value.title}:${value.price}")
  }

  override def resultLeaving(key: String): Unit = {
    synchronized(leaved += 1)
    println(s"leaved, key = ${key}")
  }

  def allCount: (Int, Int, Int) = (joined, updated, leaved)
}

全文検索演算子をQueryに含めると、例外がスローされます。
※Cacheはインデックスを有効にしたものを使っています

  test("continuous query, unsupported full-text query") {
    withCache[String, IndexedBook]("indexedBookCache", 3) { cache =>
      val continuousQuery = Search.getContinuousQuery(cache)
      val continuousQueryListener = new MyIndexedContinuousQueryListener

      val query =
        Search
          .getQueryFactory(cache)
          .create(
            s"""|from ${classOf[IndexedBook].getName} b
                |where b.title: 'infinispan'""".stripMargin)

      val thrown = the[ParsingException] thrownBy continuousQuery.addContinuousQueryListener(query, continuousQueryListener)
      thrown.getMessage should be("ISPN028523: Filters cannot use full-text searches")
    }
  }

全文検索演算子さえ含めなければ、ふつうに使うことができます。

  test("continuous query, indexed") {
    withCache[String, IndexedBook]("indexedBookCache", 3) { cache =>
      val continuousQuery = Search.getContinuousQuery(cache)
      val continuousQueryListener = new MyIndexedContinuousQueryListener

      val query =
        Search
          .getQueryFactory(cache)
          .create(
            s"""|from ${classOf[IndexedBook].getName} b
                |where b.price >= :priceLower
                |and b.price <= :priceUpper""".stripMargin)
      query.setParameter("priceLower", 3000)
      query.setParameter("priceUpper", 5000)

      continuousQuery.addContinuousQueryListener(query, continuousQueryListener)

      val book = IndexedBook("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 2000)

      // none
      cache.put(book.isbn, book)
      continuousQueryListener.allCount should be((0, 0, 0))

      // join1
      val book3000 = book.newBookChangePrice(3000)
      cache.put(book3000.isbn, book3000)
      continuousQueryListener.allCount should be((1, 0, 0))

      // update
      val book4500 = book3000.newBookChangePrice(4500)
      cache.put(book4500.isbn, book4500)
      continuousQueryListener.allCount should be((1, 1, 0))

      // インデックスなしの場合と同じなので、以降省略
  }
}

省略しますが、Projectionも使えます。

で、全文検索演算子が含まれているかどうかは、ContinuousQueryListener登録時にチェックされているわけですが
https://github.com/infinispan/infinispan/blob/9.0.0.Final/object-filter/src/main/java/org/infinispan/objectfilter/impl/BaseMatcher.java#L195-L196

これを見るとGrouping/Aggregationが入っていてもNGそうですね。まあ、そりゃあそうだという気はしますが。

そもそも、Continuous QueryってCacheからあらためて検索するのではなくイベントのフィルタリングを行っているので、Continuous Queryでの
処理は、リフレクションベースになります。

Analyzeなものとは合わないので、インデックスを有効にしたEntityやCacheを使ってさらにContinuous Queryを使う場合には注意しましょうねと。

まとめ

InfinispanのContinous QueryがUpdateをサポートしたので、改めて試してみました。特に違和感なく使えましたが、気になるところも
あらためて確認できて勉強になりました。

今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-continuous-query-update-support