CLOVER🍀

That was when it all began.

InfinispanのListener機能を使ってみる - 4

Infinispanを使ったListenerネタ、これで最後です。

最後は、Listenerのコールバックの同期/非同期の切り替えです。

Listenerは@Listenerアノテーションを普通に付与すると、同期というか、イベントを発生させたスレッドと同じスレッドで動きます。

@Listener
class CacheLevelListener extends SimpleClassNameLogSupport
                         with PrePostLogSupport {

よって、長い間ブロックしてしまうと呼び出し元の処理の進行を遅らせてしまうことになります。

が、アノテーションでの設定で、Listenerの呼び出しを非同期にすることができます。設定自体は簡単で、

@Listener(sync = false)

とするだけです。

これだけで、Listenerへの通知が別のスレッドプールで実行されるようになり、ブロックしても呼び出し元のスレッドに影響を与えなくなります。

まずは、同期モードの状態を確認してみましょう。

ListenerにMix-inしているトレイトにひとつ追加します。

// Cacheレベル
@Listener
class CacheLevelListener extends ThreadNameLogSupport
                         with SimpleClassNameLogSupport
                         with PrePostLogSupport {

// CacheManagerレベル
@Listener
class CacheManagerLevelListener extends ThreadNameLogSupport
                                with SimpleClassNameLogSupport {

あと、mainメソッドを持つオブジェクトにMix-inしているトレイトをひとつ追加します。

object InfinispanListeners extends ThreadNameLogSupport
                           with SimpleClassNameLogSupport {

もちろん、自作のトレイトですが、これでスレッド名もログに出るようになります。

ソースコードは、こちら。
http://d.hatena.ne.jp/Kazuhira/20130626

では、実行。

[info] [main] [CacheManagerLevelListener]# キャッシュ停止イベント => listenersCache
[info] [main] [CacheManagerLevelListener]# キャッシュ開始イベント => listenersCache
[info] [main] [InfinispanListeners$]# エントリ (key1,value1) を参照します
[info] [main] [InfinispanListeners$]# エントリ (key1,value1) を登録します
[info] [main] [CacheLevelListener]# [Post] トランザクション登録 => GlobalTransaction:<ubuntu-50365>:5:local, isOriginLocal? => true
[info] [main] [CacheLevelListener]# [Pre ] 作成イベント => key1:null
[info] [main] [CacheLevelListener]# [Pre ] 変更イベント => key1:null, isCreated? => true
[info] [main] [CacheLevelListener]# [Post] エビクトイベント => {key106=value106}
[info] [main] [CacheLevelListener]# [Post] 変更イベント => key1:value1, isCreated? => true
[info] [main] [CacheLevelListener]# [Post] 作成イベント => key1:value1

全部、mainスレッドで動作していることがわかります。

ではここで、Listenerを非同期にしてみます。

// Cacheレベル
@Listener(sync = false)
class CacheLevelListener extends ThreadNameLogSupport
                         with SimpleClassNameLogSupport
                         with PrePostLogSupport {

// CacheManagerレベル
@Listener(sync = false)
class CacheManagerLevelListener extends ThreadNameLogSupport
                                with SimpleClassNameLogSupport {

実行。

[info] [notification-thread-0] [CacheManagerLevelListener]# キャッシュ停止イベント => listenersCache
[info] [notification-thread-0] [CacheManagerLevelListener]# キャッシュ開始イベント => listenersCache
[info] [main] [InfinispanListeners$]# エントリ (key1,value1) を参照します
[info] [main] [InfinispanListeners$]# エントリ (key1,value1) を登録します
[info] [notification-thread-0] [CacheLevelListener]# [Post] トランザクション登録 => GlobalTransaction:<ubuntu-21916>:5:local, isOriginLocal? => true
[info] [notification-thread-0] [CacheLevelListener]# [Pre ] 作成イベント => key1:null
[info] [notification-thread-0] [CacheLevelListener]# [Pre ] 変更イベント => key1:null, isCreated? => true
[info] [notification-thread-0] [CacheLevelListener]# [Post] エビクトイベント => {key106=value106}
[info] [notification-thread-0] [CacheLevelListener]# [Post] 変更イベント => key1:value1, isCreated? => true
[info] [notification-thread-0] [CacheLevelListener]# [Post] 作成イベント => key1:value1
[info] [notification-thread-0] [CacheLevelListener]# [Post] トランザクション完了 => GlobalTransaction:<ubuntu-21916>:5:local, isTransactionSuccessful? => true, isOriginLocal? => true

ちょっと動きが変わって、スレッド名が「main」じゃなくなりましたね。代わりに「notification-thread-0」になっています。

が、シングルスレッドです…。

これを調整するには、Infinispanの設定ファイルのglobal要素に、asyncListenerExecutorを加えます。

  <global>
    <!-- 省略 -->

    <asyncListenerExecutor factory="org.infinispan.executors.DefaultExecutorFactory">
      <properties>
        <property name="maxThreads" value="5"/>
        <property name="threadNamePrefix" value="AsyncListenerThread"/>
      </properties>
    </asyncListenerExecutor>
  </global>

スレッドを生成するファクトリクラス、そしてスレッドの最大数とスレッド名の接頭辞が指定できます。

デフォルト値はmaxThreadsが1、threadNamePrefixが「notification-thread」です。

参考)
https://docs.jboss.org/author/display/ISPN/Default+Values+For+Property+Based+Attributes

今回は、maxThreadsを5に、threadNamePrefixを「AsyncListenerThread」にしました。

それでは、実行。

[info] [AsyncListenerThread-0] [CacheManagerLevelListener]# キャッシュ停止イベント => listenersCache
[info] [main] [InfinispanListeners$]# エントリ (key1,value1) を参照します
[info] [main] [InfinispanListeners$]# エントリ (key1,value1) を登録します
[info] [AsyncListenerThread-1] [CacheManagerLevelListener]# キャッシュ開始イベント => listenersCache
[info] [AsyncListenerThread-2] [CacheLevelListener]# [Post] トランザクション登録 => GlobalTransaction:<ubuntu-6804>:5:local, isOriginLocal? => true
[info] [AsyncListenerThread-3] [CacheLevelListener]# [Pre ] 作成イベント => key1:null
[info] [AsyncListenerThread-4] [CacheLevelListener]# [Pre ] 変更イベント => key1:null, isCreated? => true
[info] [AsyncListenerThread-0] [CacheLevelListener]# [Post] エビクトイベント => {key106=value106}
[info] [AsyncListenerThread-1] [CacheLevelListener]# [Post] 変更イベント => key1:value1, isCreated? => true
[info] [AsyncListenerThread-2] [CacheLevelListener]# [Post] 作成イベント => key1:value1
[info] [AsyncListenerThread-3] [CacheLevelListener]# [Post] トランザクション完了 => GlobalTransaction:<ubuntu-6804>:5:local, isTransactionSuccessful? => true, isOriginLocal? => true

スレッド名と数が変わりましたね。イベント別だけではなく、同じイベントのPre/Postでもスレッドが異なるようになりました。

ちょっと長くなりましたけど、Listenerの確認は以上です。