Infinispan 7.0から、Listenerにclusteredというものが設定できるようになっているのですが、これまでちょっと飛ばしていたのでここらで試してみることにしました。
InfinispanのListenerはCacheとCacheManagerに対して使うことができますが、このうちCacheに対するListenerが今回の話の対象です。
Cluster Listernerとは?
簡単に言うと、どのNodeでイベントが発生しても通知を受け取ることができるListenerです。以下のような特徴を持ちます。
- @CacheEntryCreated、@CacheEntryModified、@CacheEntryRemoved、@CacheEntryExpiredのみサポート
- 上記以外のイベントは通知されない
- イベント発生時のPre/Postのうち、PostのみがCluster Listenerに送られる
Infinispan 8.0.0.Finalで追加されたContinuous Queryは、このCluster Listenerの上に成り立っています。
とまあ、こんな感じですが、通常のCacheレベルのListenerと比べながら試してみたいと思います。
準備
sbtの設定は、以下の通り。infinispan-coreが依存関係にあれば大丈夫です。
build.sbt
name := "embedded-listeners" version := "0.0.1-SNAPSHOT" scalaVersion := "2.11.7" organization := "org.littlewings" scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature") updateOptions := updateOptions.value.withCachedResolution(true) libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "8.1.1.Final", "net.jcip" % "jcip-annotations" % "1.0" % "provided" )
Infinispanの設定は、以下の通り。CacheはDistributed CacheとしてTTLを30秒、JGroupsはデフォルト設定を使うことにしました。
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:8.1 http://www.infinispan.org/schemas/infinispan-config-8.1.xsd" xmlns="urn:infinispan:config:8.1"> <jgroups> <stack-file name="udp" path="default-configs/default-jgroups-udp.xml"/> </jgroups> <cache-container default-cache="distCache" shutdown-hook="REGISTER"> <transport stack="udp"/> <distributed-cache name="distCache"> <expiration lifespan="30000"/> </distributed-cache> </cache-container> </infinispan>
確認用プログラム
動作確認には、2種類のプログラムとListenerを用意することにしました。
まずは、Listenerから。
src/main/scala/org/littlewings/infinispan/listener/CacheListener.scala
package org.littlewings.infinispan.listener import org.infinispan.notifications.Listener import org.infinispan.notifications.cachelistener.annotation._ import org.infinispan.notifications.cachelistener.event.{CacheEntriesEvictedEvent, CacheEntryEvent} trait CacheListener[K, V] { val name: String @CacheEntryCreated @CacheEntryModified @CacheEntryRemoved @CacheEntryExpired def handleEvent(event: CacheEntryEvent[K, V]): Unit = { println(s"[${getClass.getSimpleName}]:${name} event = ${event.getType}, isPre = ${event.isPre}, key = ${event.getKey}, value = ${event.getValue}") } @CacheEntriesEvicted def handleEvictedEvent(event: CacheEntriesEvictedEvent[K, V]): Unit = { println(s"[${getClass.getSimpleName}]:${name} event = ${event.getType}, isPre = ${event.isPre}, entries = ${event.getEntries}") } } @Listener class LocalListener[K, V](val name: String) extends CacheListener[K, V] @Listener(clustered = true) class ClusteredListener[K, V](val name: String) extends CacheListener[K, V]
骨格実装はトレイトにまとめて、@CacheEntryCreated、@CacheEntryModified、@CacheEntryRemoved、@CacheEntryExpiredによる通知を受け取るメソッド、もうひとつ@CacheEntriesEvictedによる通知を受け取るメソッドを用意。
Listenerに名前を与えてわかりやすくするのと、イベント通知時に受け取った情報をある程度出力するようにしています。
println(s"[${getClass.getSimpleName}]:${name} event = ${event.getType}, isPre = ${event.isPre}, key = ${event.getKey}, value = ${event.getValue}")
実体としては、clusteredをtrueにしたものと、そうでないものの2種類を実装しました。
@Listener class LocalListener[K, V](val name: String) extends CacheListener[K, V] @Listener(clustered = true) class ClusteredListener[K, V](val name: String) extends CacheListener[K, V]
そして、Cacheを操作するプログラムとしては、簡単な対話形式のCUIプログラムと、単に浮いててもらうサーバーの2つを作成。
対話形式のプログラムはこちら。
src/main/scala/org/littlewings/infinispan/listener/Console.scala
package org.littlewings.infinispan.listener import org.infinispan.manager.DefaultCacheManager import scala.collection.JavaConverters._ object Console { def main(args: Array[String]): Unit = { System.setProperty("java.net.preferIPv4Stack", "true") val name :: mode :: Nil = args.toList val manager = new DefaultCacheManager("infinispan.xml") try { val cache = manager.getCache[String, String]("distCache") val listener = if (mode == "local") new LocalListener[String, String](name) else new ClusteredListener[String, String](name) cache.addListener(listener) Iterator .continually(System.console().readLine("> ")) .filter(l => l != null && !l.isEmpty) .takeWhile(_ != "exit") .foreach { line => line.split("\\s+").toList match { case "put" :: key :: value :: Nil => cache.put(key, value) println(s"key:${key}, value = ${value}, putted.") case "get" :: key :: Nil => println(s"get key = ${key} => value = ${cache.get(key)}") case "remove" :: key :: Nil => cache.remove(key) println(s"removed key = ${key}") case "evict" :: key :: Nil => cache.evict(key) println(s"evicted key = ${key}") case "all" :: Nil => cache.entrySet.asScala.foreach(e => println(s"key = ${e.getKey}, value = ${e.getValue}")) case _ => println(s"unknown command[${line}]") } } cache.stop() } finally { manager.stop() } } }
単に浮いててもらうサーバーは、こちら。
src/main/scala/org/littlewings/infinispan/listener/EmbeddedServer.scala
package org.littlewings.infinispan.listener import java.io.{InputStreamReader, BufferedReader} import java.nio.charset.StandardCharsets import org.infinispan.manager.DefaultCacheManager object EmbeddedServer { def main(args: Array[String]): Unit = { System.setProperty("java.net.preferIPv4Stack", "true") val name :: mode :: Nil = args.toList val manager = new DefaultCacheManager("infinispan.xml") try { val cache = manager.getCache[String, String]("distCache") val listener = if (mode == "local") new LocalListener[String, String](name) else new ClusteredListener[String, String](name) cache.addListener(listener) System.console().readLine("> Press Enter, stop!") cache.stop() } finally { manager.stop() } } }
それぞれ、第1引数に便宜上の名前、第2引数にListenerの種類を取ります。第2引数に「local」を選んだ場合はデフォルト、それ以外はCluster Listenerとなります。
確認
それでは、全部で3つのNodeをまずは「local」で起動。
## CUIツール $ sbt "runMain org.littlewings.infinispan.listener.Console console local" ## Node1 $ sbt "runMain org.littlewings.infinispan.listener.EmbeddedServer node1 local" ## Node2 $ sbt "runMain org.littlewings.infinispan.listener.EmbeddedServer node1 local"
3つのNodeが参加した、クラスタができあがります。
では、CUIツールでいくつか操作。
> put key1 value1 [LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = true, key = key1, value = null [LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key1, value = value1 key:key1, value = value1, putted. > put key1 value1-1 [LocalListener]:console event = CACHE_ENTRY_MODIFIED, isPre = true, key = key1, value = value1 [LocalListener]:console event = CACHE_ENTRY_MODIFIED, isPre = false, key = key1, value = value1-1 key:key1, value = value1-1, putted. > put key2 value2 [LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = true, key = key2, value = null [LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key2, value = value2 key:key2, value = value2, putted. > put key3 value3 [LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = true, key = key3, value = null [LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key3, value = value3 key:key3, value = value3, putted. > put key3 value3-3 [LocalListener]:console event = CACHE_ENTRY_MODIFIED, isPre = true, key = key3, value = value3 [LocalListener]:console event = CACHE_ENTRY_MODIFIED, isPre = false, key = key3, value = value3-3 key:key3, value = value3-3, putted. > put key4 value4 [LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = true, key = key4, value = null [LocalListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key4, value = value4 key:key4, value = value4, putted. > remove key4 [LocalListener]:console event = CACHE_ENTRY_REMOVED, isPre = true, key = key4, value = value4 [LocalListener]:console event = CACHE_ENTRY_REMOVED, isPre = false, key = key4, value = null removed key = key4 > evict key1 [LocalListener]:console event = CACHE_ENTRY_EVICTED, isPre = false, entries = {key1=value1-1} evicted key = key1
コンソール出力があるもの、ないものがありますね。
また、 しばらく待っていると有効期限切れになります。
[LocalListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key2, value = value2 [LocalListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key3, value = value3-3 [LocalListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key1, value = value1-1
他のNodeだと、今回はこうなりました。
## Node1 [LocalListener]:node1 event = CACHE_ENTRY_CREATED, isPre = true, key = key2, value = null [LocalListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key2, value = value2 [LocalListener]:node1 event = CACHE_ENTRY_CREATED, isPre = true, key = key3, value = null [LocalListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key3, value = value3 [LocalListener]:node1 event = CACHE_ENTRY_MODIFIED, isPre = true, key = key3, value = value3 [LocalListener]:node1 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key3, value = value3-3 [LocalListener]:node1 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key2, value = value2 [LocalListener]:node1 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key3, value = value3-3 ## Node2 [LocalListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key1, value = value1 [LocalListener]:node2 event = CACHE_ENTRY_MODIFIED, isPre = true, key = key1, value = value1 [LocalListener]:node2 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key1, value = value1-1 [LocalListener]:node2 event = CACHE_ENTRY_CREATED, isPre = true, key = key4, value = null [LocalListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key4, value = value4 [LocalListener]:node2 event = CACHE_ENTRY_REMOVED, isPre = true, key = key4, value = value4 [LocalListener]:node2 event = CACHE_ENTRY_REMOVED, isPre = false, key = key4, value = null [LocalListener]:node2 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key1, value = value1-1
イベントを受け取るListenerは、データを持っているNode(バックアップNode含む)となっているみたいですね。また、isPreの結果がtrueとfalseになっていることから、Pre/Post両方のイベントが受け取れていることがわかります。
それでは1度プログラムを終了し、今度はCluster Listenerとして起動してみます。
## CUIツール $ sbt "runMain org.littlewings.infinispan.listener.Console console clustered" ## Node1 $ sbt "runMain org.littlewings.infinispan.listener.EmbeddedServer node1 clustered" ## Node2 $ sbt "runMain org.littlewings.infinispan.listener.EmbeddedServer node2 clustered"
先ほどと同じ操作を実行。
> put key1 value1 [ClusteredListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key1, value = value1 key:key1, value = value1, putted. > put key1 value1-1 [ClusteredListener]:console event = CACHE_ENTRY_MODIFIED, isPre = false, key = key1, value = value1-1 key:key1, value = value1-1, putted. > put key2 value2 [ClusteredListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key2, value = value2 key:key2, value = value2, putted. > put key3 value3 [ClusteredListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key3, value = value3 key:key3, value = value3, putted. > put key3 value3-3 [ClusteredListener]:console event = CACHE_ENTRY_MODIFIED, isPre = false, key = key3, value = value3-3 key:key3, value = value3-3, putted. > put key4 value4 [ClusteredListener]:console event = CACHE_ENTRY_CREATED, isPre = false, key = key4, value = value4 key:key4, value = value4, putted. > remove key4 [ClusteredListener]:console event = CACHE_ENTRY_REMOVED, isPre = false, key = key4, value = null removed key = key4 > evict key1 evicted key = key1
出力される内容が、明らかに減りました。isPreの結果が全部falseになり、CACHE_ENTRY_EVICTEDイベントは発生しなくなっています。
こちらも、しばらく待っているとExpireします。
[ClusteredListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key2, value = value2 [ClusteredListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key1, value = value1-1 [ClusteredListener]:console event = CACHE_ENTRY_EXPIRED, isPre = false, key = key3, value = value3-3
残りのNodeの結果は、こちら。
## Node1 [ClusteredListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key1, value = value1 [ClusteredListener]:node1 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key1, value = value1-1 [ClusteredListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key2, value = value2 [ClusteredListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key3, value = value3 [ClusteredListener]:node1 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key3, value = value3-3 [ClusteredListener]:node1 event = CACHE_ENTRY_CREATED, isPre = false, key = key4, value = value4 [ClusteredListener]:node1 event = CACHE_ENTRY_REMOVED, isPre = false, key = key4, value = null [ClusteredListener]:node1 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key2, value = value2 [ClusteredListener]:node1 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key1, value = value1-1 [ClusteredListener]:node1 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key3, value = value3-3 ## Node2 [ClusteredListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key1, value = value1 [ClusteredListener]:node2 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key1, value = value1-1 [ClusteredListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key2, value = value2 [ClusteredListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key3, value = value3 [ClusteredListener]:node2 event = CACHE_ENTRY_MODIFIED, isPre = false, key = key3, value = value3-3 [ClusteredListener]:node2 event = CACHE_ENTRY_CREATED, isPre = false, key = key4, value = value4 [ClusteredListener]:node2 event = CACHE_ENTRY_REMOVED, isPre = false, key = key4, value = null [ClusteredListener]:node2 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key2, value = value2 [ClusteredListener]:node2 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key1, value = value1-1 [ClusteredListener]:node2 event = CACHE_ENTRY_EXPIRED, isPre = false, key = key3, value = value3-3
Node間で、出力する内容に差がなくなりましたね。
Cluster Listenerの動作
こうなると、Cluster Listenerがどのように実装されているかが気になるところですが、Distributed Executorで実装されているようです。
で、Distributed Executorが起動して
https://github.com/infinispan/infinispan/blob/8.1.1.Final/core/src/main/java/org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl.java#L47
https://github.com/infinispan/infinispan/blob/8.1.1.Final/core/src/main/java/org/infinispan/notifications/cachelistener/cluster/impl/BatchingClusterEventManagerImpl.java#L90
またCacheNotifierImplに戻ってきます。
https://github.com/infinispan/infinispan/blob/8.1.1.Final/core/src/main/java/org/infinispan/notifications/cachelistener/CacheNotifierImpl.java#L612
ちょっとぐるっと回ってる感じですが、なんとなくイメージはわかりました。
まとめ
Infinispan 7.0でCacheレベルのListenerに追加された、clusteredをtrue/falseにした場合の挙動を確認してみました。Cluster Listenerにした場合は、全Nodeのイベントを受け取るようになること、そうでない場合は該当のデータを持つ(キーが割り当てられた)NodeであればListenerが起動するということですね。
個人的には、復習も兼ねられましたと。
今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-listeners