CLOVER🍀

That was when it all began.

InfinispanのAtomicMapを使ってみる

前々から、これ何だろう?と思っていた、InfinispanのAtomicMapを試してみます。特にドキュメントには記載はありませんが、org.infinispan.atomicパッケージという存在と、たまにテストコードが更新されているので、気になってみました。

AtomicMapとは?

とりあえず、Javadocを見ましょうって感じではありますが…。

AtomicMap
https://docs.jboss.org/infinispan/6.0/apidocs/org/infinispan/atomic/AtomicMap.html

コレクションを越えたアトミックなロックと、分離性を実現するMapみたいです。トランザクションに対応しており、分離レベルに応じた読み取りが可能だと。また、更新時の差分をDeltaとして取得することができます。

更新を反映する時は、明示的なコミットが必要です。

この仕組みを使うことで、特にリモートのNodeにデータをレプリケートする時に、更新分を細かな粒度で反映するのではなくDeltaを送信することでシリアライズと転送のコストを抑えることができるそうな。

…普通にトランザクションを使うのと、どう違うんでしょう?

AtomicMapは、これを実装したクラスであるAtomicHashMapを自分でnewする方法と、InfinispanのCacheからルックアップする方法があります。普通にInfinispanと合わせる時は後者を使うのだと思いますが、今回は両方やってみます。

準備

依存関係の定義は、普通にInfinispanを使う時と同様ですが、最終的にトランザクションを必要とするので、JBoss JTAを入れておきました。
build.sbt

name := "infinispan-atomic-hashmap"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.3"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked")

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "6.0.1.Final"  excludeAll(
    ExclusionRule(organization = "org.jgroups", name = "jgroups"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling-river"),
    ExclusionRule(organization = "org.jboss.marshalling", name = "jboss-marshalling"),
    ExclusionRule(organization = "org.jboss.logging", name = "jboss-logging"),
    ExclusionRule(organization = "org.jboss.spec.javax.transaction", name = "jboss-transaction-api_1.1_spec")
  ),
  "org.jgroups" % "jgroups" % "3.4.1.Final",
  "org.jboss.spec.javax.transaction" % "jboss-transaction-api_1.1_spec" % "1.0.1.Final",
  "org.jboss.marshalling" % "jboss-marshalling-river" % "1.3.18.GA",
  "org.jboss.marshalling" % "jboss-marshalling" % "1.3.18.GA",
  "org.jboss.logging" % "jboss-logging" % "3.1.2.GA",
  "net.jcip" % "jcip-annotations" % "1.0",
  "org.jboss.jbossts.jta" % "narayana-jta" % "4.17.13.Final",
  "org.scalatest" %% "scalatest" % "2.0" % "test"
)

AtomicHashMapを使う

では、とりあえず普通にAtomicHashMapをnewして使ってみます。以下の雛形コードに、AtomicHashMapを使ったサンプルを埋めていきます。
src/test/scala/org/littlewings/infinispan/atomic/SimpleAtomicHashMapSpec.scala

package org.littlewings.infinispan.atomic

import org.infinispan.atomic.{AtomicMap, AtomicHashMap, AtomicHashMapDelta, Delta, NullDelta}

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class SimpleAtomicHashMapSpec extends FunSpec {
  describe("simple atomic hashmap spec") {
    // ここに、AtomicHashMapを使ったテストを書く!!
  }
}

単純にput/getする例。普通のMapですね。

    it("simple use AtomicMap") {
      val map: AtomicMap[String, String] = new AtomicHashMap
      map.put("key1", "value1")
      map.get("key1") should be ("value1")
      map should have size 1
    }

    it("simple use AtomicHashMap") {
      val map = new AtomicHashMap[String, String]
      map.put("key1", "value1")
      map.put("key2", "value2")
      map.get("key1") should be ("value1")
      map.get("key2") should be ("value2")
      map should have size 2
    }

Deltaを使ってみる例。コミットするまでは、Deltaより更新予定のキーを確認することができます。

    it("delta") {
      val map = new AtomicHashMap[String, String]
      map.put("key1", "value1")
      map.put("key2", "value2")

      // commitするまでは、AtomicHashMapDeltaになる
      val delta1: Delta = map.delta
      delta1 should be (a [AtomicHashMapDelta])

      val atomicHashMapDelta = delta1.asInstanceOf[AtomicHashMapDelta]
      atomicHashMapDelta.getKeys should contain only ("key1", "key2")
      atomicHashMapDelta.getChangeLogSize should be (2)

      // コミット
      map.commit()

      // commitすると、DeltaはNullDeltaになる
      val delta2: Delta = map.delta
      delta2 should be (a [NullDelta])
    }

コミットした後は、再度Deltaを取得してもNullDeltaになります。

ちなみに、AtomicHashMap単体で使う時は、ロールバックするメソッドはありません。

AtomicHashMapはDeltaAwareというインターフェースを実装しており、Deltaに対してマージすることもできます。

    it("merge") {
      val map1 = new AtomicHashMap[String, String]
      val map2 = new AtomicHashMap[String, String]

      map1.put("key1", "value1")
      map1.put("key2", "value2")

      map2.put("key1", "value1-1")
      map2.put("key2", "value2-1")
      map2.put("key3", "value3")

      val delta = map1.delta
      val deltaAware = delta.merge(map2)

      // mergeの引数と戻り値は、同じ参照
      deltaAware should be theSameInstanceAs map2

      // コミットする
      map1.commit()
      deltaAware.commit()

      // map1は、通常通り
      map1.keySet should contain only ("key1", "key2")
      map1.values should contain only ("value1", "value2")

      // map2は、key1とkey2がmap1とマージされている
      map2.keySet should contain only ("key1", "key2", "key3")
      map2.values should contain only ("value1", "value2", "value3")
    }

この場合、mergeメソッドの引数になった側に、結果が上塗りされるようです。

削除や更新も、Deltaとして扱われます。

    it("remove") {
      val map = new AtomicHashMap[String, String]
      map.put("key1", "value1")
      map.put("key2", "value2")
      map.commit()

      map.remove("key2")

      // removeの分も、Deltaになる
      val delta = map.delta
      delta should be (a [AtomicHashMapDelta])

      val atomicHashMapDelta = delta.asInstanceOf[AtomicHashMapDelta]
      atomicHashMapDelta.getChangeLogSize should be (1)

      map.commit()

      map.delta should be (a [NullDelta])
    }

    it("update") {
      val map = new AtomicHashMap[String, String]
      map.put("key1", "value1")
      map.commit()

      map.put("key1", "new-value1")

      // 更新もDeltaになる
      val delta = map.delta
      delta should be (a [AtomicHashMapDelta])

      // Deltaは、新旧データを保持している
      val atomicHashMapDelta = delta.asInstanceOf[AtomicHashMapDelta]
      atomicHashMapDelta.getChangeLogSize should be (1)
      atomicHashMapDelta.toString should be ("AtomicHashMapDelta{changeLog=[PutOperation{key=key1, oldValue=value1, newValue=new-value1}],hasClear=false}")

      map.commit()

      map.delta should be (a [NullDelta])
    }

マルチスレッド環境では、同じキーをputすると、後からputした方がブロックしてしまいます。
src/test/scala/org/littlewings/infinispan/atomic/MultiThreadSpec.scala

package org.littlewings.infinispan.atomic

import scala.util.{Failure, Success, Try}

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

import org.infinispan.atomic.AtomicHashMap

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class MultiThreadSpec extends FunSpec {
  describe("multi thread spec") {
    it("use two thread") {
      val latch1 = new CountDownLatch(1)
      val latch2 = new CountDownLatch(1)

      val map = new AtomicHashMap[String, String]

      val thread1Result = new AtomicReference[Try[Int]](Success(0))
      val thread1 = new Thread {
        override def run(): Unit =
          thread1Result.set {
            Try {
              map.put("key1", "value1")
              map.put("key2", "value2")
              latch1.countDown()

              Thread.sleep(3 * 1000L)

              map.commit()

              latch2.await()

              map.size
            }
          }
      }

      val thread2Result = new AtomicReference[Try[Int]](Success(0))
      val thread2 = new Thread {
        override def run(): Unit =
          thread2Result.set {
            Try {
              latch1.await()

              // putは、Thread1と異なるキーならブロックしない
              // 同じキーをputすると、このコードの場合はデッドロックになる

              val start = System.currentTimeMillis

              // getは、Thread1がコミットするまでブロックする
              map.get("key1") should be ("value1")
              map.get("key2") should be ("value2")

              val elapsed = System.currentTimeMillis
              elapsed should be > (3 * 1000L)

              latch2.countDown()

              map.size
            }
          }
      }

      Array(thread1, thread2)
        .map { t => t.start(); t }
        .foreach(_.join())

      thread1Result.get.get should be (2)
      thread2Result.get.get should be (2)
    }
  }
}

InfinispanのCacheからルックアップする

続いて、InfinispanのCacheとAtomicMapを組み合わせてみます。

最初に、設定ファイルを用意。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
    xmlns="urn:infinispan:config:6.0">

  <global>
    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        allowDuplicateDomains="true"
        />

    <shutdown hookBehavior="REGISTER"/>
  </global>

  <namedCache name="repeatableReadCache">
    <transaction
        transactionMode="TRANSACTIONAL"
        lockingMode="OPTIMISTIC"
        autoCommit="false" />
    <versioning enabled="true" versioningScheme="SIMPLE" />
    <locking
        isolationLevel="REPEATABLE_READ"
        writeSkewCheck="true" />
  </namedCache>

  <namedCache name="readCommittedCache">
    <transaction
        transactionMode="TRANSACTIONAL"
        lockingMode="PESSIMISTIC"
        autoCommit="false" />
    <locking
        isolationLevel="READ_COMMITTED" />
  </namedCache>

</infinispan>

トランザクションに対応したCacheを、2つ用意しています。

では、テストコードの雛形を。
src/test/scala/org/littlewings/infinispan/atomic/AtomicHashMapWithCacheSpec.scala

package org.littlewings.infinispan.atomic

import org.infinispan.Cache
import org.infinispan.atomic.{AtomicHashMap, AtomicMapLookup, AtomicHashMapProxy}
import org.infinispan.manager.DefaultCacheManager

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class AtomicHashMapWithCacheSpec extends FunSpec {
  describe("with Infinispan Cache Spec") {
    // ここに、テストを書く!!
  }

  def withCache[T](cacheName: String = "")(fun: Cache[String, String] => T): T = {
    val manager = new DefaultCacheManager("infinispan.xml")

    try {
      val cache =
        if (cacheName.isEmpty) manager.getCache[String, String]
        else manager.getCache[String, String](cacheName)

      try {
        fun(cache)
      } finally {
        cache.stop()
      }
    } finally {
      manager.stop()
    }
  }
}

withCacheメソッドにCacheを引数に取る関数を渡すことで、InfinispanのCacheを利用して実装することができます。

デフォルトのCacheはトランザクションを設定していないので、AtomicMapをルックアップすることができません。

    it("no transaction cache NG") {
      withCache() { cache =>
        // トランザクションの設定がないCacheでは、AtomicMapの取得ができない
        an [IllegalStateException] should be thrownBy AtomicMapLookup.getAtomicMap[String, String, String](cache, "atomic-map")
      }
    }

なお、InfinispanのCacheから取得する場合は、AtomicMapLookup#getAtomicMapメソッドを利用します。第2引数に渡すキーは、Cache内にAtomicHashMapを作成するためのキーとして利用されます。

また、トランザクションに対応しているCacheを使う場合でも、JTAトランザクションを開始する、またはバッチモードを始めていないと、AtomicMapを取得することができません。

    it("need transaction") {
      withCache("repeatableReadCache") { cache =>
        // JTAトランザクションが開始済み、またはバッチモードが利用できないと
        // AtomicHapは取得できない
        an [IllegalArgumentException] should be thrownBy AtomicMapLookup.getAtomicMap[String, String, String](cache, "atomic-map")
      }
    }

トランザクションを始めていれば、JTAでコミット/ロールバックすることで、結果を確定したり破棄することができます。

    it("repeatable-read cache commit") {
      withCache("repeatableReadCache") { cache =>
        val tm = cache.getAdvancedCache.getTransactionManager
        tm.begin()

        val map = AtomicMapLookup.getAtomicMap[String, String, String](cache, "atomic-map")

        map.put("key1", "value1")
        map.put("key2", "value2")

        tm.commit()

        map.get("key1") should be ("value1")
        map.get("key2") should be ("value2")

        map should have size 2

        // map自身は、AtomicHashMapProxyのインスタンス
        map should be (a [AtomicHashMapProxy[_, _]])

        // Cacheは[String, String]だが、「atomic-map」キーでAtomicHashMapが入っている
        cache.get("atomic-map").getClass should be (classOf[AtomicHashMap[_, _]])
      }
    }

    it("repeatable-read cache rollback") {
      withCache("repeatableReadCache") { cache =>
        val tm = cache.getAdvancedCache.getTransactionManager
        tm.begin()

        val map = AtomicMapLookup.getAtomicMap[String, String, String](cache, "atomic-map")

        map.put("key1", "value1")
        map.put("key2", "value2")

        tm.rollback()

        // 結果として空のAtomicHashMapとなってしまった場合は、getなどもできない
        // map.get("key1")
        // cache.get("atomic-map").getClass should be (classOf[AtomicHashMap[_, _]])
      }
    }

    it("read-committed cache") {
      withCache("readCommittedCache") { cache =>
        val tm = cache.getAdvancedCache.getTransactionManager
        tm.begin()

        val map = AtomicMapLookup.getAtomicMap[String, String, String](cache, "atomic-map")
        map.put("key1", "value1")
        map.put("key2", "value2")

        tm.commit()

        tm.begin()

        map.put("key1", "value1-1")
        map.put("key3", "value3")

        tm.rollback()

        map should have size 2
        map.get("key1") should be ("value1")
        map.get("key2") should be ("value2")
        map.get("key3") should be (null)
      }
    }

実は、AtomicMapLookup#getAtomicMapの第2引数で指定したキーに紐付けられて、AtomicHashMapが登録されています。

        // Cacheは[String, String]だが、「atomic-map」キーでAtomicHashMapが入っている
        cache.get("atomic-map").getClass should be (classOf[AtomicHashMap[_, _]])

AtomicMapを削除する場合は、AtomicMapLookup#removeAtomicMapで。

    it("remove AtomicMap") {
      withCache("readCommittedCache") { cache =>
        val tm = cache.getAdvancedCache.getTransactionManager
        tm.begin()

        val map = AtomicMapLookup.getAtomicMap[String, String, String](cache, "atomic-map")
        map.put("key1", "value1")
        map.put("key2", "value2")

        tm.commit()

        cache.get("atomic-map").getClass should be (classOf[AtomicHashMap[_, _]])

        tm.begin()
        AtomicMapLookup.removeAtomicMap(cache, "atomic-map")
        tm.commit()

        cache.get("atomic-map") should be (null)
      }
    }

なお、AtomicMapLookup#getAtomicMapで取得できるAtomicMapはAtomicHashMapProxyであり、Deltaを取得するためのメソッドがありません。

InfinispanのCacheの背後にある、AtomicHashMapを取得してDeltaを確認するんでしょうか…。

ちなみに、今回CacheをString, Stringと型パラメータを適用しているので、普通にAtomicHashMapを取得しようとするとClassCastExceptionでコケてくれます。まあ、見るなってことでしょうね…。

FineGrainedAtomicMap

InfinispanのCacheを使用して取得する場合、FineGrainedAtomicMapという形でも取得することができます。

FineGrainedAtomicMap
https://docs.jboss.org/infinispan/6.0/apidocs/org/infinispan/atomic/FineGrainedAtomicMap.html

Javadocによると、AtomicMapとFineGrainedAtomicMapの違いは、このようになっています。

AtomicMap

1. Atomic locking and isolation over the entire collection

This allows the entire AtomicMap to be locked when making changes even to certain entries within the map, and also isolates the map for safe reading (see IsolationLevel while concurrent writes may be going on.

FineGrainedAtomicMap

1. Fine-grained atomic locking and isolation

FineGrainedAtomicMap allows fine grained locking of entries within the map; it also isolates the map for safe reading (see IsolationLevel while concurrent writes may be going on.

どうも、ロックの粒度の違いみたいです。が、ちょっと動かしてみた感じでは、ちょっと違いがわからなかったです。

書いたテストコードがマズかったかなぁ?

FineGrainedAtomicMap自体は、このようにして取得します。

    it("repeatable-read cache") {
      withCache("repeatableReadCache") { cache =>
        val tm = cache.getAdvancedCache.getTransactionManager
        tm.begin()

        val map = AtomicMapLookup.getFineGrainedAtomicMap[String, String, String](cache, "fine-grained-atomic-map")

        map.put("key1", "value1")
        map.put("key2", "value2")

        tm.commit()

        // Cacheに入っているのは、AtomicHashMap
        cache.get("fine-grained-atomic-map").getClass should be (classOf[AtomicHashMap[_, _]])
      }
    }

マルチスレッドでのアクセスとかも書いてみたのですが、長くなったのでこのへんで。FineGrainedAtomicMapなども含めて、書いたソースコードはこちらにアップしておきました。

https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-atomic-hashmap