Infinispanに8.0.0.Finalから搭載されているContinuous Queryですが、Hot Rod(Client/Server Mode)でも使うことができます。
EmbeddedでContinuous Queryは試したことがありましたが、Hot Rodではまだ試していなかったので、今回使ってみることにしました。
Continuous Queryとは?
平たく言うとListenerなのですが、イベント通知時の条件をQueryとして登録しておき、そのQueryにマッチする/しないが切り替わる時にイベントを受信することができます。
※現在のInfinispanのContinous Queryでは、Queryにマッチし続けるような更新については検知しないのですが、9.0になると可能になりそうな感じです
準備と利用するための前提
まずは、ビルド定義から。
build.sbt
name := "remote-continuous-query" organization := "org.littlewings" version := "0.0.1-SNAPSHOT" scalaVersion := "2.11.8" updateOptions := updateOptions.value.withCachedResolution(true) scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") parallelExecution in Test := false libraryDependencies ++= Seq( "org.infinispan" % "infinispan-client-hotrod" % "8.2.4.Final" % Compile, "org.infinispan" % "infinispan-query-dsl" % "8.2.4.Final" % Compile, "org.infinispan" % "infinispan-remote-query-client" % "8.2.4.Final" % Compile, "org.infinispan" % "infinispan-server-hotrod" % "8.2.4.Final" % Test, "org.infinispan" % "infinispan-remote-query-server" % "8.2.4.Final" % Test, "net.jcip" % "jcip-annotations" % "1.0" % Provided, "org.scalatest" %% "scalatest" % "3.0.0" % Test )
Hot RodでのContinuous Queryを使うのに必要なモジュールは、こちらです。
"org.infinispan" % "infinispan-client-hotrod" % "8.2.4.Final" % Compile, "org.infinispan" % "infinispan-query-dsl" % "8.2.4.Final" % Compile, "org.infinispan" % "infinispan-remote-query-client" % "8.2.4.Final" % Compile,
Server系のものも入っていますが、こちらはテスト時にHot Rod ServerをEmbeddedに起動するために使うので、Continuous Queryとはあまり関係ないです。なお、EmbeddedにHot Rod Serverを使う場合は、infinispan-remote-query-serverを入れておかないとContinuous Queryが使えません。
"org.infinispan" % "infinispan-server-hotrod" % "8.2.4.Final" % Test, "org.infinispan" % "infinispan-remote-query-server" % "8.2.4.Final" % Test,
また、Hot RodでContinuous Queryを使うための注意点として、Protocol Buffersの利用が前提となります。なので、登録するEntityクラスに対してのProtocol BuffersのIDLが必要です。
InfinispanにはEntityからアノテーションでProtocol BufferesのIDLを生成する機能があるので、今回はそちらを利用しました。
というか、Hot RodのQuery系でProtocol Buffersが必要なのいつもコロッと忘れていて、その度に踏んでいるのですが…ええ。
さて、あとはコードを書いてきましょう。
Entity
今回のお題は、書籍とします。@ProtoMessageとか@ProtoFieldとか付いているのが、Protocol BufferesのIDLを生成するためのInfinispanのアノテーションです。
src/test/scala/org/littlewings/infinispan/remotecq/Book.scala
package org.littlewings.infinispan.remotecq import org.infinispan.protostream.annotations.{ProtoField, ProtoMessage} import org.infinispan.protostream.descriptors.Type object Book { def apply(isbn: String, title: String, price: Int): Book = { val book = new Book book.isbn = isbn book.title = title book.price = price book } } @ProtoMessage(name = "Book") class Book extends Serializable { private[remotecq] var isbn: String = _ private[remotecq] var title: String = _ private[remotecq] var price: Int = _ @ProtoField(number = 1, name = "isbn", required = true, `type` = Type.STRING) def getIsbn: String = isbn def setIsbn(isbn: String): Unit = this.isbn = isbn @ProtoField(number = 2, name = "title", required = true, `type` = Type.STRING) def getTitle: String = title def setTitle(title: String): Unit = this.title = title @ProtoField(number = 3, name = "price", required = true, defaultValue = "0", `type` = Type.INT32) def getPrice: Int = price def setPrice(price: Int): Unit = this.price = price }
テストコードの雛形
テストコード全体の雛形を作成します。Hot Rod ServerをEmbeddedに起動するメソッド、RemoteCacheを使うためのメソッド、そしてProtocol BufferesのIDLを生成してRemoteCacheに登録するところまでを共通的に用意します。
src/test/scala/org/littlewings/infinispan/remotecq/RemoteContinuousQuerySpec.scala
package org.littlewings.infinispan.remotecq import java.util.concurrent.TimeUnit import org.infinispan.client.hotrod.configuration.ConfigurationBuilder import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller import org.infinispan.client.hotrod.{RemoteCache, RemoteCacheManager, Search} import org.infinispan.manager.DefaultCacheManager import org.infinispan.protostream.annotations.ProtoSchemaBuilder import org.infinispan.query.dsl.Query import org.infinispan.query.remote.client.ProtobufMetadataManagerConstants import org.infinispan.server.hotrod.HotRodServer import org.infinispan.server.hotrod.configuration.HotRodServerConfigurationBuilder import org.scalatest.{FunSpec, Matchers} class RemoteContinuousQuerySpec extends FunSpec with Matchers { describe("Remote Continuous Query Spec") { // ここに、テストを書く! } protected def registerProtocolBufIdl[K, V](cache: RemoteCache[K, V], clazz: Class[_]): Unit = { val manager = cache.getRemoteCacheManager val context = ProtoStreamMarshaller.getSerializationContext(manager) val protoSchemaBuilder = new ProtoSchemaBuilder val idl = protoSchemaBuilder .fileName(clazz.getName) .addClass(clazz) .build(context) val metaCache = manager.getCache[String, String](ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME) metaCache.put(clazz.getName + ".proto", idl) } protected def withRemoteCache[K, V](cacheName: String)(fun: RemoteCache[K, V] => Unit): Unit = { val manager = new RemoteCacheManager( new ConfigurationBuilder() .addServer() .host("localhost") .port(11222) .marshaller(new ProtoStreamMarshaller) .build ) try { fun(manager.getCache[K, V](cacheName)) } finally { manager.stop() } } protected def withRemoteCacheServer(fun: => Unit): Unit = { val embeddedCacheManager = new DefaultCacheManager("infinispan.xml") val host = "localhost" val port = 11222 val hotRodServer = new HotRodServer try { hotRodServer .start( new HotRodServerConfigurationBuilder() .host(host) .port(port) .build, embeddedCacheManager) fun } finally { hotRodServer.stop embeddedCacheManager.stop() } } }
Hot Rod Serverの起動は、本質的でないので端折ります。
RemoteCacheは、まずRemoteCacheManagerを作成する際に、ProtoStreamMarshallerをMarshallerとして登録しておく必要があります。
val manager = new RemoteCacheManager( new ConfigurationBuilder() .addServer() .host("localhost") .port(11222) .marshaller(new ProtoStreamMarshaller) .build )
そして、EntityのアノテーションをもとにProtocol BuffersのIDLを生成し、RemoteCacheに登録するためのコードは以下の通りです。ここも、そういうものだくらいに流してもらえればいいかなと思います。
protected def registerProtocolBufIdl[K, V](cache: RemoteCache[K, V], clazz: Class[_]): Unit = { val manager = cache.getRemoteCacheManager val context = ProtoStreamMarshaller.getSerializationContext(manager) val protoSchemaBuilder = new ProtoSchemaBuilder val idl = protoSchemaBuilder .fileName(clazz.getName) .addClass(clazz) .build(context) val metaCache = manager.getCache[String, String](ProtobufMetadataManagerConstants.PROTOBUF_METADATA_CACHE_NAME) metaCache.put(clazz.getName + ".proto", idl) }
ちなみに、このコードを使う時のInfinispanの設定は以下のようにしました(EmbeddedなHot Rod Serverが使います)。
src/test/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:8.2 http://www.infinispan.org/schemas/infinispan-config-8.2.xsd" xmlns="urn:infinispan:config:8.2"> <cache-container default-cache="hotRodBasicCache"> <jmx duplicate-domains="true"/> <local-cache name="hotRodBasicCache"> <data-container key-equivalence="org.infinispan.commons.equivalence.AnyServerEquivalence" value-equivalence="org.infinispan.commons.equivalence.AnyServerEquivalence"/> </local-cache> <local-cache name="namedCache"/> </cache-container> </infinispan>
Continuous Queryに対するListenerを作る
では、Continous Queryで、Queryにマッチした時のイベントを受け取るListenerを作成します。
ContinuousQueryListenerというインターフェースを実装したクラスを作成しますが、このインターフェースはEmbeddedとHot Rod共通です。
src/test/scala/org/littlewings/infinispan/remotecq/BookContinousQueryListener.scala
package org.littlewings.infinispan.remotecq import org.infinispan.query.api.continuous.ContinuousQueryListener class BookContinousQueryListener extends ContinuousQueryListener[String, Book] { var joiningBooks: Vector[Book] = Vector.empty var leavingBooks: Vector[String] = Vector.empty override def resultJoining(key: String, value: Book): Unit = { joiningBooks = joiningBooks :+ value } override def resultLeaving(key: String): Unit = leavingBooks = leavingBooks :+ key }
メソッドとしては、Queryにマッチするようになった時に呼び出されキーと値を受け取るresultJoiningと、Queryにマッチしたものがマッチしなくなった時に呼び出されるresultLeavingを実装することになります。
今回は、受け取ったキーや値を保存しておくようにしました(テストで確認します)。
使ってみる
それでは、テストを書きつつContinuous Queryを使ってみます。
追加
まずは追加時。
it("add Entry") { withRemoteCacheServer { withRemoteCache[String, Book]("namedCache") { cache => registerProtocolBufIdl(cache, classOf[Book]) val books = Array( Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947), Book("978-1785285332", "Getting Started With Hazelcast - Second Edition", 3848), Book("978-1783988181", "Mastering Redis", 6172) ) val continousQuery = Search.getContinuousQuery(cache) val query = Search .getQueryFactory(cache) .from(classOf[Book]) .having("price") .gte(4000) .toBuilder[Query] .build val listener = new BookContinousQueryListener continousQuery.addContinuousQueryListener(query, listener) books.foreach(b => cache.put(b.isbn, b)) TimeUnit.SECONDS.sleep(1L) listener.joiningBooks.map(_.isbn) should be(books.filter(_.price > 4000).map(_.isbn)) } } }
最初の3行で、EmbeddedなHot Rod Serverの起動、RemoteCacheManagerおよびRemoteCacheの構築、Protocol BuffersのIDLの生成とRemoteCacheへの登録を行います。
Continuous Queryの登録は、SearchからのContinuousQueryの取得、Queryの構築とContinuousQueryに対するQueryとListenerの追加で完了します。
val continousQuery = Search.getContinuousQuery(cache) val query = Search .getQueryFactory(cache) .from(classOf[Book]) .having("price") .gte(4000) .toBuilder[Query] .build val listener = new BookContinousQueryListener continousQuery.addContinuousQueryListener(query, listener)
この例では、priceが4,000円以上のものが対象となります。
では、書籍を登録してみます。対象はこちら。
val books = Array( Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947), Book("978-1785285332", "Getting Started With Hazelcast - Second Edition", 3848), Book("978-1783988181", "Mastering Redis", 6172) )
登録したら、Listenerに非同期に通知されるので、確認してみます。
books.foreach(b => cache.put(b.isbn, b)) TimeUnit.SECONDS.sleep(1L) listener.joiningBooks.map(_.isbn) should be(books.filter(_.price > 4000).map(_.isbn))
4000円以上の本だけが通知されていることが確認できました。
削除
削除。
it("delete Entry") { withRemoteCacheServer { withRemoteCache[String, Book]("namedCache") { cache => registerProtocolBufIdl(cache, classOf[Book]) val books = Array( Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947), Book("978-1785285332", "Getting Started With Hazelcast - Second Edition", 3848), Book("978-1783988181", "Mastering Redis", 6172) ) val continousQuery = Search.getContinuousQuery(cache) val query = Search .getQueryFactory(cache) .from(classOf[Book]) .having("price") .gte(4000) .toBuilder[Query] .build val listener = new BookContinousQueryListener continousQuery.addContinuousQueryListener(query, listener) books.foreach(b => cache.put(b.isbn, b)) books.foreach(b => cache.remove(b.isbn)) TimeUnit.SECONDS.sleep(1L) listener.joiningBooks.map(_.isbn) should be(books.filter(_.price > 4000).map(_.isbn)) listener.leavingBooks should be(books.filter(_.price > 4000).map(_.isbn)) } } }
追加後に削除するわけですが、ここではもともとQueryにマッチしていた書籍が削除された場合に、Listenerに通知されます。
listener.leavingBooks should be(books.filter(_.price > 4000).map(_.isbn))
Entityを更新してQueryにマッチさせるようにする
今度は、最初に登録していたEntityを更新して、Queryにマッチするように更新した場合の挙動を見てみましょう。
ソースコードは、こちら。
it("join Entry") { withRemoteCacheServer { withRemoteCache[String, Book]("namedCache") { cache => registerProtocolBufIdl(cache, classOf[Book]) val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947) val continousQuery = Search.getContinuousQuery(cache) val query = Search .getQueryFactory(cache) .from(classOf[Book]) .having("price") .gte(5000) .toBuilder[Query] .build val listener = new BookContinousQueryListener continousQuery.addContinuousQueryListener(query, listener) cache.put(book.isbn, book) TimeUnit.SECONDS.sleep(1L) listener.joiningBooks should be(empty) val bookUpdated = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5100) cache.put(bookUpdated.isbn, bookUpdated) TimeUnit.SECONDS.sleep(1L) listener.joiningBooks.map(_.isbn) should be(Array(bookUpdated.isbn)) } } }
今回は、書籍を1冊に絞って、Queryの条件を5,000円以上にしてみました。
val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947) val continousQuery = Search.getContinuousQuery(cache) val query = Search .getQueryFactory(cache) .from(classOf[Book]) .having("price") .gte(5000) .toBuilder[Query] .build
この条件だと、最初の登録時にはQueryの条件を下回っているのでListenerには通知されませんが、
cache.put(book.isbn, book)
TimeUnit.SECONDS.sleep(1L)
listener.joiningBooks should be(empty)
Queryにマッチするように更新されると、通知されるようになります。
val bookUpdated = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 5100) cache.put(bookUpdated.isbn, bookUpdated) TimeUnit.SECONDS.sleep(1L) listener.joiningBooks.map(_.isbn) should be(Array(bookUpdated.isbn))
Entityを更新してQueryにマッチさせないようにする
最後は、もともとQueryにマッチしていたEntityを、更新してQueryにマッチしないようにした場合の挙動を見てみましょう。
ソースコードは、こちら。
it("leave Entry") { withRemoteCacheServer { withRemoteCache[String, Book]("namedCache") { cache => registerProtocolBufIdl(cache, classOf[Book]) val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947) val continousQuery = Search.getContinuousQuery(cache) val query = Search .getQueryFactory(cache) .from(classOf[Book]) .having("price") .gte(4000) .toBuilder[Query] .build val listener = new BookContinousQueryListener continousQuery.addContinuousQueryListener(query, listener) cache.put(book.isbn, book) TimeUnit.SECONDS.sleep(1L) listener.joiningBooks.map(_.isbn) should be(Array(book.isbn)) val bookUpdated = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 3900) cache.put(bookUpdated.isbn, bookUpdated) TimeUnit.SECONDS.sleep(1L) listener.leavingBooks should be(Array(bookUpdated.isbn)) } } }
本は1冊で、Queryの条件は4,000円以上。
val book = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 4947) val continousQuery = Search.getContinuousQuery(cache) val query = Search .getQueryFactory(cache) .from(classOf[Book]) .having("price") .gte(4000) .toBuilder[Query] .build
これを登録すると、joiningの方に通知されます。
cache.put(book.isbn, book) TimeUnit.SECONDS.sleep(1L) listener.joiningBooks.map(_.isbn) should be(Array(book.isbn))
今度は、値段を下げて更新。
val bookUpdated = Book("978-1782169970", "Infinispan Data Grid Platform Definitive Guide", 3900) cache.put(bookUpdated.isbn, bookUpdated) TimeUnit.SECONDS.sleep(1L) listener.leavingBooks should be(Array(bookUpdated.isbn))
leavingの方に通知されることがわかります。
だいたい、こんなところですね。
Continuous Query(over Hot Rod)の舞台裏
少し、Hot RodのContinuous Queryの裏を見てみました。
やっぱり、Queryを使うにはProtocol Buffersが必要なようでContinuous Queryを追加すると、裏で以下のようなFilterConverterが追加されるように動作します。
https://github.com/infinispan/infinispan/blob/8.2.4.Final/remote-query/remote-query-server/src/main/java/org/infinispan/query/remote/impl/filter/JPAContinuousQueryProtobufCacheEventFilterConverter.java
このFilterConverterはFactoryによって生成されるのですが、
https://github.com/infinispan/infinispan/blob/8.2.4.Final/remote-query/remote-query-server/src/main/java/org/infinispan/query/remote/impl/filter/JPAContinuousQueryProtobufCacheEventFilterConverterFactory.java
このFactoryは、Hot Rod Serverの起動時にService Providerの仕組みで登録されます。
https://github.com/infinispan/infinispan/blob/8.2.4.Final/server/hotrod/src/main/scala/org/infinispan/server/hotrod/HotRodServer.scala#L120
これで、Listenerに通知するイベントがフィルタリングされるというわけですね。
フィルタリングを行い、実際にListenerを呼び出すかどうかは、通常のListenerの仕組みによって行われます。
https://github.com/infinispan/infinispan/blob/8.2.4.Final/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java#L1194
ところで、ユーザーはContinousQueryListenerインターフェースの実装を作成する必要がありますが、ContinuosQuery自体はInfinispanのListenerではありません。
ContinuousQueryListenerのインスタンスをContinuousQueryに登録した時に、裏でContinuous Queryが提供する(Hot Rodの)Listenerにラップされます。
https://github.com/infinispan/infinispan/blob/8.2.4.Final/client/hotrod-client/src/main/java/org/infinispan/client/hotrod/event/ContinuousQueryImpl.java#L87
Hot RodのListenerを登録したという情報は、Server側へ送信されClientListenerRegistryによってEmbeddedなListenerに代替されてイベント受信を実現します。
https://github.com/infinispan/infinispan/blob/8.2.4.Final/server/hotrod/src/main/scala/org/infinispan/server/hotrod/ClientListenerRegistry.scala
あとは、EmbeddedなListenerが受信したイベントを介して、Hot RodのListenerにも通知されるという仕組みです。
最後の方は、Hot RodのListener自体の仕掛けでしたが、こういった感じで実現されているようです、と。
まとめ
Hot RodでのContinuous Queryを試してみました。
Protocol Buffersが必要なところは(個人的にはコロッと忘れて)ハマりましたが、そのくらいであとはそれほど困らずに使えました。
オフィシャルのInfinispan 8.0のドキュメントにはContinuous Queryについての記載はないのですが、Infinispan 9.0ではドキュメントが作られているので、こちらで見てみるとよいかもです。
Continuous Queryの裏を追っていく過程で、ClientListenerの仕組みなども見れて個人的には勉強になりました。
今回作成したソースコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/remote-continuous-query