Infinispanを使った、トランザクション周りの話題に関係ありそうだなーと思っていた、InfinispanのBatching APIを触ってみました。
Batching
https://docs.jboss.org/author/display/ISPN/Batching
どういう時に使うのかというと…Infinispanでのトランザクション管理を行う時に、それがInfinispanクラスタ内で完結するのであれば、Batching APIを使えということらしいです。
JTAは、複数のシステム使ったトランザクション管理を行う場合に使うものだ、と。例えば、2つトランザクションに参加するリソースを使う場合、一方がInfinispanで、もう一方がデータベースの時、とかいう場合みたいですね。
とりあえず、使ってみましょう。
まずは設定ファイルの例から。
src/main/resources/infinispan.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:5.2 http://www.infinispan.org/schemas/infinispan-config-5.2.xsd" xmlns="urn:infinispan:config:5.2"> <global> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" /> </global> <namedCache name="batchingCache"> <invocationBatching enabled="true" /> <transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" transactionMode="TRANSACTIONAL" /> </namedCache> </infinispan>
ドキュメントでは、
<invocationBatching enabled="true" />
しか書いていませんが、実際には裏でトランザクション管理を行うので
<transaction transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup" transactionMode="TRANSACTIONAL" />
少なくともトランザクションができる設定にしておかないと、起動に失敗します。裏でTransactionManagerを使っているのが、ここでハッキリわかりますね。
(追記)
デモのサンプルとなっている設定ファイルでは、トランザクションの設定がなかったので、これはどういうことかなぁと思っていたら、恐ろしいことにdefaultのCacheはBatching APIを利用できるようにすると、自動的にトランザクションが有効になるようです。
7 03, 2013 11:14:15 午後 org.infinispan.factories.TransactionManagerFactory construct INFO: ISPN000161: Using a batchMode transaction manager
また、defaultの方にBatching APIを有効にするように入れると、namedCacheでも有効になるみたいです…。
(追記、ここまで)
Batching APIの使い方は、いたって簡単でDefaltCacheManagerから取得したCacheに対して、startBatchで始めて
cache.startBatch();
この後更新処理とかをいろいろやって、終わったらendBatchを呼び出します。
cache.endBatch(true); // または cache.endBatch(false);
endBatchの引数はbooleanで、trueを指定すると処理が確定します。ドキュメントでは、ここからキャッシュのレプリケーションなども始まるようです。falseを指定すると、startBatchからendBatchまでの間の処理がロールバックされます。
では、例を。
src/main/scala/UpdateBatching.scala
import scala.concurrent.SyncVar import org.infinispan.Cache import org.infinispan.manager.DefaultCacheManager object UpdateBatching { def main(args: Array[String]): Unit = { val manager = new DefaultCacheManager("infinispan.xml") val cache = manager.getCache[String, String]("batchingCache") try { val shared = new SyncVar[String] val writeThread = new UpdateThread(cache, shared) val readThread = new ReadThread(cache, shared) writeThread.start() readThread.start() writeThread.join() readThread.join() } finally { cache.stop() manager.stop() } } } trait ThreadSupport { self: Thread => def w(millsec: Long): Unit = { p(s"Waiting $millsec msec...") Thread.sleep(millsec) } def p(msg: String): Unit = println(s"${getName} => $msg") } class UpdateThread(cache: Cache[String, String], shared: SyncVar[String]) extends Thread("update-thread") with ThreadSupport { override def run(): Unit = { cache.put("not-batching-key", "not-batching-value") cache.put("shared-update-key", "shared-update-value") cache.put("shared-remove-key", "shared-remove-value") p("----- non batching writed -----") w(3000L) shared.take() cache.startBatch() cache.put("batching-key1", "batching-value1") cache.put("batching-key2", "batching-value2") cache.put("batching-key3", "batching-value3") cache.remove("shared-remove-key") cache.put("shared-update-key", "shared-update-value-updated") p("----- batch writed, not committed -----") shared.put("writed") w(3000L) shared.take() val end = true cache.endBatch(end) p(s"----- batch update end!![$end] -----") shared.put("commited") } } class ReadThread(cache: Cache[String, String], shared: SyncVar[String]) extends Thread("read-thread") with ThreadSupport { override def run(): Unit = { w(3000L) p("----- initial read -----") p(s"not-batching-key => ${cache.get("not-batching-key")}") p(s"shared-update-key => ${cache.get("shared-update-key")}") p(s"shared-remove-key => ${cache.get("shared-remove-key")}") shared.put("----- readed -----") w(3000L) shared.take() p("in batching read") p(s"not-batching-key => ${cache.get("not-batching-key")}") p(s"batching-key1 => ${cache.get("batching-key1")}") p(s"batching-key2 => ${cache.get("batching-key2")}") p(s"batching-key3 => ${cache.get("batching-key3")}") p(s"shared-update-key => ${cache.get("shared-update-key")}") p(s"shared-remove-key => ${cache.get("shared-remove-key")}") shared.put("readed") w(3000L) shared.take() p("----- end batching read -----") p(s"not-batching-key => ${cache.get("not-batching-key")}") p(s"batching-key1 => ${cache.get("batching-key1")}") p(s"batching-key2 => ${cache.get("batching-key2")}") p(s"batching-key3 => ${cache.get("batching-key3")}") p(s"shared-update-key => ${cache.get("shared-update-key")}") p(s"shared-remove-key => ${cache.get("shared-remove-key")}") } }
ホントは、2つプロセスを起動してもっと簡単に書きたかったのですが、fork in trueにしたsbtにハマったので、マルチスレッドでスリープとSyncVarを使ってムリヤリ順番制御しときました…。
このプログラムは、UpdateThreadの
val end = true cache.endBatch(end)
の箇所をtrueにするかfalseにするかで動きが変わります。
処理の順番的には
- UpdateThreadが普通にCacheに値を書き込む(ReadThreadは待機)
- ReadThreadが、UpdateThreadが書いた値を読む(UpdateThreadは待機)
- UpdateThreadがBatch処理を始め、値を書き込んだ後一時停止。処理の確定はしない
- ReadThreadがUpdateThreadの書いた値を読む(UpdateThreadは待機)
- UpdateThreadが処理を確定する
- ReadThreadが結果を読む
ここで、ReadThreadが読む4と5がどうなるだろう?という話ですね。
では、動かしてみます。まずはendBatch(true)のケース。
> run [info] read-thread => Waiting 3000 msec... [info] update-thread => ----- non batching writed ----- [info] update-thread => Waiting 3000 msec... [info] read-thread => ----- initial read ----- [info] read-thread => not-batching-key => not-batching-value [info] read-thread => shared-update-key => shared-update-value [info] read-thread => shared-remove-key => shared-remove-value [info] read-thread => Waiting 3000 msec... [info] update-thread => ----- batch writed, not committed ----- [info] update-thread => Waiting 3000 msec... [info] read-thread => in batching read [info] read-thread => not-batching-key => not-batching-value [info] read-thread => batching-key1 => null [info] read-thread => batching-key2 => null [info] read-thread => batching-key3 => null [info] read-thread => shared-update-key => shared-update-value [info] read-thread => shared-remove-key => shared-remove-value [info] read-thread => Waiting 3000 msec... [info] update-thread => ----- batch update end!![true] ----- [info] read-thread => ----- end batching read ----- [info] read-thread => not-batching-key => not-batching-value [info] read-thread => batching-key1 => batching-value1 [info] read-thread => batching-key2 => batching-value2 [info] read-thread => batching-key3 => batching-value3 [info] read-thread => shared-update-key => shared-update-value-updated [info] read-thread => shared-remove-key => null [success] Total time: 11 s, completed 2013/04/24 23:53:51
Batching APIを使う前の値はReadThreadからも見えていますが、startBatchをして確定する前の値は、ReadThreadから見えていません。endBatch(true)を呼び出した後、見えるようになっています。
続いて、endBatch(false)にした場合。
> run [info] read-thread => Waiting 3000 msec... [info] update-thread => ----- non batching writed ----- [info] update-thread => Waiting 3000 msec... [info] read-thread => ----- initial read ----- [info] read-thread => not-batching-key => not-batching-value [info] read-thread => shared-update-key => shared-update-value [info] read-thread => shared-remove-key => shared-remove-value [info] read-thread => Waiting 3000 msec... [info] update-thread => ----- batch writed, not committed ----- [info] update-thread => Waiting 3000 msec... [info] read-thread => in batching read [info] read-thread => not-batching-key => not-batching-value [info] read-thread => batching-key1 => null [info] read-thread => batching-key2 => null [info] read-thread => batching-key3 => null [info] read-thread => shared-update-key => shared-update-value [info] read-thread => shared-remove-key => shared-remove-value [info] read-thread => Waiting 3000 msec... [info] update-thread => ----- batch update end!![false] ----- [info] read-thread => ----- end batching read ----- [info] read-thread => not-batching-key => not-batching-value [info] read-thread => batching-key1 => null [info] read-thread => batching-key2 => null [info] read-thread => batching-key3 => null [info] read-thread => shared-update-key => shared-update-value [info] read-thread => shared-remove-key => shared-remove-value [success] Total time: 13 s, completed 2013/04/24 23:55:36
更新途中の値が見えないところは先ほどと同じですが、endBatch(false)を呼び出した後は、UpdateThreadの更新処理がなかったことになっています。
トランザクションに参加するリソースがInfinispanだけの場合は、Batching APIだけでいいのかな…?
あと、同ドキュメント中にBatching APIとJTAについて、もう少し書かれています。
https://docs.jboss.org/author/display/ISPN/Batching
裏でTransactionManagerを使っているので、JTAと合わせて使えるよという話みたいです。
以下の設定を追加すると、
<transaction syncRollbackPhase="false" syncCommitPhase="false" useEagerLocking="true" eagerLockSingleNode="true" />
ドキュメント的には、こういうことになるのかな?
- トランザクションがコミットまたはロールバックされるまで、ロックを取得する(useEagerLocking/eagerLockSingleNode)
- トランザクションがコミットプロセスの一部として、変更内容がクラスタに伝播する。もしトランザクション中で複数の変更が発生していれば、レプリケーションのやり取りを少なくできる
- 同期レプリケーションまたはInvalidationを使っていた場合は、レプリケーション/Invalidationが失敗した時にはトランザクションがロールバックする
- JTA DataSourceなどのJTAリソースであるCacheLoaderを使っていた場合は、トランザクションに参加することができるようになる
ただまあ、TransactionConfigurationを見る限りは、
http://docs.jboss.org/infinispan/5.2/apidocs/org/infinispan/configuration/cache/TransactionConfiguration.html
useEagerLockingとeagerLockSingleNodeはDeprecatedになっていますが…。
eagerLockSingleNodeは、trueにするとnumOwnersの設定に関係なく、単一のノードをロックするようです。falseにすると、全ノードをロックしにいくと…。
*つまり、Distributionモード限定の設定です
useEagerLockingは、eagerLockSingleNodeと同じく単一のノードをロックする設定らしいです。Infinispan 5.1からは悲観的ロックに読み替えられるようです。が、falseにした場合は、やっぱり全ノードをロックするらしいです。
*単に悲観的ロックを使った場合は、単一のノードをロックしているようです
その他、syncCommitPhase/syncRollbackPhaseの意味も書いてあります。コミットまたはロールバックを命令を送った全ノードの応答を、待つかどうかの設定です。trueにすれば同期モードで、falseにすると非同期モードになります。非同期モードは、パフォーマンスが向上するけど、失敗した時にはちゃんとトラップしてロールバック指示を出すよ、と書いていますが…。
ああ、設定が深い…