CLOVER🍀

That was when it all began.

InifinispanのBatching API

Infinispanを使った、トランザクション周りの話題に関係ありそうだなーと思っていた、InfinispanのBatching APIを触ってみました。

Batching
https://docs.jboss.org/author/display/ISPN/Batching

どういう時に使うのかというと…Infinispanでのトランザクション管理を行う時に、それがInfinispanクラスタ内で完結するのであれば、Batching APIを使えということらしいです。

JTAは、複数のシステム使ったトランザクション管理を行う場合に使うものだ、と。例えば、2つトランザクションに参加するリソースを使う場合、一方がInfinispanで、もう一方がデータベースの時、とかいう場合みたいですね。

とりあえず、使ってみましょう。

まずは設定ファイルの例から。
src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="urn:infinispan:config:5.2 http://www.infinispan.org/schemas/infinispan-config-5.2.xsd"
      xmlns="urn:infinispan:config:5.2">
  <global>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        />
  </global>

  <namedCache name="batchingCache">
    <invocationBatching enabled="true" />
    <transaction
        transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
        transactionMode="TRANSACTIONAL" />
  </namedCache>
</infinispan>

ドキュメントでは、

    <invocationBatching enabled="true" />

しか書いていませんが、実際には裏でトランザクション管理を行うので

    <transaction
        transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
        transactionMode="TRANSACTIONAL" />

少なくともトランザクションができる設定にしておかないと、起動に失敗します。裏でTransactionManagerを使っているのが、ここでハッキリわかりますね。

追記
デモのサンプルとなっている設定ファイルでは、トランザクションの設定がなかったので、これはどういうことかなぁと思っていたら、恐ろしいことにdefaultのCacheはBatching APIを利用できるようにすると、自動的にトランザクションが有効になるようです。

7 03, 2013 11:14:15 午後 org.infinispan.factories.TransactionManagerFactory construct
INFO: ISPN000161: Using a batchMode transaction manager

また、defaultの方にBatching APIを有効にするように入れると、namedCacheでも有効になるみたいです…。
追記、ここまで)

Batching APIの使い方は、いたって簡単でDefaltCacheManagerから取得したCacheに対して、startBatchで始めて

cache.startBatch();

この後更新処理とかをいろいろやって、終わったらendBatchを呼び出します。

cache.endBatch(true); // または cache.endBatch(false);

endBatchの引数はbooleanで、trueを指定すると処理が確定します。ドキュメントでは、ここからキャッシュのレプリケーションなども始まるようです。falseを指定すると、startBatchからendBatchまでの間の処理がロールバックされます。

では、例を。
src/main/scala/UpdateBatching.scala

import scala.concurrent.SyncVar

import org.infinispan.Cache
import org.infinispan.manager.DefaultCacheManager

object UpdateBatching {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]("batchingCache")

    try {
      val shared = new SyncVar[String]

      val writeThread = new UpdateThread(cache, shared)
      val readThread = new ReadThread(cache, shared)
      writeThread.start()
      readThread.start()

      writeThread.join()
      readThread.join()
    } finally {
      cache.stop()
      manager.stop()
    }
  }
}

trait ThreadSupport {
  self: Thread =>

  def w(millsec: Long): Unit = {
    p(s"Waiting $millsec msec...")
    Thread.sleep(millsec)
  }

  def p(msg: String): Unit =
    println(s"${getName} => $msg")  
}

class UpdateThread(cache: Cache[String, String], shared: SyncVar[String]) extends Thread("update-thread")
                                                                          with ThreadSupport {
  override def run(): Unit = {
    cache.put("not-batching-key", "not-batching-value")

    cache.put("shared-update-key", "shared-update-value")
    cache.put("shared-remove-key", "shared-remove-value")

    p("----- non batching writed -----")

    w(3000L)
    shared.take()
     
    cache.startBatch()

    cache.put("batching-key1", "batching-value1")
    cache.put("batching-key2", "batching-value2")
    cache.put("batching-key3", "batching-value3")

    cache.remove("shared-remove-key")
    cache.put("shared-update-key", "shared-update-value-updated")

    p("----- batch writed, not committed -----")

    shared.put("writed")
    w(3000L)
    shared.take()

    val end = true
    cache.endBatch(end)

    p(s"----- batch update end!![$end] -----")

    shared.put("commited")
  }
}

class ReadThread(cache: Cache[String, String], shared: SyncVar[String]) extends Thread("read-thread")
                                                                        with ThreadSupport {
  override def run(): Unit = {
    w(3000L)

    p("----- initial read -----")

    p(s"not-batching-key => ${cache.get("not-batching-key")}")

    p(s"shared-update-key => ${cache.get("shared-update-key")}")
    p(s"shared-remove-key => ${cache.get("shared-remove-key")}")

    shared.put("----- readed -----")

    w(3000L)

    shared.take()

    p("in batching read")

    p(s"not-batching-key => ${cache.get("not-batching-key")}")

    p(s"batching-key1 => ${cache.get("batching-key1")}")
    p(s"batching-key2 => ${cache.get("batching-key2")}")
    p(s"batching-key3 => ${cache.get("batching-key3")}")

    p(s"shared-update-key => ${cache.get("shared-update-key")}")
    p(s"shared-remove-key => ${cache.get("shared-remove-key")}")

    shared.put("readed")

    w(3000L)

    shared.take()

    p("----- end batching read -----")

    p(s"not-batching-key => ${cache.get("not-batching-key")}")

    p(s"batching-key1 => ${cache.get("batching-key1")}")
    p(s"batching-key2 => ${cache.get("batching-key2")}")
    p(s"batching-key3 => ${cache.get("batching-key3")}")

    p(s"shared-update-key => ${cache.get("shared-update-key")}")
    p(s"shared-remove-key => ${cache.get("shared-remove-key")}")
  }
}

ホントは、2つプロセスを起動してもっと簡単に書きたかったのですが、fork in trueにしたsbtにハマったので、マルチスレッドでスリープとSyncVarを使ってムリヤリ順番制御しときました…。

このプログラムは、UpdateThreadの

    val end = true
    cache.endBatch(end)

の箇所をtrueにするかfalseにするかで動きが変わります。

処理の順番的には

  1. UpdateThreadが普通にCacheに値を書き込む(ReadThreadは待機)
  2. ReadThreadが、UpdateThreadが書いた値を読む(UpdateThreadは待機)
  3. UpdateThreadがBatch処理を始め、値を書き込んだ後一時停止。処理の確定はしない
  4. ReadThreadがUpdateThreadの書いた値を読む(UpdateThreadは待機)
  5. UpdateThreadが処理を確定する
  6. ReadThreadが結果を読む

ここで、ReadThreadが読む4と5がどうなるだろう?という話ですね。

では、動かしてみます。まずはendBatch(true)のケース。

> run
[info] read-thread => Waiting 3000 msec...
[info] update-thread => ----- non batching writed -----
[info] update-thread => Waiting 3000 msec...
[info] read-thread => ----- initial read -----
[info] read-thread => not-batching-key => not-batching-value
[info] read-thread => shared-update-key => shared-update-value
[info] read-thread => shared-remove-key => shared-remove-value
[info] read-thread => Waiting 3000 msec...
[info] update-thread => ----- batch writed, not committed -----
[info] update-thread => Waiting 3000 msec...
[info] read-thread => in batching read
[info] read-thread => not-batching-key => not-batching-value
[info] read-thread => batching-key1 => null
[info] read-thread => batching-key2 => null
[info] read-thread => batching-key3 => null
[info] read-thread => shared-update-key => shared-update-value
[info] read-thread => shared-remove-key => shared-remove-value
[info] read-thread => Waiting 3000 msec...
[info] update-thread => ----- batch update end!![true] -----
[info] read-thread => ----- end batching read -----
[info] read-thread => not-batching-key => not-batching-value
[info] read-thread => batching-key1 => batching-value1
[info] read-thread => batching-key2 => batching-value2
[info] read-thread => batching-key3 => batching-value3
[info] read-thread => shared-update-key => shared-update-value-updated
[info] read-thread => shared-remove-key => null
[success] Total time: 11 s, completed 2013/04/24 23:53:51

Batching APIを使う前の値はReadThreadからも見えていますが、startBatchをして確定する前の値は、ReadThreadから見えていません。endBatch(true)を呼び出した後、見えるようになっています。

続いて、endBatch(false)にした場合。

> run
[info] read-thread => Waiting 3000 msec...
[info] update-thread => ----- non batching writed -----
[info] update-thread => Waiting 3000 msec...
[info] read-thread => ----- initial read -----
[info] read-thread => not-batching-key => not-batching-value
[info] read-thread => shared-update-key => shared-update-value
[info] read-thread => shared-remove-key => shared-remove-value
[info] read-thread => Waiting 3000 msec...
[info] update-thread => ----- batch writed, not committed -----
[info] update-thread => Waiting 3000 msec...
[info] read-thread => in batching read
[info] read-thread => not-batching-key => not-batching-value
[info] read-thread => batching-key1 => null
[info] read-thread => batching-key2 => null
[info] read-thread => batching-key3 => null
[info] read-thread => shared-update-key => shared-update-value
[info] read-thread => shared-remove-key => shared-remove-value
[info] read-thread => Waiting 3000 msec...
[info] update-thread => ----- batch update end!![false] -----
[info] read-thread => ----- end batching read -----
[info] read-thread => not-batching-key => not-batching-value
[info] read-thread => batching-key1 => null
[info] read-thread => batching-key2 => null
[info] read-thread => batching-key3 => null
[info] read-thread => shared-update-key => shared-update-value
[info] read-thread => shared-remove-key => shared-remove-value
[success] Total time: 13 s, completed 2013/04/24 23:55:36

更新途中の値が見えないところは先ほどと同じですが、endBatch(false)を呼び出した後は、UpdateThreadの更新処理がなかったことになっています。

トランザクションに参加するリソースがInfinispanだけの場合は、Batching APIだけでいいのかな…?

あと、同ドキュメント中にBatching APIJTAについて、もう少し書かれています。
https://docs.jboss.org/author/display/ISPN/Batching

裏でTransactionManagerを使っているので、JTAと合わせて使えるよという話みたいです。

以下の設定を追加すると、

<transaction
    syncRollbackPhase="false" syncCommitPhase="false"
    useEagerLocking="true" eagerLockSingleNode="true" />

ドキュメント的には、こういうことになるのかな?

ただまあ、TransactionConfigurationを見る限りは、
http://docs.jboss.org/infinispan/5.2/apidocs/org/infinispan/configuration/cache/TransactionConfiguration.html
useEagerLockingとeagerLockSingleNodeはDeprecatedになっていますが…。

eagerLockSingleNodeは、trueにするとnumOwnersの設定に関係なく、単一のノードをロックするようです。falseにすると、全ノードをロックしにいくと…。
*つまり、Distributionモード限定の設定です

useEagerLockingは、eagerLockSingleNodeと同じく単一のノードをロックする設定らしいです。Infinispan 5.1からは悲観的ロックに読み替えられるようです。が、falseにした場合は、やっぱり全ノードをロックするらしいです。
*単に悲観的ロックを使った場合は、単一のノードをロックしているようです

その他、syncCommitPhase/syncRollbackPhaseの意味も書いてあります。コミットまたはロールバックを命令を送った全ノードの応答を、待つかどうかの設定です。trueにすれば同期モードで、falseにすると非同期モードになります。非同期モードは、パフォーマンスが向上するけど、失敗した時にはちゃんとトラップしてロールバック指示を出すよ、と書いていますが…。

ああ、設定が深い…