ちょっと他のライブラリで使っているInfinispanのデフォルトで用意されている設定ファイルを見て、StateTransferの設定がされていたのでそういえば調べてないなぁと思い、ちょっと確認してみました。
ドキュメントには、特に章立てての記載はないので、XML SchemaとかJavadocを見ています。
Infinispan 6.0.0.FinalのXML Schema
http://docs.jboss.org/infinispan/6.0/configdocs/infinispan-config-6.0.html
defaultまたはnamedCacheタグの、clusteringタグ配下にstateTransferタグとして設置して、設定します。
StateTransferは、クラスタリングモード(ReplicationまたはDistributionを選んだ場合)の時に使用し、クラスタにCacheが参加したり出て行った時にどのように状態を移すのかを設定します。
設定自体は、以下の4つの属性で設定可能です。
項目名 | 意味 | デフォルト値 |
---|---|---|
chunkSize | 0より大きい値を設定した場合、Cacheのエントリを設定したチャンクサイズで移します。0以下にすると、全部で1回しか移さないため、推奨しません | 10000 |
fetchInMemoryState | trueに設定した場合、Cacheの開始時に近くのCacheに状態をもらうことを求めます。結果、Cacheはウォームアップされた状態になりますが、Cacheの起動に時間がかかるようになります | Cacheがreplicationまたはdistributionの時はtrue、それ以外はfalse |
awaitInitialTransfer | trueに設定した場合、クラスタに参加したNode上でCacheManager#getCacheを最初に呼び出した時、近くのCacheから状態を受け取り終わるまでブロックします(fetchInMemoryStateをtrueにしていれば)。この項目をfalseに設定することには、注意してください。Cacheはすぐに利用可能になりますが、キーにアクセスした時にまた状態の受け渡しが終わっていなければ、リモートアクセスが発生することになるでしょう。これはアプリケーションのロジックには影響しませんが、パフォーマンスに影響を与えるかもしれません | Cacheがreplicationまたはdistributionの時はtrue、それ以外はfalse |
timeout | 近くのCacheから状態を受け取り終わるまでの、最大待ち時間をミリ秒で設定します。タイムアウトした場合は、例外(org.infinispan.commons.CacheException)がスローされます | 240000 |
それでは、いくつか試してみましょう。
まずは、プロジェクトの定義を。
build.sbt
name := "infinispan-statetransfer" version := "0.0.1-SNAPSHOT" scalaVersion := "2.10.3" organization := "org.littlewings" fork in Test := true scalacOptions ++= Seq("-deprecation") libraryDependencies ++= Seq( "org.infinispan" % "infinispan-core" % "6.0.0.Final", "net.jcip" % "jcip-annotations" % "1.0", "org.scalatest" %% "scalatest" % "2.0" % "test" )
あとは、テストコードを使って挙動を見ていきます。
StateTransferのデフォルト設定。
まずは、デフォルト設定で使ってみます。
src/test/resources/infinispan-simple.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd" xmlns="urn:infinispan:config:6.0"> <global> <transport clusterName="statetransfer-cluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" allowDuplicateDomains="true" /> <shutdown hookBehavior="REGISTER"/> </global> <default> <clustering mode="repl" /> </default> </infinispan>
クラスタのモードは、Replicationにしました。JMXの設定は、ちょっと都合上allowDuplicateDomainsをtrueにしています。JGroupsの設定は、省略。
あと、テストの共通化のために、こんなトレイトを用意。
src/test/scala/org/littlewings/infinispan/statetransfer/EmbeddedCacheSupport.scala
package org.littlewings.infinispan.statetranfer import org.infinispan.Cache import org.infinispan.manager.DefaultCacheManager trait EmbeddedCacheSupport { def withCache[K, V](configFileName: String, cacheName: String = null)(fun: (Cache[K, V], Long) => Unit): Unit = { val manager = new DefaultCacheManager(configFileName) try { val startTime = System.currentTimeMillis val cache = if (cacheName != null) manager.getCache[K, V](cacheName) else manager.getCache[K, V] val elapsedTime = System.currentTimeMillis - startTime fun(cache, elapsedTime) } finally { manager.stop() } } }
EmbeddedCacheManager#getCacheの時間を測定してくれると同時に、EmbeddedCacheManagerの停止も行います。
では、テストコード。
src/test/scala/org/littlewings/infinispan/statetransfer/DefaultStateTransferSpec.scala
package org.littlewings.infinispan.statetranfer import org.infinispan.manager.DefaultCacheManager import org.scalatest.FunSpec import org.scalatest.Matchers._ class DefaultStateTransferSpec extends FunSpec with EmbeddedCacheSupport { describe("state transfer configuration spec") { it("default state transfer settings") { withCache[String, String]("infinispan-simple.xml") { (cache, _) => val configuration = cache.getCacheConfiguration val stateTransferConfiguration = configuration.clustering.stateTransfer stateTransferConfiguration.awaitInitialTransfer should be (true) stateTransferConfiguration.chunkSize should be (10000) stateTransferConfiguration.fetchInMemoryState should be (true) stateTransferConfiguration.timeout should be (240000) } } } describe("data inserted spec") { it("no data") { withCache[String, String]("infinispan-simple.xml") { (cache1, _) => withCache[String, String]("infinispan-simple.xml") { (cache2, elapsedTime) => elapsedTime should be < 1500L } } } it("many data") { withCache[String, String]("infinispan-simple.xml") { (cache1, _) => (1 to 100000).foreach { i => cache1.put(s"key$i", s"value$i") } withCache[String, String]("infinispan-simple.xml") { (cache2, elapsedTime) => elapsedTime should be < 6000L cache2.get("key100000") should be ("value100000") } } } } }
最初は設定の確認、次はEmbeddedCacheManager#getCacheにかかる時間を、データを入れる/入れないで測定しています。データを入れる場合は、10万エントリにしました。最後に値が取れるかどうかも確認しています。
結果はテストコードの通りなのですが、デフォルトではfetchInMemoryStateもawaitInitialTransferもtrueなので、データを多く登録するとEmbeddedCacheManager#getCacheの呼び出し時に時間がかかるようになりますね。
awaitInitialTransferをfalseに
では、続いてawaitInitialTransferをfalseにしてみます。
src/test/resources/infinispan-awaitInitialTransferOff.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd" xmlns="urn:infinispan:config:6.0"> <global> <transport clusterName="statetransfer-cluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" allowDuplicateDomains="true" /> <shutdown hookBehavior="REGISTER"/> </global> <default> <clustering mode="repl"> <stateTransfer awaitInitialTransfer="false" /> </clustering> </default> </infinispan>
clusteringタグの下にstateTransferタグを入れ、awaitInitialTransfer属性をfalseにしました。
テストコードは、こんな感じです。
src/test/scala/org/littlewings/infinispan/statetransfer/AwaitInitialTransferOffSpec.scala
package org.littlewings.infinispan.statetranfer import org.infinispan.manager.DefaultCacheManager import org.scalatest.FunSpec import org.scalatest.Matchers._ class AwaitInitialTransferOffSpec extends FunSpec with EmbeddedCacheSupport { describe("awaitInitialTransfer Off state transfer configuration spec") { it("state transfer settings") { withCache[String, String]("infinispan-awaitInitialTransferOff.xml") { (cache, _) => val configuration = cache.getCacheConfiguration val stateTransferConfiguration = configuration.clustering.stateTransfer stateTransferConfiguration.awaitInitialTransfer should be (false) stateTransferConfiguration.chunkSize should be (10000) stateTransferConfiguration.fetchInMemoryState should be (true) stateTransferConfiguration.timeout should be (240000) } } } describe("data inserted spec") { it("no data") { withCache[String, String]("infinispan-awaitInitialTransferOff.xml") { (cache1, _) => withCache[String, String]("infinispan-awaitInitialTransferOff.xml") { (cache2, elapsedTime) => elapsedTime should be < 1000L } } } it("many data") { withCache[String, String]("infinispan-awaitInitialTransferOff.xml") { (cache1, _) => (1 to 100000).foreach { i => cache1.put(s"key$i", s"value$i") } withCache[String, String]("infinispan-awaitInitialTransferOff.xml") { (cache2, elapsedTime) => elapsedTime should be < 1000L cache2.get("key100000") should be ("value100000") } } } } }
デフォルト設定の時との挙動の違いは、データを多く登録してもEmbeddedCacheManager#getCacheの呼び出しに時間がかからないところですね。しかも、値もちゃんと取得できます。
ですが、裏ではこんな例外が投げられたりしています…。
WARN: ISPN000071: Caught exception when handling command CacheTopologyControlCommand{cache=___defaultcache, type=REBALANCE_CONFIRM, sender=xxxxx-yyyyy-11425, joinInfo=null, topologyId=2, currentCH=null, pendingCH=null, throwable=null, viewId=1} org.infinispan.commons.CacheException: Received invalid rebalance confirmation from xxxxx-yyyyy-11425 for cache ___defaultcache, we don't have a rebalance in progress
テストは成功するんですけどね、状態の反映中なんでしょう…。
fetchInMemoryStateをfalseに
最後に、fetchInMemoryStateをfalseにしてみます。
src/test/resources/infinispan-fetchInMemoryStateOff.xml
<?xml version="1.0" encoding="UTF-8"?> <infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:infinispan:config:6.0 http://www.infinispan.org/schemas/infinispan-config-6.0.xsd" xmlns="urn:infinispan:config:6.0"> <global> <transport clusterName="statetransfer-cluster"> <properties> <property name="configurationFile" value="jgroups.xml" /> </properties> </transport> <globalJmxStatistics enabled="true" jmxDomain="org.infinispan" cacheManagerName="DefaultCacheManager" allowDuplicateDomains="true" /> <shutdown hookBehavior="REGISTER"/> </global> <default> <clustering mode="repl"> <stateTransfer fetchInMemoryState="false" /> </clustering> </default> </infinispan>
テストコード。
src/test/scala/org/littlewings/infinispan/statetransfer/FetchInMemoryStateOffSpec.scala
package org.littlewings.infinispan.statetranfer import org.infinispan.manager.DefaultCacheManager import org.scalatest.FunSpec import org.scalatest.Matchers._ class FetchInMemoryStateOffSpec extends FunSpec with EmbeddedCacheSupport { describe("fetchInMemoryState Off state transfer configuration spec") { it("state transfer settings") { withCache[String, String]("infinispan-fetchInMemoryStateOff.xml") { (cache, _) => val configuration = cache.getCacheConfiguration val stateTransferConfiguration = configuration.clustering.stateTransfer stateTransferConfiguration.awaitInitialTransfer should be (true) stateTransferConfiguration.chunkSize should be (10000) stateTransferConfiguration.fetchInMemoryState should be (false) stateTransferConfiguration.timeout should be (240000) } } } describe("data inserted spec") { it("no data") { withCache[String, String]("infinispan-fetchInMemoryStateOff.xml") { (cache1, _) => withCache[String, String]("infinispan-fetchInMemoryStateOff.xml") { (cache2, elapsedTime) => elapsedTime should be < 1000L } } } it("many data") { withCache[String, String]("infinispan-fetchInMemoryStateOff.xml") { (cache1, _) => (1 to 100000).foreach { i => cache1.put(s"key$i", s"value$i") } withCache[String, String]("infinispan-fetchInMemoryStateOff.xml") { (cache2, elapsedTime) => elapsedTime should be < 1000L cache2.get("key100000") should be (null) } } } } }
デフォルト設定との違いは、メモリ状態を受け渡ししないからでしょうけれど、EmbeddedCacheManager#getCacheの呼び出しが速く終わる、そしてCacheに登録されたエントリが反映されていない(nullになっている)というところですね。
なんとなく、動きはわかりました。基本的には、fetchInMemoryStateとawaitInitialTransferはtrueにして、chunkSizeとtimeoutをチューニングするような感じになるのかなぁ?と思います。fetchInMemoryStateとawaitInitialTransferの設定には、要注意ですね。
勉強になりました。
今回のコードは、こちらにアップしています。
https://github.com/kazuhira-r/infinispan-examples/tree/master/infinispan-statetransfer