Infinispanには、各種Cacheの操作などに対して、イベント通知を受け取って何か処理をするListernerの機能があります。
Listeners and Notifications
https://docs.jboss.org/author/display/ISPN/Listeners+and+Notifications
JBossDataGridのリスナーのコールバック関数を呼び出す
http://news.mynavi.jp/news/2013/05/24/236/index.html
勢いで試してみたら、けっこうなボリュームになりそうなので、何回かに分けて書きますね。
Listenerとは
CacheまたはCacheManagerに対して作成したListenerクラスを登録することで、各種イベントに対する通知を受け取れるようになり、それに応じた処理ができるようになります。
Listenerは、Listenableインターフェースを実装したクラスに対して、追加、削除、登録したListenerの取得を行うことができます。Listenableインターフェースを実装したものの例としては、Cache、CacheManager、AdvancedCacheなどがあります。
Listenable
http://docs.jboss.org/infinispan/5.3/apidocs/org/infinispan/notifications/Listenable.html
Listenerは、@Listenerアノテーションを付与したPOJOとして作成します。イベント通知を受けとるのは、これもイベントに対応したアノテーションを付与したメソッドになります。そのメソッドは、Eventインターフェースを実装したものになります。
以下は、ドキュメントからのJavaでの実装例です。
@Listener public class PrintWhenAdded { @CacheEntryCreated public void print(CacheEntryCreatedEvent event) { System.out.println("New entry " + event.getKey() + " created in the cache"); } }
これは、Cacheに対するエントリを追加した時のイベントを受け取るクラスになります。
ちなみに、EventインターフェースはCacheレベルのものとCacheManagerレベルでインターフェースが異なるのですが、実装方法はそんなに変わりません。
ひとつのメソッドで、複数の種類のイベント通知を受けることもできます。
@Listener public class MultipleEventListener { @CacheStarted @CacheStopped public void doSomething(Event event) { if (event.getType() == Event.Type.CACHE_STARTED) System.out.println("Cache started. Details = " + event); else if (event.getType() == Event.Type.CACHE_STOPPED) System.out.println("Cache stopped. Details = " + event); } }
このようにした場合は、Event#getTypeから発生したイベントの種類を判定することになります。
それではまず、CacheレベルのイベントとCacheManagerレベルのイベント通知をそれぞれ紹介しましょう。
なお、イベントの種類は@ListenerアノテーションのJavadocに記載があります。
@Listener
org.infinispan.notifications.Listener
Cacheレベルのイベント通知
Cacheレベルのイベント通知で使用するアノテーションとEventインターフェースの実装は、以下の通りです。
アノテーション | 通知されるイベント | 説明 |
---|---|---|
CacheEntryCreated | CacheEntryCreatedEvent | Cacheにエントリを追加された時(Cache#put)に発生 |
CacheEntryModified | CacheEntryModifiedEvent | Cacheのエントリが変更された時(Cache#replace)に発生(が、追加時(Cache#put)も発生する模様) |
CacheEntryRemoved | CacheEntryRemovedEvent | Cacheのエントリを削除した時(Cache#remove)に発生 |
CacheEntryVisited | CacheEntryVisitedEvent | Cacheのエントリを参照した時(Cache#get)に発生 |
CacheEntryLoaded | CacheEntryLoadedEvent | Cacheのエントリを、CacheStoreからロードした時に発生。参照時や変更時などに、必要に応じて発生 |
CacheEntriesEvicted | CacheEntriesEvictedEvent | Evictを有効にした場合に、メモリ上のCacheエントリ(複数)が、CacheStoreに追い出された場合に発生 |
CacheEntryActivated | CacheEntryActivatedEvent | EvictとPassivationを有効にした場合に、CacheエントリをCacheStoreからロード(活性化)した時に発生 |
CacheEntryPassivated | CacheEntryPassivatedEvent | EvictとPassivationを有効にした場合に、CacheエントリをCacheStoreに保存(非活性化)した時に発生 |
CacheEntryInvalidated | CacheEntryInvalidatedEvent | クラスタリングのモードがINVALIDATION_SYNCまたはINVALIDATION_ASYNCの時に、Cacheエントリの追加、変更などを行った時に、他のNode上のCacheエントリが無効になった場合に発生。複数のNodeで、同じキーに対して操作を行うと見ることができる |
TransactionRegistered | TransactionRegisteredEvent | トランザクション有効時に、トランザクションが開始された時に発生 |
TransactionCompleted | TransactionCompletedEvent | トランザクション有効時に、トランザクションが完了した時(コミット、ロールバック)に発生 |
DataRehashed | DataRehashedEvent | *クラスタリングのモードの時に、クラスタメンバの増減によりデータのリハッシュを行う際の開始、終了時に発生 |
TopologyChanged | TopologyChangedEvent | *クラスタリングのモードの時に、クラスタメンバの増減によりトポロジの構成に変更があった時の開始、終了で発生 |
*ドキュメントでは「クラスタリングのモードがDIST_SYNCまたはDIST_ASYNCの時に発生」とありましたが、ReplicationでもInvalidationでも発生しました…
アノテーションは
org.infinispan.notifications.cachelistener.annotation パッケージ
イベントは
org.infinispan.notifications.cachelistener.event パッケージ
にそれぞれ属します。
CacheManagerレベルのイベント通知
CacheManagerレベルのイベント通知で使用するアノテーションとEventインターフェースの実装は、以下の通りです。
*ここでいうEventインターフェースは、Cacheレベルのイベント通知で使用するEventインターフェースとは異なります
アノテーション | 通知されるイベント | 説明 |
---|---|---|
CacheStarted | CacheStartedEvent | Cacheの開始時(Cache#start)に発生。ただし、停止後のCacheを再度開始させると、Listenerの登録は解除されるため必要であれば再度登録しなければならないことに注意 |
CacheStopped | CacheStoppedEvent | Cacheの終了時(Cache#stop)に発生 |
ViewChanged | ViewChangedEvent | クラスタ参加メンバの増減により、クラスタ構成が変化した場合に発生 |
Merged | MergedEvent | 分断されたクラスタが、マージされた場合に発生 |
アノテーションは
org.infinispan.notifications.cachemanagerlistener.annotation パッケージ
イベントは
org.infinispan.notifications.cachemanagerlistener.event パッケージ
にそれぞれ属します。
準備
それでは、各Listenerを作成する準備を。今回は動作させませんが、次回以降にちょっと動作結果を書いていこうと思います。
build.sbt
name := "infinispan-listeners" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.2" organization := "littlewings" fork in run := true resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/" libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "5.3.0.Final", "net.jcip" % "jcip-annotations" % "1.0" )
Infinispan 5.3.0.Finalがリリースされましたね!!
Infinispanの設定ファイル。コメントアウトの調整で一通りの動作確認ができるよう、いろいろ書いてます。あと、CacheLoaderの保存先はシステムプロパティでちょっといじれるようになっています。
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:5.3 http://www.infinispan.org/schemas/infinispan-config-5.3.xsd" xmlns="urn:infinispan:config:5.3"> <global> <transport clusterName="listenres-cluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" /> <!-- <asyncListenerExecutor factory="org.infinispan.executors.DefaultExecutorFactory"> <properties> <property name="maxThreads" value="5"/> <property name="threadNamePrefix" value="AsyncListenerThread"/> </properties> </asyncListenerExecutor> --> </global> <namedCache name="listenersCache"> <jmxStatistics enabled="true"/> <!-- <clustering mode="invalidation" /> --> <!-- <clustering mode="replication" /> --> <clustering mode="distribution"> <hash numOwners="2" /> <sync /> </clustering> <eviction strategy="LIRS" maxEntries="2" /> <loaders passivation="true" shared="false" preload="true"> <loader class="org.infinispan.loaders.file.FileCacheStore" fetchPersistentState="false" ignoreModifications="false" purgeOnStartup="false"> <properties> <property name="location" value="cache-store-${nodeId}" /> </properties> </loader> </loaders> <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" transactionMode="TRANSACTIONAL" lockingMode="OPTIMISTIC" autoCommit="true" /> </namedCache> </infinispan>
src/main/resources/jgroups.xml
<?xml version="1.0" encoding="UTF-8"?> <config xmlns="urn:org:jgroups" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd"> <UDP mcast_addr="${jgroups.udp.mcast_addr:228.11.11.11}" mcast_port="${jgroups.udp.mcast_port:45688}" tos="8" ucast_recv_buf_size="130000" ucast_send_buf_size="100000" mcast_recv_buf_size="130000" mcast_send_buf_size="100000" loopback="true" thread_naming_pattern="cl" thread_pool.enabled="true" thread_pool.min_threads="2" thread_pool.max_threads="8" thread_pool.keep_alive_time="5000" thread_pool.queue_enabled="true" thread_pool.queue_max_size="1000" thread_pool.rejection_policy="discard" oob_thread_pool.enabled="true" oob_thread_pool.min_threads="2" oob_thread_pool.max_threads="8" oob_thread_pool.keep_alive_time="1000" oob_thread_pool.queue_enabled="false" oob_thread_pool.rejection_policy="discard" /> <PING /> <FD_ALL /> <FD_SOCK /> <UNICAST2 /> <MERGE3 /> <pbcast.NAKACK2 /> <pbcast.GMS print_local_addr="true" /> </config>
ロギング用のトレイト。
src/main/scala/LogSupport.scala
import org.infinispan.notifications.cachelistener.event.Event trait LogSupport { def log(msg: => Any): Unit = { println(s"${prefix}${msg}") } protected def prefix: String = "" } trait SimpleClassNameLogSupport extends LogSupport { override protected def prefix: String = s"${super.prefix}[${getClass.getSimpleName}]# " } trait ThreadNameLogSupport extends LogSupport { override protected def prefix: String = s"${super.prefix}[${Thread.currentThread.getName}] " } trait PrePostLogSupport extends LogSupport { def log(event: Event[_, _], msg: Any): Unit = if (event.isPre) log("[Pre ] " + msg) else log("[Post] " + msg) }
CacheレベルのListener。
src/main/scala/CacheLevelListener.scala
import org.infinispan.notifications.Listener import org.infinispan.notifications.cachelistener.annotation._ import org.infinispan.notifications.cachelistener.event._ //@Listener(sync = true) //@Listener(sync = false) @Listener class CacheLevelListener extends SimpleClassNameLogSupport with PrePostLogSupport { @CacheEntryCreated def cacheEntryCreated(event: CacheEntryCreatedEvent[_, _]): Unit = log(event, s"作成イベント => ${event.getKey + ":" + event.getValue}") @CacheEntryRemoved def cacheEntryRemoved(event: CacheEntryRemovedEvent[_, _]): Unit = log(event, s"削除イベント => ${event.getKey + ":" + event.getValue}, 古い値 => ${event.getOldValue}") @CacheEntryModified def cacheEntryModified(event: CacheEntryModifiedEvent[_, _]): Unit = log(event, s"変更イベント => ${event.getKey + ":" + event.getValue}, isCreated? => ${event.isCreated}") @CacheEntryVisited def cacheEntryVisited(event: CacheEntryVisitedEvent[_, _]): Unit = log(event, s"参照イベント => ${event.getKey + ":" + event.getValue}") @CacheEntryLoaded def cacheEntryLoaded(event: CacheEntryLoadedEvent[_, _]): Unit = log(event, s"ロード完了イベント => ${event.getKey + ":" + event.getValue}") /** ひとつのメソッドで、複数のイベントを受け取ることも可能 **/ @CacheEntryActivated @CacheEntryPassivated def cacheEntryActivatedOrPassivated(e: CacheEntryEvent[_, _]): Unit = e.getType match { case Event.Type.CACHE_ENTRY_ACTIVATED => val event = e.asInstanceOf[CacheEntryActivatedEvent[_, _]] log(event, s"活性化イベント => ${event.getKey + ":" + event.getValue}") case Event.Type.CACHE_ENTRY_PASSIVATED => val event = e.asInstanceOf[CacheEntryPassivatedEvent[_, _]] log(event, s"非活性化イベント => ${event.getKey + ":" + event.getValue}") case _ => throw new IllegalArgumentException(e.getType.toString) } @CacheEntriesEvicted def cacheEntriesEvicted(event: CacheEntriesEvictedEvent[_, _]): Unit = log(event, s"エビクトイベント => ${event.getEntries}") @CacheEntryInvalidated def cacheEntryInvalidated(event: CacheEntryInvalidatedEvent[_, _]): Unit = log(event, s"無効化イベント => ${event.getKey + ":" + event.getValue}") @TransactionRegistered def transactionRegistered(event: TransactionRegisteredEvent[_, _]): Unit = log(event, s"トランザクション登録 => ${event.getGlobalTransaction}, isOriginLocal? => ${event.isOriginLocal}") @TransactionCompleted def transactionCompleted(event: TransactionCompletedEvent[_, _]): Unit = log(event, s"トランザクション完了 => ${event.getGlobalTransaction}, isTransactionSuccessful? => ${event.isTransactionSuccessful}, isOriginLocal? => ${event.isOriginLocal}") @DataRehashed def dataRehashed(event: DataRehashedEvent[_, _]): Unit = log(event, s"データリハッシュ => ${event.getNewTopologyId}, memberAtStart => ${event.getMembersAtStart}, memberAtEnd => ${event.getMembersAtEnd}, getConsistentHashAtStart => ${event.getConsistentHashAtStart}, getConsistentHashAtEnd => ${event.getConsistentHashAtEnd}") @TopologyChanged def topologyChanged(event: TopologyChangedEvent[_, _]): Unit = log(event, s"トポロジ変更 => ${event.getNewTopologyId}, getConsistentHashAtStart => ${event.getConsistentHashAtStart}, getConsistentHashAtEnd => ${event.getConsistentHashAtEnd}") }
CacheManagerレベルのListener。
src/main/scala/CacheManagerLevelListener.scala
import org.infinispan.notifications.Listener import org.infinispan.notifications.cachemanagerlistener.annotation._ import org.infinispan.notifications.cachemanagerlistener.event._ //@Listener(sync = true) //@Listener(sync = false) @Listener class CacheManagerLevelListener extends SimpleClassNameLogSupport { @CacheStarted def cacheStarted(event: CacheStartedEvent): Unit = log(s"キャッシュ開始イベント => ${event.getCacheName}") @CacheStopped def cacheStopped(event: CacheStoppedEvent): Unit = log(s"キャッシュ停止イベント => ${event.getCacheName}") @ViewChanged def viewChanged(event: ViewChangedEvent): Unit = log(s"ビュー変更イベント => isMergeView? => ${event.isMergeView}, viewId => ${event.getViewId}, localAddress => ${event.getLocalAddress}, getOldMembers => ${event.getOldMembers}, getNewMembers => ${event.getNewMembers}") @Merged def merged(event: MergeEvent): Unit = log(s"マージイベント => ${event.getSubgroupsMerged}, isMergeView? => ${event.isMergeView}, viewId => ${event.getViewId}, localAddress => ${event.getLocalAddress}, getOldMembers => ${event.getOldMembers}, getNewMembers => ${event.getNewMembers}") }
さまざまなCache操作を行うクラス。
src/main/scala/InfinispanListeners.scala
import scala.collection.JavaConverters._ import javax.transaction.Status import org.infinispan.Cache import org.infinispan.configuration.cache.CacheMode import org.infinispan.manager.DefaultCacheManager import org.infinispan.transaction.TransactionMode object InfinispanListeners extends SimpleClassNameLogSupport { def main(args: Array[String]): Unit = { System.setProperty("nodeId", "master") val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[String, String]("listenersCache") manager.addListener(new CacheManagerLevelListener) cache.addListener(new CacheLevelListener) cache.stop() cache.start() cache.addListener(new CacheLevelListener) try { simplePattern(cache) useTransactionIfEnabled(cache) manyEntries(cache, 100, 110) } finally { cache.stop() manager.stop() } } def simplePattern(cache: Cache[String, String]): Unit = { val entry = ("key1", "value1") log(s"エントリ $entry を参照します") cache.get(entry._1) log(s"エントリ $entry を登録します") cache.put(entry._1, entry._2) log(s"エントリ $entry を参照します") cache.get(entry._1) val entryReplace = ("key1", "value1-replace") log(s"エントリ $entryReplace をreplaceで更新します") cache.replace(entryReplace._1, entryReplace._2) val entryPut = ("key1", "value1-put") log(s"エントリ $entryPut をputで更新します") cache.put(entryPut._1, entryPut._2) log(s"エントリ $entry を削除します") cache.remove(entry._1) } def manyEntries(cache: Cache[String, String], start: Int, end: Int): Unit = { val keysValues = (start to end) map (i => (s"key$i", s"value$i")) log(s"エントリを ${keysValues.size} 個参照します") keysValues.foreach { case (k, _) => cache.get(k) } log(s"エントリを ${keysValues.size} 個登録します") keysValues.foreach { case (k, v) => cache.put(k, v) } } def useTransactionIfEnabled(cache: Cache[String, String]): Unit = cache.getCacheConfiguration.transaction.transactionMode match { case TransactionMode.TRANSACTIONAL => val tm = cache.getAdvancedCache.getTransactionManager try { log("トランザクションを開始します") tm.begin() val pair1 = ("transactional-key1", "transactional-value1") log(s"データ $pair1 を登録します") cache.put(pair1._1, pair1._2) val pair1Update = ("transactional-key1", "transactional-value1-update") log(s"データ $pair1Update を更新します") cache.put(pair1Update._1, pair1Update._2) log("トランザクションをコミットします") tm.commit() } catch { case th: Throwable => th.printStackTrace() tm.getStatus match { case Status.STATUS_ACTIVE | Status.STATUS_MARKED_ROLLBACK => log("トランザクションをロールバックします") tm.rollback() case _ => } } log("トランザクションを開始します") tm.begin() val pair1 = ("transactional-key1", "transactional-value1") log(s"データ $pair1 を削除します") cache.remove(pair1._1) val pair2 = ("transactional-key2", "transactional-value2") log(s"データ $pair2 を登録します") cache.put(pair2._1, pair2._2) log("トランザクションをロールバックします") tm.rollback() case _ => } }
クラスタリング用に浮いててもらうサーバ。
src/main/scala/EmbeddedCacheServer.scala
import org.infinispan.manager.DefaultCacheManager object EmbeddedCacheServer extends SimpleClassNameLogSupport { def main(args: Array[String]): Unit = { System.setProperty("nodeId", args(0)) val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[Any, Any]("listenersCache") manager.addListener(new CacheManagerLevelListener) cache.addListener(new CacheLevelListener) val keysValues = (10 to 15) map (i => (s"key$i", s"value$i")) log(s"エントリを ${keysValues.size} 個登録します") keysValues.foreach { case (k, v) => cache.put(k, v) } } }
Invalidation確認用のクラス。クラスタリング用のサーバと、合わせて使います。
src/main/scala/Invalidator.scala
import org.infinispan.manager.DefaultCacheManager object Invalidator extends SimpleClassNameLogSupport { def main(args: Array[String]): Unit = { System.setProperty("nodeId", "invalidator") val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[String, String]("listenersCache") manager.addListener(new CacheManagerLevelListener) cache.addListener(new CacheLevelListener) val entry = ("key1", "value1-invalidate") log(s"エントリ $entry を登録します") cache.put(entry._1, entry._2) val keysValues = (10 to 15) map (i => (s"key$i", s"value$i")) log(s"エントリを ${keysValues.size} 個登録します") keysValues.foreach { case (k, v) => cache.put(k, v) } cache.stop() manager.stop() } }
mainメソッドを持ったクラスというかオブジェクトは、すべてListenerを登録した同じCacheを使用します。
それでは、結果確認はまた次回に。