CLOVER🍀

That was when it all began.

InfinispanのListener機能を使ってみる - 1

Infinispanには、各種Cacheの操作などに対して、イベント通知を受け取って何か処理をするListernerの機能があります。

Listeners and Notifications
https://docs.jboss.org/author/display/ISPN/Listeners+and+Notifications

JBossDataGridのリスナーのコールバック関数を呼び出す
http://news.mynavi.jp/news/2013/05/24/236/index.html

勢いで試してみたら、けっこうなボリュームになりそうなので、何回かに分けて書きますね。

Listenerとは

CacheまたはCacheManagerに対して作成したListenerクラスを登録することで、各種イベントに対する通知を受け取れるようになり、それに応じた処理ができるようになります。

Listenerは、Listenableインターフェースを実装したクラスに対して、追加、削除、登録したListenerの取得を行うことができます。Listenableインターフェースを実装したものの例としては、Cache、CacheManager、AdvancedCacheなどがあります。

Listenable
http://docs.jboss.org/infinispan/5.3/apidocs/org/infinispan/notifications/Listenable.html

Listenerは、@Listenerアノテーションを付与したPOJOとして作成します。イベント通知を受けとるのは、これもイベントに対応したアノテーションを付与したメソッドになります。そのメソッドは、Eventインターフェースを実装したものになります。

以下は、ドキュメントからのJavaでの実装例です。

@Listener
public class PrintWhenAdded {
 
  @CacheEntryCreated
  public void print(CacheEntryCreatedEvent event) {
    System.out.println("New entry " + event.getKey() + " created in the cache");
  }
}

これは、Cacheに対するエントリを追加した時のイベントを受け取るクラスになります。

ちなみに、EventインターフェースはCacheレベルのものとCacheManagerレベルでインターフェースが異なるのですが、実装方法はそんなに変わりません。

ひとつのメソッドで、複数の種類のイベント通知を受けることもできます。

@Listener
public class MultipleEventListener {
    @CacheStarted
    @CacheStopped
    public void doSomething(Event event) {
       if (event.getType() == Event.Type.CACHE_STARTED)
           System.out.println("Cache started.  Details = " + event);
       else if (event.getType() == Event.Type.CACHE_STOPPED)
           System.out.println("Cache stopped.  Details = " + event);
    }
}

このようにした場合は、Event#getTypeから発生したイベントの種類を判定することになります。

それではまず、CacheレベルのイベントとCacheManagerレベルのイベント通知をそれぞれ紹介しましょう。

なお、イベントの種類は@ListenerアノテーションJavadocに記載があります。

@Listener
org.infinispan.notifications.Listener

Cacheレベルのイベント通知

Cacheレベルのイベント通知で使用するアノテーションとEventインターフェースの実装は、以下の通りです。

アノテーション 通知されるイベント 説明
CacheEntryCreated CacheEntryCreatedEvent Cacheにエントリを追加された時(Cache#put)に発生
CacheEntryModified CacheEntryModifiedEvent Cacheのエントリが変更された時(Cache#replace)に発生(が、追加時(Cache#put)も発生する模様)
CacheEntryRemoved CacheEntryRemovedEvent Cacheのエントリを削除した時(Cache#remove)に発生
CacheEntryVisited CacheEntryVisitedEvent Cacheのエントリを参照した時(Cache#get)に発生
CacheEntryLoaded CacheEntryLoadedEvent Cacheのエントリを、CacheStoreからロードした時に発生。参照時や変更時などに、必要に応じて発生
CacheEntriesEvicted CacheEntriesEvictedEvent Evictを有効にした場合に、メモリ上のCacheエントリ(複数)が、CacheStoreに追い出された場合に発生
CacheEntryActivated CacheEntryActivatedEvent EvictとPassivationを有効にした場合に、CacheエントリをCacheStoreからロード(活性化)した時に発生
CacheEntryPassivated CacheEntryPassivatedEvent EvictとPassivationを有効にした場合に、CacheエントリをCacheStoreに保存(非活性化)した時に発生
CacheEntryInvalidated CacheEntryInvalidatedEvent クラスタリングのモードがINVALIDATION_SYNCまたはINVALIDATION_ASYNCの時に、Cacheエントリの追加、変更などを行った時に、他のNode上のCacheエントリが無効になった場合に発生。複数のNodeで、同じキーに対して操作を行うと見ることができる
TransactionRegistered TransactionRegisteredEvent トランザクション有効時に、トランザクションが開始された時に発生
TransactionCompleted TransactionCompletedEvent トランザクション有効時に、トランザクションが完了した時(コミット、ロールバック)に発生
DataRehashed DataRehashedEvent クラスタリングのモードの時に、クラスタメンバの増減によりデータのリハッシュを行う際の開始、終了時に発生
TopologyChanged TopologyChangedEvent クラスタリングのモードの時に、クラスタメンバの増減によりトポロジの構成に変更があった時の開始、終了で発生

*ドキュメントでは「クラスタリングのモードがDIST_SYNCまたはDIST_ASYNCの時に発生」とありましたが、ReplicationでもInvalidationでも発生しました…

アノテーション
org.infinispan.notifications.cachelistener.annotation パッケージ
イベントは
org.infinispan.notifications.cachelistener.event パッケージ
にそれぞれ属します。

CacheManagerレベルのイベント通知

CacheManagerレベルのイベント通知で使用するアノテーションとEventインターフェースの実装は、以下の通りです。
*ここでいうEventインターフェースは、Cacheレベルのイベント通知で使用するEventインターフェースとは異なります

アノテーション 通知されるイベント 説明
CacheStarted CacheStartedEvent Cacheの開始時(Cache#start)に発生。ただし、停止後のCacheを再度開始させると、Listenerの登録は解除されるため必要であれば再度登録しなければならないことに注意
CacheStopped CacheStoppedEvent Cacheの終了時(Cache#stop)に発生
ViewChanged ViewChangedEvent クラスタ参加メンバの増減により、クラスタ構成が変化した場合に発生
Merged MergedEvent 分断されたクラスタが、マージされた場合に発生

アノテーション
org.infinispan.notifications.cachemanagerlistener.annotation パッケージ
イベントは
org.infinispan.notifications.cachemanagerlistener.event パッケージ
にそれぞれ属します。

準備

それでは、各Listenerを作成する準備を。今回は動作させませんが、次回以降にちょっと動作結果を書いていこうと思います。

build.sbt

name := "infinispan-listeners"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.2"

organization := "littlewings"

fork in run := true

resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/"

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "5.3.0.Final",
  "net.jcip" % "jcip-annotations" % "1.0"
)

Infinispan 5.3.0.Finalがリリースされましたね!!

Infinispanの設定ファイル。コメントアウトの調整で一通りの動作確認ができるよう、いろいろ書いてます。あと、CacheLoaderの保存先はシステムプロパティでちょっといじれるようになっています。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:5.3 http://www.infinispan.org/schemas/infinispan-config-5.3.xsd"
    xmlns="urn:infinispan:config:5.3">

  <global>
    <transport clusterName="listenres-cluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        />

    <!--
    <asyncListenerExecutor factory="org.infinispan.executors.DefaultExecutorFactory">
      <properties>
        <property name="maxThreads" value="5"/>
        <property name="threadNamePrefix" value="AsyncListenerThread"/>
      </properties>
    </asyncListenerExecutor>
    -->
  </global>

  <namedCache name="listenersCache">
    <jmxStatistics enabled="true"/>

    <!--
    <clustering mode="invalidation" />
    -->
    <!--
    <clustering mode="replication" />
    -->

    <clustering mode="distribution">
      <hash numOwners="2" />
      <sync />
    </clustering>

    <eviction strategy="LIRS" maxEntries="2" />
    <loaders passivation="true" shared="false" preload="true">
      <loader class="org.infinispan.loaders.file.FileCacheStore"
              fetchPersistentState="false" ignoreModifications="false"
              purgeOnStartup="false">
        <properties>
          <property name="location" value="cache-store-${nodeId}" />
        </properties>
      </loader>
    </loaders>

    <transaction
        transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
        transactionMode="TRANSACTIONAL"
        lockingMode="OPTIMISTIC"
        autoCommit="true" />
  </namedCache>
</infinispan>

src/main/resources/jgroups.xml

<?xml version="1.0" encoding="UTF-8"?>
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.2.xsd">
  <UDP
      mcast_addr="${jgroups.udp.mcast_addr:228.11.11.11}"
      mcast_port="${jgroups.udp.mcast_port:45688}"
      tos="8"
      ucast_recv_buf_size="130000"
      ucast_send_buf_size="100000"
      mcast_recv_buf_size="130000"
      mcast_send_buf_size="100000"
      loopback="true"

      thread_naming_pattern="cl"

      thread_pool.enabled="true"
      thread_pool.min_threads="2"
      thread_pool.max_threads="8"
      thread_pool.keep_alive_time="5000"
      thread_pool.queue_enabled="true"
      thread_pool.queue_max_size="1000"
      thread_pool.rejection_policy="discard"

      oob_thread_pool.enabled="true"
      oob_thread_pool.min_threads="2"
      oob_thread_pool.max_threads="8"
      oob_thread_pool.keep_alive_time="1000"
      oob_thread_pool.queue_enabled="false"
      oob_thread_pool.rejection_policy="discard"
      />

  <PING />
  <FD_ALL />
  <FD_SOCK />
  <UNICAST2 />
  <MERGE3 />
  <pbcast.NAKACK2 />
  <pbcast.GMS print_local_addr="true" />
</config>

ロギング用のトレイト。
src/main/scala/LogSupport.scala

import org.infinispan.notifications.cachelistener.event.Event

trait LogSupport {
  def log(msg: => Any): Unit = {
    println(s"${prefix}${msg}")
  }

  protected def prefix: String = ""
}

trait SimpleClassNameLogSupport extends LogSupport {
  override protected def prefix: String =
    s"${super.prefix}[${getClass.getSimpleName}]# "
}

trait ThreadNameLogSupport extends LogSupport {
  override protected def prefix: String =
    s"${super.prefix}[${Thread.currentThread.getName}] "
}

trait PrePostLogSupport extends LogSupport {
  def log(event: Event[_, _], msg: Any): Unit =
    if (event.isPre)
      log("[Pre ] " + msg)
    else 
      log("[Post] " + msg)
}

CacheレベルのListener。
src/main/scala/CacheLevelListener.scala

import org.infinispan.notifications.Listener
import org.infinispan.notifications.cachelistener.annotation._
import org.infinispan.notifications.cachelistener.event._

//@Listener(sync = true)
//@Listener(sync = false)
@Listener
class CacheLevelListener extends SimpleClassNameLogSupport
                         with PrePostLogSupport {
  @CacheEntryCreated
  def cacheEntryCreated(event: CacheEntryCreatedEvent[_, _]): Unit =
    log(event, s"作成イベント => ${event.getKey + ":" + event.getValue}")

  @CacheEntryRemoved
  def cacheEntryRemoved(event: CacheEntryRemovedEvent[_, _]): Unit =
    log(event, s"削除イベント => ${event.getKey + ":" + event.getValue}, 古い値 => ${event.getOldValue}")

  @CacheEntryModified
  def cacheEntryModified(event: CacheEntryModifiedEvent[_, _]): Unit =
    log(event, s"変更イベント => ${event.getKey + ":" + event.getValue}, isCreated? => ${event.isCreated}")

  @CacheEntryVisited
  def cacheEntryVisited(event: CacheEntryVisitedEvent[_, _]): Unit =
    log(event, s"参照イベント => ${event.getKey + ":" + event.getValue}")

  @CacheEntryLoaded
  def cacheEntryLoaded(event: CacheEntryLoadedEvent[_, _]): Unit =
    log(event, s"ロード完了イベント => ${event.getKey + ":" + event.getValue}")

  /** ひとつのメソッドで、複数のイベントを受け取ることも可能 **/
  @CacheEntryActivated
  @CacheEntryPassivated
  def cacheEntryActivatedOrPassivated(e: CacheEntryEvent[_, _]): Unit =
    e.getType match {
      case Event.Type.CACHE_ENTRY_ACTIVATED =>
        val event = e.asInstanceOf[CacheEntryActivatedEvent[_, _]]
        log(event, s"活性化イベント => ${event.getKey + ":" + event.getValue}")
      case Event.Type.CACHE_ENTRY_PASSIVATED =>
        val event = e.asInstanceOf[CacheEntryPassivatedEvent[_, _]]
        log(event, s"非活性化イベント => ${event.getKey + ":" + event.getValue}")
      case _ => throw new IllegalArgumentException(e.getType.toString)
    }

  @CacheEntriesEvicted
  def cacheEntriesEvicted(event: CacheEntriesEvictedEvent[_, _]): Unit =
    log(event, s"エビクトイベント => ${event.getEntries}")

  @CacheEntryInvalidated
  def cacheEntryInvalidated(event: CacheEntryInvalidatedEvent[_, _]): Unit =
    log(event, s"無効化イベント => ${event.getKey + ":" + event.getValue}")

  @TransactionRegistered
  def transactionRegistered(event: TransactionRegisteredEvent[_, _]): Unit =
    log(event, s"トランザクション登録 => ${event.getGlobalTransaction}, isOriginLocal? => ${event.isOriginLocal}")

  @TransactionCompleted
  def transactionCompleted(event: TransactionCompletedEvent[_, _]): Unit =
    log(event, s"トランザクション完了 => ${event.getGlobalTransaction}, isTransactionSuccessful? => ${event.isTransactionSuccessful}, isOriginLocal? => ${event.isOriginLocal}")

  @DataRehashed
  def dataRehashed(event: DataRehashedEvent[_, _]): Unit =
    log(event, s"データリハッシュ => ${event.getNewTopologyId}, memberAtStart => ${event.getMembersAtStart}, memberAtEnd => ${event.getMembersAtEnd}, getConsistentHashAtStart => ${event.getConsistentHashAtStart}, getConsistentHashAtEnd => ${event.getConsistentHashAtEnd}")

  @TopologyChanged
  def topologyChanged(event: TopologyChangedEvent[_, _]): Unit =
    log(event, s"トポロジ変更 => ${event.getNewTopologyId}, getConsistentHashAtStart => ${event.getConsistentHashAtStart}, getConsistentHashAtEnd => ${event.getConsistentHashAtEnd}")
}

CacheManagerレベルのListener。
src/main/scala/CacheManagerLevelListener.scala

import org.infinispan.notifications.Listener
import org.infinispan.notifications.cachemanagerlistener.annotation._
import org.infinispan.notifications.cachemanagerlistener.event._

//@Listener(sync = true)
//@Listener(sync = false)
@Listener
class CacheManagerLevelListener extends SimpleClassNameLogSupport {
  @CacheStarted
  def cacheStarted(event: CacheStartedEvent): Unit =
    log(s"キャッシュ開始イベント => ${event.getCacheName}")

  @CacheStopped
  def cacheStopped(event: CacheStoppedEvent): Unit =
    log(s"キャッシュ停止イベント => ${event.getCacheName}")

  @ViewChanged
  def viewChanged(event: ViewChangedEvent): Unit =
    log(s"ビュー変更イベント => isMergeView? => ${event.isMergeView}, viewId => ${event.getViewId}, localAddress => ${event.getLocalAddress}, getOldMembers => ${event.getOldMembers}, getNewMembers => ${event.getNewMembers}")

  @Merged
  def merged(event: MergeEvent): Unit =
    log(s"マージイベント => ${event.getSubgroupsMerged}, isMergeView? => ${event.isMergeView}, viewId => ${event.getViewId}, localAddress => ${event.getLocalAddress}, getOldMembers => ${event.getOldMembers}, getNewMembers => ${event.getNewMembers}")
}

さまざまなCache操作を行うクラス。
src/main/scala/InfinispanListeners.scala

import scala.collection.JavaConverters._

import javax.transaction.Status

import org.infinispan.Cache
import org.infinispan.configuration.cache.CacheMode
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.transaction.TransactionMode

object InfinispanListeners extends SimpleClassNameLogSupport {
  def main(args: Array[String]): Unit = {
    System.setProperty("nodeId", "master")

    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]("listenersCache")

    manager.addListener(new CacheManagerLevelListener)
    cache.addListener(new CacheLevelListener)

    cache.stop()
    cache.start()

    cache.addListener(new CacheLevelListener)

    try {
      simplePattern(cache)
      useTransactionIfEnabled(cache)
      manyEntries(cache, 100, 110)
    } finally {
      cache.stop()
      manager.stop()
    }
  }

  def simplePattern(cache: Cache[String, String]): Unit = {
    val entry = ("key1", "value1")
    log(s"エントリ $entry を参照します")
    cache.get(entry._1)

    log(s"エントリ $entry を登録します")
    cache.put(entry._1, entry._2)

    log(s"エントリ $entry を参照します")
    cache.get(entry._1)

    val entryReplace = ("key1", "value1-replace")
    log(s"エントリ $entryReplace をreplaceで更新します")
    cache.replace(entryReplace._1, entryReplace._2)

    val entryPut = ("key1", "value1-put")
    log(s"エントリ $entryPut をputで更新します")
    cache.put(entryPut._1, entryPut._2)

    log(s"エントリ $entry を削除します")
    cache.remove(entry._1)
  }

  def manyEntries(cache: Cache[String, String], start: Int, end: Int): Unit = {
    val keysValues = (start to end) map (i => (s"key$i", s"value$i"))
    log(s"エントリを ${keysValues.size} 個参照します")
    keysValues.foreach { case (k, _) => cache.get(k) }

    log(s"エントリを ${keysValues.size} 個登録します")
    keysValues.foreach { case (k, v) => cache.put(k, v) }
  }

  def useTransactionIfEnabled(cache: Cache[String, String]): Unit =
    cache.getCacheConfiguration.transaction.transactionMode match {
      case TransactionMode.TRANSACTIONAL =>
        val tm = cache.getAdvancedCache.getTransactionManager

        try {
          log("トランザクションを開始します")
          tm.begin()

          val pair1 = ("transactional-key1", "transactional-value1")
          log(s"データ $pair1 を登録します")
          cache.put(pair1._1, pair1._2)

          val pair1Update = ("transactional-key1", "transactional-value1-update")
          log(s"データ $pair1Update を更新します")
          cache.put(pair1Update._1, pair1Update._2)

          log("トランザクションをコミットします")
          tm.commit()
        } catch {
          case th: Throwable =>
            th.printStackTrace()
            tm.getStatus match {
              case Status.STATUS_ACTIVE | Status.STATUS_MARKED_ROLLBACK =>
                log("トランザクションをロールバックします")
                tm.rollback()
              case _ =>
            }
        }

        log("トランザクションを開始します")
        tm.begin()

        val pair1 = ("transactional-key1", "transactional-value1")
        log(s"データ $pair1 を削除します")
        cache.remove(pair1._1)

        val pair2 = ("transactional-key2", "transactional-value2")
        log(s"データ $pair2 を登録します")
        cache.put(pair2._1, pair2._2)

        log("トランザクションをロールバックします")
        tm.rollback()
      case _ =>
    }
}

クラスタリング用に浮いててもらうサーバ。
src/main/scala/EmbeddedCacheServer.scala

import org.infinispan.manager.DefaultCacheManager

object EmbeddedCacheServer extends SimpleClassNameLogSupport {
  def main(args: Array[String]): Unit = {
    System.setProperty("nodeId", args(0))

    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[Any, Any]("listenersCache")

    manager.addListener(new CacheManagerLevelListener)
    cache.addListener(new CacheLevelListener)

    val keysValues = (10 to 15) map (i => (s"key$i", s"value$i"))
    log(s"エントリを ${keysValues.size} 個登録します")
    keysValues.foreach { case (k, v) => cache.put(k, v) }
  }
}

Invalidation確認用のクラス。クラスタリング用のサーバと、合わせて使います。
src/main/scala/Invalidator.scala

import org.infinispan.manager.DefaultCacheManager

object Invalidator extends SimpleClassNameLogSupport {
  def main(args: Array[String]): Unit = {
    System.setProperty("nodeId", "invalidator")

    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]("listenersCache")

    manager.addListener(new CacheManagerLevelListener)
    cache.addListener(new CacheLevelListener)

    val entry = ("key1", "value1-invalidate")
    log(s"エントリ $entry を登録します")
    cache.put(entry._1, entry._2)

    val keysValues = (10 to 15) map (i => (s"key$i", s"value$i"))
    log(s"エントリを ${keysValues.size} 個登録します")
    keysValues.foreach { case (k, v) => cache.put(k, v) }

    cache.stop()
    manager.stop()
  }
}

mainメソッドを持ったクラスというかオブジェクトは、すべてListenerを登録した同じCacheを使用します。

それでは、結果確認はまた次回に。