CLOVER🍀

That was when it all began.

Infinispanでトランザクション管理

Infinispanで、JTAを使ったトランザクション管理ができるということで。

Infinispan transactions
https://docs.jboss.org/author/display/ISPN/Infinispan+transactions
Locking and Concurrency
https://docs.jboss.org/author/display/ISPN/Locking+and+Concurrency

InfinispanでJTAのTransactionManagerを使用するには、Cache#getAdvancedCacheで取得できるAdvancedCacheから、TransactionManagerを取得すればよいらしいです。

TransactionManager tm = cache.getAdvancedCache().getTransactionManager();

AdvancedCacheって…。

TransactionManagerの設定は、設定ファイルで書く場合はdefaultタグやnamedCacheタグの配下に、transactionタグを書くことで行います。例えば、こんな感じ。

    <transaction
        transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
        transactionMode="TRANSACTIONAL"
        lockingMode="OPTIMISTIC"
        autoCommit="true" />

transactionManagerLookupClass属性には、org.infinispan.transaction.lookup.TransactionManagerLookupインターフェースを実装したクラスを指定します。

Infinispanが提供しているのは、以下の4つです。

クラス名 説明
DummyTransactionManagerLookup テスト用のTransactionManagerLookup。本番環境の利用には推奨しません
JBossStandaloneJTAManagerLookup 主にInfinispanをスタンドアロンで動かす場合に使用します*JBoss Transactionsが必要と思われます
GenericTransactionManagerLookup 一般的なJava EEアプリケーションサーバに組み込まれているTransactionManagerを使用します。TransactionManagerが見つからなかった場合は、DummyTransactionManagerLookupを使用します
JBossTransactionManagerLookup JBoss Application ServerでInfinispanを使用する場合は、こちら

transactionMode属性は、「TRANSACTIONAL」「NON_TRANSACTIONAL」のいずれかを指定して、トランザクションが使えるようにするかどうかを指定します。デフォルトは「NON_TRANSACTIONAL」です。

lockingMode属性は、ロック戦略を指定します。「OPTIMISTIC」(楽観的)と「PESSIMISTIC」(非観的)が指定でき、デフォルトは「OPTIMISTIC」です。
非観的ロックを使用すると、トランザクション中に指定したキーに対してロックが取られます。楽観的ロックは、同一のキーに対して多くの更新が発生しない場合に使用するとよいそうです。

autoCommit属性は、トランザクションを使わなかった場合に自動コミットするかどうかで、デフォルトはtrueです。これをfalseにした場合で、かつtransactionModeを「TRANSACTIONAL」にしていた場合は、トランザクション外でCache#putを呼び出すと例外が発生します。

とまあ、こんな感じ。

では、使ってみます。

準備

今回は、スタンドアロンでいきます。分散キャッシュは使用しません。

build.sbt

name := "infinispan-transactions-example"

version := "0.0.1"

scalaVersion := "2.10.1"

organization := "littlewings"

fork in run := true

resolvers += "JBoss Public Maven Repository Group" at "http://repository.jboss.org/nexus/content/groups/public-jboss/"

libraryDependencies += "org.infinispan" % "infinispan-core" % "5.2.1.Final"

src/main/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="urn:infinispan:config:5.2 http://www.infinispan.org/schemas/infinispan-config-5.2.xsd"
      xmlns="urn:infinispan:config:5.2">
  <global>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        />
  </global>

  <default>
    <transaction
        transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
        transactionMode="TRANSACTIONAL"
        lockingMode="OPTIMISTIC"
        autoCommit="true" />
  </default>
</infinispan>

で、こんなコードを用意。

src/main/scala/InfinispanTransactions.scala 
import java.util.Date

import org.infinispan.Cache
import org.infinispan.manager.DefaultCacheManager

object InfinispanTransactions {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]()

    val range = 1 to 3
    val keys = range map (i => s"key$i")

    val tm = cache.getAdvancedCache.getTransactionManager
    tm.begin()

    range foreach (i => cache.put(keys(i - 1), s"value$i"))
    var values = keys map (k => cache.get(k)) mkString(", ")
    println(s"results => $values")

    tm.commit()

    values = keys map (k => cache.get(k)) mkString(", ")
    println(s"results => $values")
  }
}

動かしてみます。

> run
[error] 4 01, 2013 12:05:44 午前 org.infinispan.factories.GlobalComponentRegistry start
[error] INFO: ISPN000128: Infinispan version: Infinispan 'Delirium' 5.2.1.Final
[error] 4 01, 2013 12:05:45 午前 org.infinispan.transaction.lookup.GenericTransactionManagerLookup useDummyTM
[error] WARN: ISPN000104: Falling back to DummyTransactionManager from Infinispan
[error] 4 01, 2013 12:05:45 午前 org.infinispan.jmx.CacheJmxRegistration start
[error] INFO: ISPN000031: MBeans were successfully registered to the platform MBean server.
[info] results => value1, value2, value3
[info] results => value1, value2, value3
[success] Total time: 2 s, completed 2013/04/01 0:05:45

まあ、普通に動いてますね。

ちなみに、よーく見ると

[error] 4 01, 2013 12:05:45 午前 org.infinispan.transaction.lookup.GenericTransactionManagerLookup useDummyTM
[error] WARN: ISPN000104: Falling back to DummyTransactionManager from Infinispan

と、現在使われているのがDummyTransactionManagerということがわかります。

では、以下の部分を

    tm.commit()

ロールバックするように

    tm.rollback()

変更。

すると

> run
  〜省略〜
[info] results => value1, value2, value3
[info] results => null, null, null
[success] Total time: 3 s, completed 2013/04/01 0:06:14

ちゃんとロールバックしていますね。

トランザクション分離レベル

Infinispanは、トランザクション分離レベルとしては「READ_COMMITTED」と「REPEATABLE_READ」をサポートしています。

デフォルトは、「READ_COMMITTED」です。

こちらは、lockingタグで設定できるので、せっかくなのでこちらも試してみます。

READ COMMITTED

今回はinfinispan.xmlのdefaultタグの配下に、lockingタグを加えます。

  <default>
    <transaction
        transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
        transactionMode="TRANSACTIONAL"
        lockingMode="OPTIMISTIC"
        autoCommit="true" />
    <locking isolationLevel="READ_COMMITTED" />
  </default>

なお、デフォルトが「READ_COMMITTED」なので、書かなくても挙動は変わりません。

そして、先ほどのScalaコードを変更。
src/main/scala/InfinispanTransactions.scala

import java.util.Date

import org.infinispan.Cache
import org.infinispan.manager.DefaultCacheManager

object InfinispanTransactions {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager("infinispan.xml")
    val cache = manager.getCache[String, String]()

    val range = 1 to 3
    val keys = range map (i => s"key$i")

    range foreach (i => cache.put(keys(i - 1), s"value$i"))

    val (th1, th2) = (new Thread1(cache, keys), new Thread2(cache, keys))
    th1.start()
    th2.start()

    th1.join()
    th2.join()

    val values = keys map (k => cache.get(k)) mkString(", ")
    println(s"results => $values")
  }
}

trait ThreadSupport {
  self: Thread =>

  protected def w(mills: Long): Unit = {
    log(s"Waiting $mills msec...")
    Thread.sleep(mills)
  }

  protected def log(msg: String): Unit =
    println(s"[${new Date}] <${self.getName}> $msg")
}

class Thread1(cache: Cache[String, String], knownKeys: Seq[String]) extends Thread("thread-1")
                                                                    with ThreadSupport {
  override def run(): Unit = {
    log("start")

    val tm = cache.getAdvancedCache.getTransactionManager

    log("begin transaction")
    tm.begin()

    w(2000)

    val initialValues = knownKeys map (k => cache.get(k))
    log(s"read initial values ${initialValues.mkString("[", ", ", "]")}")

    w(3000)

    val nextValues = knownKeys map (k => cache.get(k))
    log(s"read next values ${nextValues.mkString("[", ", ", "]")}")

    log("commit transaction")
    tm.commit()

    log("end")
  }
}

class Thread2(cache: Cache[String, String], knownKeys: Seq[String]) extends Thread("thread-2")
                                                                    with ThreadSupport {
  override def run(): Unit = {
    log("start")

    val tm = cache.getAdvancedCache.getTransactionManager

    log("begin transaction")
    tm.begin()

    w(3000)

    log(s"Key[${knownKeys(1)}] update")
    cache.put(knownKeys(1), "Updated value2")

    log("commit transaction")
    tm.commit()

    log("end")
  }
}

最初に、Cacheにキーと値を3つ入れておきます。

    range foreach (i => cache.put(keys(i - 1), s"value$i"))

そして、スレッドを2つ用意して、片方のスレッドはトランザクションを開始して2秒待ち、それから最初に入れた値を読み出します。その後、3秒待機してもう1度同じキーを使用して値を読み出します。

    val tm = cache.getAdvancedCache.getTransactionManager

    log("begin transaction")
    tm.begin()

    w(2000)

    val initialValues = knownKeys map (k => cache.get(k))
    log(s"read initial values ${initialValues.mkString("[", ", ", "]")}")

    w(3000)

    val nextValues = knownKeys map (k => cache.get(k))
    log(s"read next values ${nextValues.mkString("[", ", ", "]")}")

    log("commit transaction")
    tm.commit()

もう片方のスレッドは、開始して3秒待機して、2つ目のキーに対応した値を更新してコミットします。

    val tm = cache.getAdvancedCache.getTransactionManager

    log("begin transaction")
    tm.begin()

    w(3000)

    log(s"Key[${knownKeys(1)}] update")
    cache.put(knownKeys(1), "Updated value2")

    log("commit transaction")
    tm.commit()

最後に、メインスレッドでもう1度キーに対応する値を読み出してみます。

    val values = keys map (k => cache.get(k)) mkString(", ")
    println(s"results => $values")

「READ COMMITTED」なので、このプログラムではスレッド2が変更した値が、スレッド1の2回目の読み出しでは見えるようになっているはずです。

では、実行。

> run
  〜省略〜
[info] [Mon Apr 01 00:23:51 JST 2013] <thread-1> start
[info] [Mon Apr 01 00:23:51 JST 2013] <thread-1> begin transaction
[info] [Mon Apr 01 00:23:51 JST 2013] <thread-1> Waiting 2000 msec...
[info] [Mon Apr 01 00:23:51 JST 2013] <thread-2> start
[info] [Mon Apr 01 00:23:51 JST 2013] <thread-2> begin transaction
[info] [Mon Apr 01 00:23:51 JST 2013] <thread-2> Waiting 3000 msec...
[info] [Mon Apr 01 00:23:53 JST 2013] <thread-1> read initial values [value1, value2, value3]
[info] [Mon Apr 01 00:23:53 JST 2013] <thread-1> Waiting 3000 msec...
[info] [Mon Apr 01 00:23:54 JST 2013] <thread-2> Key[key2] update
[info] [Mon Apr 01 00:23:54 JST 2013] <thread-2> commit transaction
[info] [Mon Apr 01 00:23:54 JST 2013] <thread-2> end
[info] [Mon Apr 01 00:23:56 JST 2013] <thread-1> read next values [value1, Updated value2, value3]
[info] [Mon Apr 01 00:23:56 JST 2013] <thread-1> commit transaction
[info] [Mon Apr 01 00:23:56 JST 2013] <thread-1> end
[info] results => value1, Updated value2, value3
[success] Total time: 7 s, completed 2013/04/01 0:23:56

1回目の読み出しは

[info] [Mon Apr 01 00:23:53 JST 2013] <thread-1> read initial values [value1, value2, value3]

ですが、2回目は

[info] [Mon Apr 01 00:23:56 JST 2013] <thread-1> read next values [value1, Updated value2, value3]

となり、別のトランザクションが更新した結果を読んでいます。

REPEATABLE READ

では、今度は「REPEATABLE_READ」です。プログラムは一切変更せず、設定ファイルだけ以下のように変更します。

  <default>
    <transaction
        transactionManagerLookupClass="org.infinispan.transaction.lookup.GenericTransactionManagerLookup"
        transactionMode="TRANSACTIONAL"
        lockingMode="OPTIMISTIC"
        autoCommit="true" />
    <locking isolationLevel="REPEATABLE_READ" />
  </default>

変更したのは、lockingタグのisolationLevelのみです。

実行。

> run
  〜省略〜
[info] [Mon Apr 01 00:26:48 JST 2013] <thread-2> start
[info] [Mon Apr 01 00:26:48 JST 2013] <thread-2> begin transaction
[info] [Mon Apr 01 00:26:48 JST 2013] <thread-2> Waiting 3000 msec...
[info] [Mon Apr 01 00:26:48 JST 2013] <thread-1> start
[info] [Mon Apr 01 00:26:48 JST 2013] <thread-1> begin transaction
[info] [Mon Apr 01 00:26:48 JST 2013] <thread-1> Waiting 2000 msec...
[info] [Mon Apr 01 00:26:50 JST 2013] <thread-1> read initial values [value1, value2, value3]
[info] [Mon Apr 01 00:26:50 JST 2013] <thread-1> Waiting 3000 msec...
[info] [Mon Apr 01 00:26:51 JST 2013] <thread-2> Key[key2] update
[info] [Mon Apr 01 00:26:51 JST 2013] <thread-2> commit transaction
[info] [Mon Apr 01 00:26:51 JST 2013] <thread-2> end
[info] [Mon Apr 01 00:26:53 JST 2013] <thread-1> read next values [value1, value2, value3]
[info] [Mon Apr 01 00:26:53 JST 2013] <thread-1> commit transaction
[info] [Mon Apr 01 00:26:53 JST 2013] <thread-1> end
[info] results => value1, Updated value2, value3
[success] Total time: 7 s, completed 2013/04/01 0:26:53

今度は、1回目も

[info] [Mon Apr 01 00:26:50 JST 2013] <thread-1> read initial values [value1, value2, value3]

2回目も

[info] [Mon Apr 01 00:26:53 JST 2013] <thread-1> read next values [value1, value2, value3]

と同じ結果になり、別のトランザクションのコミット内容が見えなくなりました。

なお、当然のことながら「REPEATABLE READ」ではファントムリードは発生します。

こんなところで。