CLOVER🍀

That was when it all began.

InfinispanのAtomicObjectFactoryを使ってみる

Infinispan 7.2で、Atomic Object Factoryというモジュールが追加されました。

特にRelease Noteやドキュメントには記載がないのですが、

Infinispan 7.2 Release Notes
http://infinispan.org/release-notes/

なんとなくモジュールが増えていたのに気付いたので、ちょっと試してみました。

Atomic Object Factory module
https://github.com/infinispan/infinispan/tree/7.2.2.Final/atomic-factory

READMEや実装を見ると、どうもこういうものみたいです。

  • AtomicObjectFactoryというクラスを使用して、アトミックなオブジェクトのファクトリを提供するもの
  • Serializableなクラスであれば、対象にできる(※)
  • 生成されたオブジェクトはレプリケーションされ、並行アクセスに対しても強い一貫性を提供する

※ … 実際には、デフォルトコンストラクタも必要

と、見てみても使いどころが今ひとつピンと来ないのですが、まずは試してみようということで。

準備

ビルド定義。
build.sbt

name := "embedded-atomic-object-factory"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.11.6"

organization := "org.littlewings"

scalacOptions ++= Seq("-Xlint", "-deprecation", "-unchecked", "-feature")

updateOptions := updateOptions.value.withCachedResolution(true)

fork in Test := true

parallelExecution in Test := false

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-atomic-factory" % "7.2.2.Final",
  "net.jcip" % "jcip-annotations" % "1.0" % "provided",
  "org.scalatest" %% "scalatest" % "2.2.5" % "test"
)

Infinispanの設定ファイルも用意。
src/test/resources/infinispan.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:infinispan:config:7.2 http://www.infinispan.org/schemas/infinispan-config-7.2.xsd"
        xmlns="urn:infinispan:config:7.2">
    <jgroups>
        <stack-file name="udp" path="jgroups.xml"/>
    </jgroups>

    <cache-container name="clustered" shutdown-hook="REGISTER">
        <transport cluster="clustered" stack="udp" site="clustered"/>
        <jmx duplicate-domains="true"/>

        <distributed-cache name="atomicFactoryCache"/>
    </cache-container>
</infinispan>

JGroupsの設定は端折ります。

テストコードの雛形。
src/test/scala/org/littlewings/infinispan/atomicobjectfactory/AtomicObjectFactorySpec.scala

package org.littlewings.infinispan.atomicobjectfactory

import org.infinispan.Cache
import org.infinispan.atomic.AtomicObjectFactory
import org.infinispan.manager.DefaultCacheManager
import org.scalatest.FunSpec
import org.scalatest.Matchers._

class AtomicObjectFactorySpec extends FunSpec {
  describe("AtomicObjectFactory Spec") {
    // ここに、テストを書く!
  }

  protected def withCache[K, V](cacheName: String, numInstances: Int = 1)(f: Cache[K, V] => Unit): Unit = {
    val managers = (1 to numInstances).map(_ => new DefaultCacheManager("infinispan.xml"))

    try {
      val cache = managers.map(_.getCache[K, V](cacheName)).head

      f(cache)

      cache.stop()
    } finally {
      managers.foreach(_.stop())
    }
  }
}

ひとつのJavaVM内で、クラスタを構成するためのヘルパーメソッド付き。

試してみる

では、まず試してみます。使い方は、こんな感じみたいです。

    it("using HashSet") {
      withCache[AnyRef, AnyRef]("atomicFactoryCache", 2) { cache =>
        val factory = new AtomicObjectFactory(cache)
        val set = factory.getInstanceOf(classOf[java.util.HashSet[String]], "set")

        set.add("Java")
        set.add("Scala")
        set.add("Groovy")
        set.add("Clojure")

        factory.disposeInstanceOf(classOf[java.util.HashSet[String]], "set", true)

        withCache[AnyRef, AnyRef]("atomicFactoryCache", 1) { nextCache =>
          val nextFactory = new AtomicObjectFactory(nextCache)
          val setAsSaved =
            nextFactory.getInstanceOf(classOf[java.util.HashSet[String]], "set", false, null, false)

          setAsSaved should have size (4)
          setAsSaved should contain only("Java", "Scala", "Groovy", "Clojure")

          nextCache should have size (1)
          nextCache.containsKey("HashSet#set") should be(true)
          nextCache.get("HashSet#set").isInstanceOf[Array[Byte]] should be(true)
        }
      }
    }

InfinispanのCacheを引数にAtomicObjectFactorのインスタンスを生成し、このファクトリに対して生成したいクラスのClassクラスと対応するキーを渡します。

        val factory = new AtomicObjectFactory(cache)
        val set = factory.getInstanceOf(classOf[java.util.HashSet[String]], "set")

生成されたインスタンスに対して、登録してみます。

        set.add("Java")
        set.add("Scala")
        set.add("Groovy")
        set.add("Clojure")

操作が終わったら、この状態をCacheに保存します。

        factory.disposeInstanceOf(classOf[java.util.HashSet[String]], "set", true)

この時、AtomicObjectFactory#getInstanceOfで渡したものと同じClassクラスとキーを渡します。第3引数にtrueを渡さないと、保存してくれません(そもそもメソッド名はdisposeInstanceOfですし…)。

この後、別のNodeからインスタンスを取得してみます。

        withCache[AnyRef, AnyRef]("atomicFactoryCache", 1) { nextCache =>
          val nextFactory = new AtomicObjectFactory(nextCache)
          val setAsSaved =
            nextFactory.getInstanceOf(classOf[java.util.HashSet[String]], "set", false, null, false)

          setAsSaved should have size (4)
          setAsSaved should contain only("Java", "Scala", "Groovy", "Clojure")

          nextCache should have size (1)
          nextCache.containsKey("HashSet#set") should be(true)
          nextCache.get("HashSet#set").isInstanceOf[Array[Byte]] should be(true)
        }

パッと見、先ほどの使い方とそう変わらないのですが、AtomicObjectFactorygetInstanceOfする時の引数が増えています。

          val setAsSaved =
            nextFactory.getInstanceOf(classOf[java.util.HashSet[String]], "set", false, null, false)

ここで重要なのは、第5引数を「false」にすることです。

この5つの引数の意味なんですけど、こういう感じです。

        val set =
          factory.getInstanceOf(
            classOf[java.util.HashSet[String]],
            "set", // key
            false, // withReadOptimization
            null, // equalsMethod
            true) // forceNew

第3引数以降、読み出し処理の最適化(trueにすると、ローカルから読むようになります)、equalsメソッドの変更(現状、硬化なし?)、そして第5引数が内部でインスタンスを強制的に作成するか、です。

最初の省略時には、裏でforceNewがtrueになっていました。

        val set = factory.getInstanceOf(classOf[java.util.HashSet[String]], "set")

また、AtomicObjectFactoryの取得は、こちらのファクトリメソッドでもOKです。以後は、こちらを使います。

        val factory = AtomicObjectFactory.forCache(cache)

ちなみに、キーに直接紐付いたオブジェクトの実体は、バイト配列みたいです。

          nextCache.get("HashSet#set").isInstanceOf[Array[Byte]] should be(true)

ひっそりとキーの先頭にクラス名が入ります。

Listでも

AtomicObjectFactoryの実装を見ていると、List、Set、Mapみたいなコレクションを対象に考えている?みたいな雰囲気です。

    it("using ArrayList") {
      withCache[String, Array[Byte]]("atomicFactoryCache", 2) { cache =>
        val factory = AtomicObjectFactory.forCache(cache)
        val list = factory.getInstanceOf(classOf[java.util.ArrayList[String]], "list")

        list.add("Java")
        list.add("Scala")
        list.add("Groovy")
        list.add("Clojure")

        factory.disposeInstanceOf(classOf[java.util.ArrayList[String]], "list", true)

        withCache[String, Array[Byte]]("atomicFactoryCache", 1) { nextCache =>
          val nextFactory = AtomicObjectFactory.forCache(cache)
          val listAsSaved =
            nextFactory.getInstanceOf(classOf[java.util.ArrayList[String]], "list", false, null, false)

          listAsSaved should have size (4)
          listAsSaved should contain only("Java", "Scala", "Groovy", "Clojure")

          nextCache should have size (1)
          nextCache.containsKey("ArrayList#list") should be(true)
          nextCache.get("ArrayList#list").isInstanceOf[Array[Byte]] should be(true)
        }
      }
    }

こちらも、同様に使えます。

AtomicObjectFactory#disposeInstanceOfの呼び出しを忘れると?

AtomicObjectFactory#disposeInstanceOfの呼び出しを忘れると、他のNodeからはうまく見えなくなってしまうので、要注意です。

    it("using HashSet, no stored, missing disposeInstanceOf") {
      withCache[String, Array[Byte]]("atomicFactoryCache", 2) { cache =>
        val factory = AtomicObjectFactory.forCache(cache)
        val set = factory.getInstanceOf(classOf[java.util.HashSet[String]], "set")

        set.add("Java")
        set.add("Scala")
        set.add("Groovy")
        set.add("Clojure")

        // factory.disposeInstanceOf(classOf[java.util.HashSet[String]], "set", true)

        withCache[String, Array[Byte]]("atomicFactoryCache", 1) { nextCache =>
          val nextFactory = AtomicObjectFactory.forCache(nextCache)
          val setAsSaved =
            nextFactory.getInstanceOf(classOf[java.util.HashSet[String]], "set", false, null, false)

          setAsSaved should have size (0)
          setAsSaved should be(empty)

          nextCache should have size (1)
          nextCache.containsKey("HashSet#set") should be(true)
        }
      }
    }

また、disposeInstanceOfメソッドの第3引数をfalseにしても、同じようなことになります。

    it("using HashSet, no stored, disposeInstanceOf false") {
      withCache[String, Array[Byte]]("atomicFactoryCache", 2) { cache =>
        val factory = AtomicObjectFactory.forCache(cache)
        val set = factory.getInstanceOf(classOf[java.util.HashSet[String]], "set")

        set.add("Java")
        set.add("Scala")
        set.add("Groovy")
        set.add("Clojure")

        factory.disposeInstanceOf(classOf[java.util.HashSet[String]], "set", false)

        withCache[String, Array[Byte]]("atomicFactoryCache", 1) { nextCache =>
          val nextFactory = AtomicObjectFactory.forCache(nextCache)
          val setAsSaved =
            nextFactory.getInstanceOf(classOf[java.util.HashSet[String]], "set", false, null, false)

          setAsSaved should have size (0)
          setAsSaved should be(empty)

          nextCache should have size (1)
          nextCache.containsKey("HashSet#set") should be(true)
        }
      }
    }

ちょっとよくわからない挙動

AtomicObjectFactory#disposeInstanceOfで保存した後に、残ったインスタンスの参照を操作すると、なぜか他Nodeから見た時にうまく値が取れなくなります。

    it("using HashSet, ignored?") {
      withCache[String, Array[Byte]]("atomicFactoryCache", 2) { cache =>
        val factory = AtomicObjectFactory.forCache(cache)
        val set = factory.getInstanceOf(classOf[java.util.HashSet[String]], "set")

        set.add("Java")
        set.add("Scala")

        factory.disposeInstanceOf(classOf[java.util.HashSet[String]], "set", true)

        set.add("Groovy")

        withCache[String, Array[Byte]]("atomicFactoryCache", 1) { nextCache =>
          val nextFactory = AtomicObjectFactory.forCache(nextCache)
          val setAsSaved =
            nextFactory.getInstanceOf(classOf[java.util.HashSet[String]], "set", false, null, false)

          setAsSaved should have size (0)
          setAsSaved should be(empty)

          nextCache should have size (1)
          nextCache.containsKey("HashSet#set") should be(true)
        }
      }
    }

そもそもメソッドがdisposeなので、いけない操作なのかなーとも思うのですが、どうなのでしょう…。

普通にインスタンスをCacheに登録した時との違いは?

ここまで見ていると、別に普通にHashSetをnewして登録した時と変わらない気もします。むしろ、disposeInstanceOfが増えているだけ面倒…。

    it("using HashSet, direct") {
      withCache[AnyRef, AnyRef]("atomicFactoryCache", 2) { cache =>
        val set = new java.util.HashSet[String]
        set.add("Java")
        set.add("Scala")
        set.add("Groovy")
        set.add("Clojure")

        cache.put("set", set)

        withCache[AnyRef, AnyRef]("atomicFactoryCache", 1) { nextCache =>
          val setAsSaved = nextCache.get("set").asInstanceOf[java.util.HashSet[String]]
          setAsSaved should have size (4)
          setAsSaved should contain only("Java", "Scala", "Groovy", "Clojure")

          nextCache should have size (1)
          nextCache.containsKey("set") should be(true)
        }
      }
    }

ここで、ちょっと違いが出るコードを。まず、両パターンでそれぞれインスタンスを作成して、Cache登録&再取得。

    it("using HashSet, compare AtomicObjectFactory and direct") {
      withCache[AnyRef, AnyRef]("atomicFactoryCache", 2) { cache =>
        val factory = AtomicObjectFactory.forCache(cache)

        // AtomicObjectFactoryから作成
        val setFromFactory = factory.getInstanceOf(classOf[java.util.HashSet[String]], "setFromFactory")
        setFromFactory.add("Java")
        setFromFactory.add("Scala")
        setFromFactory.add("Groovy")
        setFromFactory.add("Clojure")

        factory.disposeInstanceOf(classOf[java.util.HashSet[String]], "setFromFactory", true)

        val setFromFactoryOuter = factory.getInstanceOf(classOf[java.util.HashSet[String]], "setFromFactory", false, null, false)

        // 直接インスタンスを作成
        val setDirect = new java.util.HashSet[String]
        setDirect.add("Java")
        setDirect.add("Scala")
        setDirect.add("Groovy")
        setDirect.add("Clojure")

        cache.put("setDirect", setDirect)

        val setDirectOuter = cache.get("setDirect").asInstanceOf[java.util.HashSet[String]]

次に、別途クラスタにNodeを追加するのですが、この時に先ほど最後に取得したインスタンスに対して操作すると…。

        withCache[AnyRef, AnyRef]("atomicFactoryCache", 1) { nextCache =>
          val nextFactory = AtomicObjectFactory.forCache(nextCache)
          val setFromFactoryAsSaved =
            nextFactory.getInstanceOf(classOf[java.util.HashSet[String]], "setFromFactory", false, null, false)
          setFromFactoryAsSaved should have size (4)
          setFromFactoryAsSaved should contain only("Java", "Scala", "Groovy", "Clojure")

          val setFromDirectAsSaved = nextCache.get("setDirect").asInstanceOf[java.util.HashSet[String]]
          setFromFactoryAsSaved should have size (4)
          setFromFactoryAsSaved should contain only("Java", "Scala", "Groovy", "Clojure")

          // 外側にいたインスタンスに、メンバー追加
          setFromFactoryOuter should be(empty)
          setFromFactoryOuter.add("Kotlin")
          setDirectOuter.add("Kotlin")

          // AtomicObjectFactoryで管理している方には反映される
          setFromFactoryAsSaved should have size (5)
          setFromFactoryAsSaved should contain only("Java", "Scala", "Groovy", "Clojure", "Kotlin")

          // 直接インスタンスを扱っている方には、反映されない
          setFromDirectAsSaved should have size (4)
          setFromDirectAsSaved should contain only("Java", "Scala", "Groovy", "Clojure")
        }

なんと、AtomicObjectFactoryで管理している方にはいきなり結果が反映されます。

ちなみに、このコードでは1度取り直しているのですが

        val setFromFactoryOuter = factory.getInstanceOf(classOf[java.util.HashSet[String]], "setFromFactory", false, null, false)

実際には、最初に作成したインスタンスをそのまま使ってもいいみたいなんですよね…。

どちらかというと重要なのは、以下のコードの後に操作していること、みたいです。

        withCache[AnyRef, AnyRef]("atomicFactoryCache", 1) { nextCache =>
          val nextFactory = AtomicObjectFactory.forCache(nextCache)
          val setFromFactoryAsSaved =
            nextFactory.getInstanceOf(classOf[java.util.HashSet[String]], "setFromFactory", false, null, false)

(別Nodeで)参照するインスタンスを取得した後なら、OKということのようで…。

Updatableと@Update

UpdatableクラスのサブクラスをAtomicObjectFactoryの管理対象にすることで、ちょっと挙動を変えることができます。
src/main/scala/org/littlewings/infinispan/atomicobjectfactory/Person.scala

package org.littlewings.infinispan.atomicobjectfactory

import java.io.{ObjectInput, IOException, ObjectOutput}

import org.infinispan.atomic.{Updatable, Update}

@SerialVersionUID(1L)
class Person(private var firstName: String,
             private var lastName: String,
             private var age: Integer) extends Updatable {
  def this() = this(null, null, 0)

  @Update
  def setFirstName(firstName: String): Unit =
    this.firstName = firstName

  def getFirstName: String = firstName

  @Update
  def setLastName(lastName: String): Unit =
    this.lastName = lastName

  def getLastName: String = lastName

  @Update
  def setAge(age: Integer): Unit =
    this.age = age

  def getAge: Integer = age

  @throws(classOf[IOException])
  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeUTF(firstName)
    out.writeUTF(lastName)
    out.writeInt(age)
  }

  @throws(classOf[IOException])
  override def readExternal(in: ObjectInput): Unit = {
    firstName = in.readUTF()
    lastName = in.readUTF()
    age = in.readInt()
  }
}

Updatableクラスは、Externalizableインターフェースを実装しています。また、作成するクラスには、デフォルトコンストラクタが必要なようです。

Updatableクラスを継承したクラスを利用すると、AtomicObjectFactoryが特別扱いしてくれて、@Updateアノテーションが付与されたメソッドを意識するようになります。これは、AtomicObjectFactory#getInstanceOfのwithReadOptimizationをtrueにした時に作用します。

        val person =
          factory
            .getInstanceOf(classOf[Person],
              "person", // key
              false, // withReadOptimization
              null, // equalsMethod
              true, // forceNew
              "カツオ", "磯野", Integer.valueOf(11)) // args

withReadOptimizationをtrueにすると、更新系のメソッドを除いてローカルに保持しているインスタンスから結果を返そうとします。ここでいう更新系のメソッドの対象になるのが、@Updateアノテーションが付与されたものとなります。更新系のものについては、RPCが行われます(後述)。

なお、デフォルトの更新系メソッドは、List、Set、Mapのadd/addAll、put/putAllになります。

https://github.com/infinispan/infinispan/blob/7.2.2.Final/atomic-factory/src/main/java/org/infinispan/atomic/AtomicObjectFactory.java#L40

利用方法は、他とまあ同じです。

    it("using My Updatable") {
      withCache[String, Array[Byte]]("atomicFactoryCache", 2) { cache =>
        val factory = AtomicObjectFactory.forCache(cache)
        val person =
          factory
            .getInstanceOf(classOf[Person],
              "person", // key
              false, // withReadOptimization
              null, // equalsMethod
              true, // forceNew
              "カツオ", "磯野", Integer.valueOf(11)) // args

        person.getFirstName should be("カツオ")
        person.getLastName should be("磯野")
        person.getAge should be(11)

        person.setFirstName("ワカメ")
        person.setAge(9)

        factory.disposeInstanceOf(classOf[Person], "person", true)

        withCache[String, Array[Byte]]("atomicFactoryCache", 1) { nextCache =>
          val nextFactory = AtomicObjectFactory.forCache(nextCache)
          val personAsSaved =
            nextFactory.getInstanceOf(classOf[Person], "person", false, null, false)

          personAsSaved.getFirstName should be("ワカメ")
          personAsSaved.getLastName should be("磯野")
          personAsSaved.getAge should be(9)
        }
      }
    }

なお、Infinispan側で用意されているUpdatableのサブクラスとしては、ShardedTreeMapというものがあるようです。
https://github.com/infinispan/infinispan/blob/7.2.2.Final/atomic-factory/src/main/java/org/infinispan/atomic/sharded/collections/ShardedTreeMap.java

ところで、今回用意したクラスにはコンストラクタ引数があるのですが、これはAtomicObjectFactory#getInstanceOf時に実は渡すことができます。

>|scala|
        val person =
          factory
            .getInstanceOf(classOf[Person],
              "person", // key
              false, // withReadOptimization
              null, // equalsMethod
              true, // forceNew
              "カツオ", "磯野", Integer.valueOf(11)) // args

第6引数以降が可変長引数になっていて、ここでコンストラクタ引数を渡せます。が、それでもデフォルトコンストラクタは必要なのです。

もう少し裏側について

ちょっと使いこなすのが難しい気がするのですが、わからないなりにコードをさらさらと眺めてみました。

特に、先ほどの別Nodeで更新した結果が、他Nodeにいきなり反映されるところってどうなってるんでしょう?って感じですね。

        withCache[AnyRef, AnyRef]("atomicFactoryCache", 1) { nextCache =>
          val nextFactory = AtomicObjectFactory.forCache(nextCache)
          val setFromFactoryAsSaved =
            nextFactory.getInstanceOf(classOf[java.util.HashSet[String]], "setFromFactory", false, null, false)
          setFromFactoryAsSaved should have size (4)
          setFromFactoryAsSaved should contain only("Java", "Scala", "Groovy", "Clojure")

          val setFromDirectAsSaved = nextCache.get("setDirect").asInstanceOf[java.util.HashSet[String]]
          setFromFactoryAsSaved should have size (4)
          setFromFactoryAsSaved should contain only("Java", "Scala", "Groovy", "Clojure")

          // 外側にいたインスタンスに、メンバー追加
          setFromFactoryOuter should be(empty)
          setFromFactoryOuter.add("Kotlin")
          setDirectOuter.add("Kotlin")

          // AtomicObjectFactoryで管理している方には反映される
          setFromFactoryAsSaved should have size (5)
          setFromFactoryAsSaved should contain only("Java", "Scala", "Groovy", "Clojure", "Kotlin")

          // 直接インスタンスを扱っている方には、反映されない
          setFromDirectAsSaved should have size (4)
          setFromDirectAsSaved should contain only("Java", "Scala", "Groovy", "Clojure")
        }

ここなのですが、どうもAtomicObjectFactoryで生成されたインスタンスは、対象のクラスのプロキシが戻ってきているようです。

https://github.com/infinispan/infinispan/blob/7.2.2.Final/atomic-factory/src/main/java/org/infinispan/atomic/AtomicObjectFactory.java#L228

そして、JavassistによるMethodHandlerが紐付けられていて、各メソッド呼び出しはこのハンドラ越しに行われます。

この時、withReadOptimizationがtrueで更新系でなければローカルで操作が行われ、そうでなければRPCが実行され値がシリアライズされて保存されます。他のNodeから更新された値が見えるようになったのは、これが理由だと思います。

理解半分な感じがしますが、少しは挙動が見えた感じです…。でも、そうそう使われるのかな?ライフサイクル管理が難しそうな印象ですけど、嬉しい局面がどこかであるのかな…。

プロキシまわりのところは、良い勉強になりました。

今回作成したコードは、こちらに置いています。
https://github.com/kazuhira-r/infinispan-getting-started/tree/master/embedded-atomic-object-factory