CLOVER🍀

That was when it all began.

InfinispanのStateTransferを確認してみる

ちょっと他のライブラリで使っているInfinispanのデフォルトで用意されている設定ファイルを見て、StateTransferの設定がされていたのでそういえば調べてないなぁと思い、ちょっと確認してみました。

ドキュメントには、特に章立てての記載はないので、XML SchemaとかJavadocを見ています。

Infinispan 6.0.0.FinalのXML Schema
http://docs.jboss.org/infinispan/6.0/configdocs/infinispan-config-6.0.html

defaultまたはnamedCacheタグの、clusteringタグ配下にstateTransferタグとして設置して、設定します。

StateTransferは、クラスタリングモード(ReplicationまたはDistributionを選んだ場合)の時に使用し、クラスタにCacheが参加したり出て行った時にどのように状態を移すのかを設定します。

設定自体は、以下の4つの属性で設定可能です。

項目名 意味 デフォルト値
chunkSize 0より大きい値を設定した場合、Cacheのエントリを設定したチャンクサイズで移します。0以下にすると、全部で1回しか移さないため、推奨しません 10000
fetchInMemoryState trueに設定した場合、Cacheの開始時に近くのCacheに状態をもらうことを求めます。結果、Cacheはウォームアップされた状態になりますが、Cacheの起動に時間がかかるようになります Cacheがreplicationまたはdistributionの時はtrue、それ以外はfalse
awaitInitialTransfer trueに設定した場合、クラスタに参加したNode上でCacheManager#getCacheを最初に呼び出した時、近くのCacheから状態を受け取り終わるまでブロックします(fetchInMemoryStateをtrueにしていれば)。この項目をfalseに設定することには、注意してください。Cacheはすぐに利用可能になりますが、キーにアクセスした時にまた状態の受け渡しが終わっていなければ、リモートアクセスが発生することになるでしょう。これはアプリケーションのロジックには影響しませんが、パフォーマンスに影響を与えるかもしれません Cacheがreplicationまたはdistributionの時はtrue、それ以外はfalse
timeout 近くのCacheから状態を受け取り終わるまでの、最大待ち時間をミリ秒で設定します。タイムアウトした場合は、例外(org.infinispan.commons.CacheException)がスローされます 240000

それでは、いくつか試してみましょう。

まずは、プロジェクトの定義を。
build.sbt

name := "infinispan-statetransfer"

version := "0.0.1-SNAPSHOT"

scalaVersion := "2.10.3"

organization := "org.littlewings"

fork in Test := true

scalacOptions ++= Seq("-deprecation")

libraryDependencies ++= Seq(
  "org.infinispan" % "infinispan-core" % "6.0.0.Final",
  "net.jcip" % "jcip-annotations" % "1.0",
  "org.scalatest" %% "scalatest" % "2.0" % "test" 
)

あとは、テストコードを使って挙動を見ていきます。

StateTransferのデフォルト設定。

まずは、デフォルト設定で使ってみます。
src/test/resources/infinispan-simple.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
    xmlns="urn:infinispan:config:6.0">

  <global>
    <transport clusterName="statetransfer-cluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>

    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        allowDuplicateDomains="true"
        />

    <shutdown hookBehavior="REGISTER"/>
  </global>

  <default>
    <clustering mode="repl" />
  </default>

</infinispan>

クラスタのモードは、Replicationにしました。JMXの設定は、ちょっと都合上allowDuplicateDomainsをtrueにしています。JGroupsの設定は、省略。

あと、テストの共通化のために、こんなトレイトを用意。
src/test/scala/org/littlewings/infinispan/statetransfer/EmbeddedCacheSupport.scala

package org.littlewings.infinispan.statetranfer

import org.infinispan.Cache
import org.infinispan.manager.DefaultCacheManager

trait EmbeddedCacheSupport {
  def withCache[K, V](configFileName: String, cacheName: String = null)(fun: (Cache[K, V], Long) => Unit): Unit = {
    val manager = new DefaultCacheManager(configFileName)
    try {
      val startTime = System.currentTimeMillis

      val cache =
        if (cacheName != null) manager.getCache[K, V](cacheName)
        else manager.getCache[K, V]

      val elapsedTime = System.currentTimeMillis - startTime

      fun(cache, elapsedTime)
    } finally {
      manager.stop()
    }
  }
}

EmbeddedCacheManager#getCacheの時間を測定してくれると同時に、EmbeddedCacheManagerの停止も行います。

では、テストコード。
src/test/scala/org/littlewings/infinispan/statetransfer/DefaultStateTransferSpec.scala

package org.littlewings.infinispan.statetranfer

import org.infinispan.manager.DefaultCacheManager

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class DefaultStateTransferSpec extends FunSpec with EmbeddedCacheSupport {
  describe("state transfer configuration spec") {
    it("default state transfer settings") {
      withCache[String, String]("infinispan-simple.xml") { (cache, _) =>
        val configuration = cache.getCacheConfiguration
        
        val stateTransferConfiguration =
          configuration.clustering.stateTransfer

        stateTransferConfiguration.awaitInitialTransfer should be (true)
        stateTransferConfiguration.chunkSize should be (10000)
        stateTransferConfiguration.fetchInMemoryState should be (true)
        stateTransferConfiguration.timeout should be (240000)
      }
    }
  }

  describe("data inserted spec") {
    it("no data") {
      withCache[String, String]("infinispan-simple.xml") { (cache1, _) =>
        withCache[String, String]("infinispan-simple.xml") { (cache2, elapsedTime) =>
          elapsedTime should be < 1500L
        }
      }
    }

    it("many data") {
      withCache[String, String]("infinispan-simple.xml") { (cache1, _) =>
        (1 to 100000).foreach { i =>
          cache1.put(s"key$i", s"value$i")
        }

        withCache[String, String]("infinispan-simple.xml") { (cache2, elapsedTime) =>
          elapsedTime should be < 6000L
          cache2.get("key100000") should be ("value100000")
        }
      }
    }
  }
}

最初は設定の確認、次はEmbeddedCacheManager#getCacheにかかる時間を、データを入れる/入れないで測定しています。データを入れる場合は、10万エントリにしました。最後に値が取れるかどうかも確認しています。

結果はテストコードの通りなのですが、デフォルトではfetchInMemoryStateもawaitInitialTransferもtrueなので、データを多く登録するとEmbeddedCacheManager#getCacheの呼び出し時に時間がかかるようになりますね。

awaitInitialTransferをfalseに

では、続いてawaitInitialTransferをfalseにしてみます。
src/test/resources/infinispan-awaitInitialTransferOff.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
    xmlns="urn:infinispan:config:6.0">

  <global>
    <transport clusterName="statetransfer-cluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>

    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        allowDuplicateDomains="true"
        />

    <shutdown hookBehavior="REGISTER"/>
  </global>

  <default>
    <clustering mode="repl">
      <stateTransfer awaitInitialTransfer="false" />
    </clustering>
  </default>

</infinispan>

clusteringタグの下にstateTransferタグを入れ、awaitInitialTransfer属性をfalseにしました。

テストコードは、こんな感じです。
src/test/scala/org/littlewings/infinispan/statetransfer/AwaitInitialTransferOffSpec.scala

package org.littlewings.infinispan.statetranfer

import org.infinispan.manager.DefaultCacheManager

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class AwaitInitialTransferOffSpec extends FunSpec with EmbeddedCacheSupport {
  describe("awaitInitialTransfer Off state transfer configuration spec") {
    it("state transfer settings") {
      withCache[String, String]("infinispan-awaitInitialTransferOff.xml") { (cache, _) =>
        val configuration = cache.getCacheConfiguration
        
        val stateTransferConfiguration =
          configuration.clustering.stateTransfer

        stateTransferConfiguration.awaitInitialTransfer should be (false)
        stateTransferConfiguration.chunkSize should be (10000)
        stateTransferConfiguration.fetchInMemoryState should be (true)
        stateTransferConfiguration.timeout should be (240000)
      }
    }
  }

  describe("data inserted spec") {
    it("no data") {
      withCache[String, String]("infinispan-awaitInitialTransferOff.xml") { (cache1, _) =>
        withCache[String, String]("infinispan-awaitInitialTransferOff.xml") { (cache2, elapsedTime) =>
          elapsedTime should be < 1000L
        }
      }
    }

    it("many data") {
      withCache[String, String]("infinispan-awaitInitialTransferOff.xml") { (cache1, _) =>
        (1 to 100000).foreach { i =>
          cache1.put(s"key$i", s"value$i")
        }

        withCache[String, String]("infinispan-awaitInitialTransferOff.xml") { (cache2, elapsedTime) =>
          elapsedTime should be < 1000L
          cache2.get("key100000") should be ("value100000")
        }
      }
    }
  }
}

デフォルト設定の時との挙動の違いは、データを多く登録してもEmbeddedCacheManager#getCacheの呼び出しに時間がかからないところですね。しかも、値もちゃんと取得できます。

ですが、裏ではこんな例外が投げられたりしています…。

WARN: ISPN000071: Caught exception when handling command CacheTopologyControlCommand{cache=___defaultcache, type=REBALANCE_CONFIRM, sender=xxxxx-yyyyy-11425, joinInfo=null, topologyId=2, currentCH=null, pendingCH=null, throwable=null, viewId=1}
org.infinispan.commons.CacheException: Received invalid rebalance confirmation from xxxxx-yyyyy-11425 for cache ___defaultcache, we don't have a rebalance in progress

テストは成功するんですけどね、状態の反映中なんでしょう…。

fetchInMemoryStateをfalseに

最後に、fetchInMemoryStateをfalseにしてみます。
src/test/resources/infinispan-fetchInMemoryStateOff.xml

<?xml version="1.0" encoding="UTF-8"?>
<infinispan
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd"
    xmlns="urn:infinispan:config:6.0">

  <global>
    <transport clusterName="statetransfer-cluster">
      <properties>
        <property name="configurationFile" value="jgroups.xml" />
      </properties>
    </transport>

    <globalJmxStatistics
        enabled="true"
        jmxDomain="org.infinispan"
        cacheManagerName="DefaultCacheManager"
        allowDuplicateDomains="true"
        />

    <shutdown hookBehavior="REGISTER"/>
  </global>

  <default>
    <clustering mode="repl">
      <stateTransfer fetchInMemoryState="false" />
    </clustering>
  </default>

</infinispan>

テストコード。
src/test/scala/org/littlewings/infinispan/statetransfer/FetchInMemoryStateOffSpec.scala

package org.littlewings.infinispan.statetranfer

import org.infinispan.manager.DefaultCacheManager

import org.scalatest.FunSpec
import org.scalatest.Matchers._

class FetchInMemoryStateOffSpec extends FunSpec with EmbeddedCacheSupport {
  describe("fetchInMemoryState Off state transfer configuration spec") {
    it("state transfer settings") {
      withCache[String, String]("infinispan-fetchInMemoryStateOff.xml") { (cache, _) =>
        val configuration = cache.getCacheConfiguration
        
        val stateTransferConfiguration =
          configuration.clustering.stateTransfer

        stateTransferConfiguration.awaitInitialTransfer should be (true)
        stateTransferConfiguration.chunkSize should be (10000)
        stateTransferConfiguration.fetchInMemoryState should be (false)
        stateTransferConfiguration.timeout should be (240000)
      }
    }
  }

  describe("data inserted spec") {
    it("no data") {
      withCache[String, String]("infinispan-fetchInMemoryStateOff.xml") { (cache1, _) =>
        withCache[String, String]("infinispan-fetchInMemoryStateOff.xml") { (cache2, elapsedTime) =>
          elapsedTime should be < 1000L
        }
      }
    }

    it("many data") {
      withCache[String, String]("infinispan-fetchInMemoryStateOff.xml") { (cache1, _) =>
        (1 to 100000).foreach { i =>
          cache1.put(s"key$i", s"value$i")
        }

        withCache[String, String]("infinispan-fetchInMemoryStateOff.xml") { (cache2, elapsedTime) =>
          elapsedTime should be < 1000L
          cache2.get("key100000") should be (null)
        }
      }
    }
  }
}

デフォルト設定との違いは、メモリ状態を受け渡ししないからでしょうけれど、EmbeddedCacheManager#getCacheの呼び出しが速く終わる、そしてCacheに登録されたエントリが反映されていない(nullになっている)というところですね。

なんとなく、動きはわかりました。基本的には、fetchInMemoryStateとawaitInitialTransferはtrueにして、chunkSizeとtimeoutをチューニングするような感じになるのかなぁ?と思います。fetchInMemoryStateとawaitInitialTransferの設定には、要注意ですね。

勉強になりました。

今回のコードは、こちらにアップしています。

https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-statetransfer