CLOVER🍀

That was when it all began.

Infinispanの楽観的ロック/非観的ロックを確認する

前に、Infinispanのトランザクション管理とデータバージョニングのコードを書いていて、「楽観的ロックって、もしかして明示的にwrite-skewとバージョニングを有効にしないと効かないんじゃないかなぁ?」という疑問を持ちました。

Infinispan transactions
https://docs.jboss.org/author/display/ISPN/Infinispan+transactions
Data Versioning
https://docs.jboss.org/author/display/ISPN/Data+Versioning

なので、せっかくなのでこの機会にInfinispanのトランザクションにおける楽観的ロックと悲観的ロックを試してみることにしました。

今回使用しているInfinispanは、5.2.1.Finalです。最新版は5.3.0.Alpha1ですが、まだパスの方向で…。

まず、母体となるコードはこんな感じで用意しました。
src/main/scala/TransactionsLocks.scala

import java.util.Date
import javax.transaction.Status

import org.infinispan.Cache
import org.infinispan.configuration.cache.{ConfigurationBuilder, VersioningScheme}
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.transaction.{LockingMode, TransactionMode}
import org.infinispan.transaction.lookup.GenericTransactionManagerLookup
import org.infinispan.util.concurrent.IsolationLevel

object TransactionsLocks {
  def main(args: Array[String]): Unit = {
    val manager = new DefaultCacheManager

    val cacheName = "transactionalCache"
    manager.defineConfiguration(cacheName,
                                new ConfigurationBuilder()
                                  //.clustering
                                  //.cacheMode(CacheMode.DIST_SYNC)
                                  .transaction
                                  .transactionMode(TransactionMode.TRANSACTIONAL)
                                  .transactionManagerLookup(new GenericTransactionManagerLookup)
                                /*** ロックの設定 ***/
                                  .build)

    val cache = manager.getCache[String, String](cacheName)

    cache.put("key", "value")

    try {
      val firstThread = new FirstWriteThread(cache)
      val delayThread = new DelayWriteThread(cache)
      firstThread.start()
      delayThread.start()

      firstThread.join()
      delayThread.join()

      println(s"Last Value => ${cache.get("key")}")
    } finally {
      cache.stop()
      manager.stop()
    }
  }
}

trait ThreadSupport {
  self: Thread =>

  protected def w(mills: Long): Unit = {
    log(s"Waiting $mills msec...")
    Thread.sleep(mills)
  }

  protected def log(msg: String): Unit =
    println(s"[${new Date}] <${self.getName}> $msg")
}


class FirstWriteThread(cache: Cache[String, String]) extends Thread("first-write-thread")
                                                     with ThreadSupport {
  override def run(): Unit = {
    log("start transaction")

    val tm = cache.getAdvancedCache.getTransactionManager
    try {
      tm.begin()

      cache.put("key", "value-by-first-write-thread")
      log("updated!!")

      log(cache.get("key"))

      w(1200L)

      log("try commit")
      tm.commit()
      log("committed")
    } catch {
      case e: Exception =>
        tm.getStatus match {
          case Status.STATUS_ACTIVE => tm.rollback()
          case Status.STATUS_MARKED_ROLLBACK => tm.rollback()
          case _ =>
        }
        log(s"Exception Cached [${e.toString()}]")
    }

    log("end transaction")
  }
}

class DelayWriteThread(cache: Cache[String, String]) extends Thread("delay-write-thread")
                                                     with ThreadSupport {
  override def run(): Unit = {
    log("start transaction")

    val tm = cache.getAdvancedCache.getTransactionManager
    try {
      tm.begin()

      w(1000L)

      cache.put("key", "value-by-delay-write-thread")
      log("updated!!")

      log(cache.get("key"))

      w(500L)

      log("try commit")
      tm.commit()
      log("committed")

      println(cache.getAdvancedCache.getCacheEntry("key", null, null).getVersion)
    } catch {
      case e: Exception =>
        tm.getStatus match {
          case Status.STATUS_ACTIVE => tm.rollback()
          case Status.STATUS_MARKED_ROLLBACK => tm.rollback()
          case _ =>
        }
        log(s"Exception Cached [${e.toString()}]")
    }

    log("end transaction")
  }
}

2本スレッドを用意していまして、それぞれFirstWriteThreadとDelayWriteThreadとしています。

で、最初にKey/Valueを登録しておいて

    val cache = manager.getCache[String, String](cacheName)

    cache.put("key", "value")

それぞれのスレッドにトランザクションを開始して更新してもらいます。この時、処理が

  1. FirstWriteThread、DelayWriteThreadがトランザクション開始
  2. FirstWriteThreadがkeyに対する値を更新する
  3. DelayWriteThreadがkeyに対する値を更新する
  4. FirstWriteThreadがコミットする
  5. DelayWriteThreadがコミットする

という順番で行われるようにします。

ここで、最後のDelayWriteThreadがコミットする時に、どうなるでしょう?という話ですね。

最後に、メインスレッドが両スレッドを待ち合わせた後に、もう1度キャッシュからkeyに対する値を取り出して、結果を確認するようにしています。

      println(s"Last Value => ${cache.get("key")}")

確認の際には、キャッシュの設定で

    manager.defineConfiguration(cacheName,
                                new ConfigurationBuilder()
                                  //.clustering
                                  //.cacheMode(CacheMode.DIST_SYNC)
                                  .transaction
                                  .transactionMode(TransactionMode.TRANSACTIONAL)
                                  .transactionManagerLookup(new GenericTransactionManagerLookup)
                                /*** ロックの設定 ***/
                                  .build)

                                /*** ロックの設定 ***/

の部分を変えていくようにします。

では、いってみましょう。

楽観的ロック

最初は、とりあえず単に楽観的ロックで設定します。

.lockingMode(LockingMode.OPTIMISTIC)
                                  .autoCommit(true)
                                  .locking
                                  .isolationLevel(IsolationLevel.READ_COMMITTED)

では、実行。

[info] [Sun Apr 14 20:22:43 JST 2013] <first-write-thread> start transaction
[info] [Sun Apr 14 20:22:43 JST 2013] <first-write-thread> updated!!
[info] [Sun Apr 14 20:22:43 JST 2013] <first-write-thread> value-by-first-write-thread
[info] [Sun Apr 14 20:22:43 JST 2013] <delay-write-thread> start transaction
[info] [Sun Apr 14 20:22:43 JST 2013] <delay-write-thread> Waiting 1000 msec...
[info] [Sun Apr 14 20:22:43 JST 2013] <first-write-thread> Waiting 1200 msec...
[info] [Sun Apr 14 20:22:44 JST 2013] <delay-write-thread> updated!!
[info] [Sun Apr 14 20:22:44 JST 2013] <delay-write-thread> value-by-delay-write-thread
[info] [Sun Apr 14 20:22:44 JST 2013] <delay-write-thread> Waiting 500 msec...
[info] [Sun Apr 14 20:22:44 JST 2013] <first-write-thread> try commit
[info] [Sun Apr 14 20:22:44 JST 2013] <first-write-thread> committed
[info] [Sun Apr 14 20:22:44 JST 2013] <first-write-thread> end transaction
[info] [Sun Apr 14 20:22:44 JST 2013] <delay-write-thread> try commit
[info] [Sun Apr 14 20:22:44 JST 2013] <delay-write-thread> committed
[info] null
[info] [Sun Apr 14 20:22:44 JST 2013] <delay-write-thread> end transaction
[info] Last Value => value-by-delay-write-thread

何事もなく実行されましたね。そして、

[info] Last Value => value-by-delay-write-thread

完全に後勝ちになっています。

では、今度はwrite-skewとバージョニングを使うようにしてみます。

                                  .lockingMode(LockingMode.OPTIMISTIC)
                                  .autoCommit(true)
                                  .locking
                                  
                                  .isolationLevel(IsolationLevel.REPEATABLE_READ)
                                  .writeSkewCheck(true)
                                  .versioning
                                  .enabled(true)
                                  .scheme(VersioningScheme.SIMPLE)

実行。

[info] [Sun Apr 14 20:29:28 JST 2013] <delay-write-thread> start transaction
[info] [Sun Apr 14 20:29:28 JST 2013] <delay-write-thread> Waiting 1000 msec...
[info] [Sun Apr 14 20:29:28 JST 2013] <first-write-thread> start transaction
[info] [Sun Apr 14 20:29:28 JST 2013] <first-write-thread> updated!!
[info] [Sun Apr 14 20:29:28 JST 2013] <first-write-thread> value-by-first-write-thread
[info] [Sun Apr 14 20:29:28 JST 2013] <first-write-thread> Waiting 1200 msec...
[info] [Sun Apr 14 20:29:29 JST 2013] <delay-write-thread> updated!!
[info] [Sun Apr 14 20:29:29 JST 2013] <delay-write-thread> value-by-delay-write-thread
[info] [Sun Apr 14 20:29:29 JST 2013] <delay-write-thread> Waiting 500 msec...
[info] [Sun Apr 14 20:29:29 JST 2013] <first-write-thread> try commit
[info] [Sun Apr 14 20:29:29 JST 2013] <first-write-thread> committed
[info] [Sun Apr 14 20:29:29 JST 2013] <first-write-thread> end transaction
[info] [Sun Apr 14 20:29:29 JST 2013] <delay-write-thread> try commit
[error] 4 14, 2013 8:29:29 午後 org.infinispan.container.entries.RepeatableReadEntry performLocalWriteSkewCheck
[error] WARN: ISPN000005: Detected write skew on key [key]. Another process has changed the entry since we last read it! Unable to copy entry for update.
[error] 4 14, 2013 8:29:29 午後 org.infinispan.transaction.TransactionCoordinator prepare
[error] ERROR: Error while processing prepare
[error] org.infinispan.transaction.WriteSkewException: Detected write skew.
[error] 	at org.infinispan.container.entries.RepeatableReadEntry.performLocalWriteSkewCheck(RepeatableReadEntry.java:68)
  〜省略〜
[error] 	at org.infinispan.interceptors.TxInterceptor.visitPrepareCommand(TxInterceptor.java:115)
[info] [Sun Apr 14 20:29:29 JST 2013] <delay-write-thread> Exception Cached [javax.transaction.RollbackException: Exception rolled back, status is: 9]
[info] [Sun Apr 14 20:29:29 JST 2013] <delay-write-thread> end transaction
[info] Last Value => value-by-first-write-thread
[error] 	at org.infinispan.commands.tx.PrepareCommand.acceptVisitor(PrepareCommand.java:124)
  〜省略〜
[error] 	at DelayWriteThread.run(TransactionsLocks.scala:142)
[error] 
[error] 4 14, 2013 8:29:29 午後 org.infinispan.transaction.tm.DummyTransaction notifyBeforeCompletion
[error] ERROR: ISPN000109: beforeCompletion() failed for SynchronizationAdapter{localTransaction=LocalTransaction{remoteLockedNodes=null, isMarkedForRollback=false, lockedKeys=null, backupKeyLocks=null, topologyId=-1, isFromStateTransfer=false} org.infinispan.transaction.synchronization.SyncLocalTransaction@3} org.infinispan.transaction.synchronization.SynchronizationAdapter@22
[error] org.infinispan.CacheException: Could not prepare. 
[error] 	at org.infinispan.transaction.synchronization.SynchronizationAdapter.beforeCompletion(SynchronizationAdapter.java:70)
  〜省略〜
[error] 	at org.infinispan.transaction.tm.DummyBaseTransactionManager.commit(DummyBaseTransactionManager.java:102)
[error] 	at DelayWriteThread.run(TransactionsLocks.scala:142)
[error] Caused by: javax.transaction.xa.XAException
[error] 	at org.infinispan.transaction.TransactionCoordinator.prepare(TransactionCoordinator.java:161)
  〜省略〜
[error] 	at org.infinispan.transaction.synchronization.SynchronizationAdapter.beforeCompletion(SynchronizationAdapter.java:68)
[error] 	... 5 more
[error] 

…思いっきりコケました。

ログをよくよく見ると

[info] [Sun Apr 14 20:29:29 JST 2013] <delay-write-thread> try commit
[error] 4 14, 2013 8:29:29 午後 org.infinispan.container.entries.RepeatableReadEntry performLocalWriteSkewCheck
[error] WARN: ISPN000005: Detected write skew on key [key]. Another process has changed the entry since we last read it! Unable to copy entry for update.
[error] 4 14, 2013 8:29:29 午後 org.infinispan.transaction.TransactionCoordinator prepare
[error] ERROR: Error while processing prepare
[error] org.infinispan.transaction.WriteSkewException: Detected write skew.

というように、別のプロセスが更新したので、write-skewで引っかかったと言われていて、

[info] [Sun Apr 14 20:29:29 JST 2013] <delay-write-thread> Exception Cached [javax.transaction.RollbackException: Exception rolled back, status is: 9]

呼び出し元には、すでにロールバックされたことが報告されています。

よって、メインスレッドからは

[info] Last Value => value-by-first-write-thread

最初のスレッドが更新した値を見ることができます。

その他、

                                  .writeSkewCheck(true)
                                  .versioning
                                  .enabled(true)
                                  .scheme(VersioningScheme.SIMPLE)

からwriteSkewCheckを外してみましたが、最初の例のようにチェックは行われなくなりました。writeSkewCheckを有効にしたままバージョニングを外すと、今度は設定エラーとなります。

ということで、実質この設定で楽観的ロックを行うってことでしょうね。

非観的ロック

今度は、非観的ロックにいってみます。設定を以下のように変更します。

                                  .lockingMode(LockingMode.PESSIMISTIC)
                                  .autoCommit(true)
                                  .locking
                                  .isolationLevel(IsolationLevel.READ_COMMITTED)
                                  .lockAcquisitionTimeout(0L)

わかりやすいように、ロックが取れなかったらさっさと諦めてもらうようにしました。

実行。

[info] [Sun Apr 14 20:47:51 JST 2013] <first-write-thread> start transaction
[info] [Sun Apr 14 20:47:51 JST 2013] <delay-write-thread> start transaction
[info] [Sun Apr 14 20:47:51 JST 2013] <delay-write-thread> Waiting 1000 msec...
[info] [Sun Apr 14 20:47:51 JST 2013] <first-write-thread> updated!!
[info] [Sun Apr 14 20:47:51 JST 2013] <first-write-thread> value-by-first-write-thread
[info] [Sun Apr 14 20:47:51 JST 2013] <first-write-thread> Waiting 1200 msec...
[error] 4 14, 2013 8:47:52 午後 org.infinispan.interceptors.InvocationContextInterceptor handleAll
[error] ERROR: ISPN000136: Execution error
[error] org.infinispan.util.concurrent.TimeoutException: Unable to acquire lock after [0 milliseconds] on key [key] for requestor [GlobalTransaction:<null>:3:local]! Lock held by [GlobalTransaction:<null>:2:local]
[error] 	at org.infinispan.util.concurrent.locks.LockManagerImpl.lock(LockManagerImpl.java:213)
  〜省略〜
[error] 	at org.infinispan.commands.write.PutKeyValueCommand.acceptVisitor(PutKeyValueCommand.java:77)
[info] [Sun Apr 14 20:47:52 JST 2013] <delay-write-thread> Exception Cached [org.infinispan.util.concurrent.TimeoutException: Unable to acquire lock after [0 milliseconds] on key [key] for requestor [GlobalTransaction:<null>:3:local]! Lock held by [GlobalTransaction:<null>:2:local]]
[info] [Sun Apr 14 20:47:52 JST 2013] <delay-write-thread> end transaction
[error] 	at org.infinispan.interceptors.base.CommandInterceptor.invokeNextInterceptor(CommandInterceptor.java:118)
  〜省略〜
[error] 	at DelayWriteThread.run(TransactionsLocks.scala:134)
[error] 
[info] [Sun Apr 14 20:47:52 JST 2013] <first-write-thread> try commit
[info] [Sun Apr 14 20:47:52 JST 2013] <first-write-thread> committed
[info] [Sun Apr 14 20:47:52 JST 2013] <first-write-thread> end transaction
[info] Last Value => value-by-first-write-thread

なんかログが入り乱れていますが、DelayWriteThreadがputする時にロックが取れなくて例外が投げられています。

よって、最初にロックを取ったFirstWriteThreadの結果が有効です。

[info] Last Value => value-by-first-write-thread

トランザクション分離レベルは「READ COMMITTED」としていますが、「REPEATABLE READ」に変えても結果は同じです。そりゃあ、そうですよね…。

クラスタリングにすると

今回はローカルモードで行いましたが、これをクラスタリングモードにするとちょっと結果が変わりました。

最初に

    val cache = manager.getCache[String, String](cacheName)

    cache.put("key", "value")

と値を設定していますが、これがあるかないかで楽観的ロックの方は動きが変わります。

具体的には、ローカルモードだと楽観的ロックが効きますが、クラスタリングモードにすると後勝ちになってしまいました。後勝ちになった時のバージョン番号は「1」だったので、クラスタリング時の「更新オペレーション」は阻止できても、「登録オペレーション」は阻止できないということかな。

今回書いたコードも、一応載せておきます。最初のDefaultCacheManagerとCacheの設定を変えると、楽観的ロックも悲観的ロックも、ローカルモードにもクラスタリングモードにも切り替えられます。
src/main/scala/TransactionsLocks.scala

import java.util.Date
import javax.transaction.Status

import org.infinispan.Cache
import org.infinispan.configuration.cache.{ConfigurationBuilder, VersioningScheme}
import org.infinispan.manager.DefaultCacheManager
import org.infinispan.transaction.{LockingMode, TransactionMode}
import org.infinispan.transaction.lookup.GenericTransactionManagerLookup
import org.infinispan.util.concurrent.IsolationLevel

import org.infinispan.configuration.cache.CacheMode
import org.infinispan.configuration.global.GlobalConfigurationBuilder

object TransactionsLocks {
  def main(args: Array[String]): Unit = {
    val globalConfiguration = new GlobalConfigurationBuilder()
      .transport
      .defaultTransport
      .clusterName("transaction-cluster")
      .addProperty("configurationFile", "jgroups.xml")
      .build

    //val manager = new DefaultCacheManager(globalConfiguration)
    val manager = new DefaultCacheManager

    val cacheName = "transactionalCache"
    manager.defineConfiguration(cacheName,
                                new ConfigurationBuilder()
                                  //.clustering
                                  //.cacheMode(CacheMode.DIST_SYNC)
                                  .transaction
                                  .transactionMode(TransactionMode.TRANSACTIONAL)
                                  .transactionManagerLookup(new GenericTransactionManagerLookup)
                                /** Optimisitic Lock **/
                                  .lockingMode(LockingMode.OPTIMISTIC)
                                  .autoCommit(true)
                                  .locking
                                  //.isolationLevel(IsolationLevel.READ_COMMITTED)
                                  .isolationLevel(IsolationLevel.REPEATABLE_READ)
                                  .writeSkewCheck(true)
                                  .versioning
                                  .enabled(true)
                                  .scheme(VersioningScheme.SIMPLE)

                                /** Pessimitic Lock **/
                                /*
                                  .lockingMode(LockingMode.PESSIMISTIC)
                                  .autoCommit(true)
                                  .locking
                                //.isolationLevel(IsolationLevel.READ_COMMITTED)
                                  .isolationLevel(IsolationLevel.REPEATABLE_READ)
                                  .lockAcquisitionTimeout(0L)
                                  */
                                  .build)

    val cache = manager.getCache[String, String](cacheName)

    cache.put("key", "value")

    try {
      val firstThread = new FirstWriteThread(cache)
      val delayThread = new DelayWriteThread(cache)
      firstThread.start()
      delayThread.start()

      firstThread.join()
      delayThread.join()

      println(s"Last Value => ${cache.get("key")}")
    } finally {
      cache.stop()
      manager.stop()
    }
  }
}

trait ThreadSupport {
  self: Thread =>

  protected def w(mills: Long): Unit = {
    log(s"Waiting $mills msec...")
    Thread.sleep(mills)
  }

  protected def log(msg: String): Unit =
    println(s"[${new Date}] <${self.getName}> $msg")
}


class FirstWriteThread(cache: Cache[String, String]) extends Thread("first-write-thread")
                                                     with ThreadSupport {
  override def run(): Unit = {
    log("start transaction")

    val tm = cache.getAdvancedCache.getTransactionManager
    try {
      tm.begin()

      cache.put("key", "value-by-first-write-thread")
      log("updated!!")

      log(cache.get("key"))

      w(1200L)

      log("try commit")
      tm.commit()
      log("committed")
    } catch {
      case e: Exception =>
        tm.getStatus match {
          case Status.STATUS_ACTIVE => tm.rollback()
          case Status.STATUS_MARKED_ROLLBACK => tm.rollback()
          case _ =>
        }

        log(s"Exception Cached [${e.toString()}]")
    }

    log("end transaction")
  }
}

class DelayWriteThread(cache: Cache[String, String]) extends Thread("delay-write-thread")
                                                     with ThreadSupport {
  override def run(): Unit = {
    log("start transaction")

    val tm = cache.getAdvancedCache.getTransactionManager
    try {
      tm.begin()

      w(1000L)

      cache.put("key", "value-by-delay-write-thread")
      log("updated!!")

      log(cache.get("key"))

      w(500L)

      log("try commit")
      tm.commit()
      log("committed")

      println(cache.getAdvancedCache.getCacheEntry("key", null, null).getVersion)
    } catch {
      case e: Exception =>
        tm.getStatus match {
          case Status.STATUS_ACTIVE => tm.rollback()
          case Status.STATUS_MARKED_ROLLBACK => tm.rollback()
          case _ =>
        }
        log(s"Exception Cached [${e.toString()}]")
    }

    log("end transaction")
  }
}

そんな感じで。